Files
eva-pwm-cohost/src/audio.rs
T

200 lines
6.1 KiB
Rust

use jack::{AudioIn, AudioOut, ClientOptions, NotificationHandler};
use oximedia_metering::vu_meter::VuMeter;
use serde::{Deserialize, Serialize};
use tokio::sync::*;
/// Handle tied to the lifetime of the background JACK client.
///
/// It owns the sending half of a one-shot channel; the `Drop` impl for this
/// type fires that channel.
#[derive(Debug)]
pub struct JackClientRef {
    /// One-shot sender used on drop. Kept in an `Option` so it can be moved
    /// out of `&mut self` (sending consumes the sender).
    killswitch: Option<oneshot::Sender<()>>,
}
impl Drop for JackClientRef {
    /// Fires the kill switch, if it is still present.
    ///
    /// This never panics: a panic inside `drop` while the thread is already
    /// unwinding aborts the whole process, so both "sender already taken" and
    /// "receiver already gone" are treated as nothing left to do.
    fn drop(&mut self) {
        if let Some(killswitch) = self.killswitch.take() {
            // A send error only means the receiving side no longer exists,
            // so there is nobody left to notify.
            let _ = killswitch.send(());
        }
    }
}
/// Caller-facing handle for the audio input: exposes volume updates and
/// owns the reference that keeps the JACK client running.
#[derive(Debug)]
pub struct AudioInputControl {
    /// Receives the most recent volume value published for the input.
    volume_src: watch::Receiver<f64>,
    /// Never read; held only so that dropping this control also drops the
    /// `JackClientRef`.
    _jack_client: JackClientRef,
}
impl AudioInputControl {
    /// Waits until the volume value changes and returns the new value.
    ///
    /// The returned value is marked as seen, so the next call waits for a
    /// further change.
    ///
    /// # Panics
    ///
    /// Panics if the sending side of the volume channel has been dropped.
    pub async fn next(&mut self) -> f64 {
        let receiver = &mut self.volume_src;
        receiver.changed().await.unwrap();
        let latest = *receiver.borrow_and_update();
        latest
    }
}
/// Receiving end of the captured microphone audio.
#[derive(Debug)]
pub struct MicStream {
    /// Channel yielding buffers of `f32` samples.
    pub src: mpsc::Receiver<Vec<f32>>,
    /// Sample rate associated with the buffers delivered on `src`.
    pub sample_rate: u32,
}
/// Sending end used to queue synthesized speech for playback.
#[derive(Debug)]
pub struct TtsOutStream {
    /// Channel accepting buffers of `f32` samples.
    pub sink: mpsc::Sender<Vec<f32>>,
    /// Sample rate associated with the buffers sent on `sink`.
    pub sample_rate: u32,
}
/// Persisted port connections, (de)serialized with serde.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct AudioConfig {
    /// Names of the ports connected to the microphone input.
    mic_in_connections: Vec<String>,
    /// Names of the ports the TTS output is connected to.
    tts_out_connections: Vec<String>,
}
impl AudioConfig {
    /// Reads the configuration from `audio.json` in the working directory.
    ///
    /// Returns the default (empty) configuration when the file cannot be
    /// read.
    ///
    /// # Panics
    ///
    /// Panics if the file is readable but does not contain valid JSON for
    /// this type.
    pub fn load() -> Self {
        match std::fs::read_to_string("audio.json") {
            Ok(raw) => serde_json::from_str(&raw).unwrap(),
            Err(_) => Self::default(),
        }
    }
}
/// State carried by the JACK notification handler.
#[derive(Debug)]
struct Notify {
    /// Connection lists that are updated and written back to disk.
    config: AudioConfig,
    /// Unowned handle used to recognise our microphone input port.
    mic_port: jack::Port<jack::Unowned>,
    /// Unowned handle used to recognise our TTS output port.
    tts_port: jack::Port<jack::Unowned>,
    /// Channel for human-readable status messages.
    log: mpsc::Sender<String>,
}
impl NotificationHandler for Notify {
    /// Tracks connections to and from our two ports and persists them.
    ///
    /// When `port_id_b` is our microphone port, or `port_id_a` is our TTS
    /// port, the peer port's name is added to (on connect) or removed from
    /// (on disconnect) the matching list in `self.config`. A status line is
    /// sent to the log channel and the configuration is rewritten to
    /// `audio.json`. Any other connection is ignored.
    ///
    /// This runs inside a JACK callback, so it must not panic: ports that can
    /// no longer be looked up, a closed log channel, and a failed save are
    /// all handled without unwinding.
    fn ports_connected(
        &mut self,
        client: &jack::Client,
        port_id_a: jack::PortId,
        port_id_b: jack::PortId,
        are_connected: bool,
    ) {
        // Either port may be gone by the time we look it up; nothing to record then.
        let (port_a, port_b) = match (client.port_by_id(port_id_a), client.port_by_id(port_id_b)) {
            (Some(a), Some(b)) => (a, b),
            _ => return,
        };
        let (stream_name, other_port, target_cfg) = if port_b == self.mic_port {
            ("Microphone input", port_a, &mut self.config.mic_in_connections)
        } else if port_a == self.tts_port {
            ("TTS output", port_b, &mut self.config.tts_out_connections)
        } else {
            return;
        };
        let port_name = match other_port.name() {
            Ok(name) => name,
            Err(_) => return,
        };

        let message = if are_connected {
            let message = format!("{} connected to {}", stream_name, port_name);
            // An entry loaded from disk may already be listed (for example when
            // the automatic reconnect failed and the user connects manually);
            // do not store it twice.
            if !target_cfg.contains(&port_name) {
                target_cfg.push(port_name);
            }
            message
        } else {
            target_cfg.retain(|saved| saved != &port_name);
            format!("{} disconnected from {}", stream_name, port_name)
        };
        // Logging is best effort: a closed log channel must not take the
        // callback down.
        let _ = self.log.blocking_send(message);

        let save_result = serde_json::to_string_pretty(&self.config)
            .map_err(|err| err.to_string())
            .and_then(|json| std::fs::write("audio.json", json).map_err(|err| err.to_string()));
        if let Err(reason) = save_result {
            let _ = self
                .log
                .blocking_send(format!("Failed to save audio.json: {}", reason));
        }
    }
}
/// Creates the JACK client, restores saved port connections and starts the
/// audio processing callback on a background task.
///
/// * `messages` - log channel; one line is sent per restored (or failed)
///   connection, and a clone is handed to the connection notification handler.
///
/// Returns a tuple of:
/// * the control handle (volume updates; dropping it requests shutdown of the
///   JACK client),
/// * the microphone stream, delivering one buffer per process cycle while the
///   microphone port has at least one connection,
/// * the TTS output stream, whose buffers are queued and played back through
///   the `tts-out` port.
///
/// Both streams report the sample rate of the JACK client.
///
/// # Panics
///
/// Panics if the JACK client or its ports cannot be created, or if
/// `messages` is closed while the restored connections are being reported.
pub async fn start_audio_input(
    messages: &mpsc::Sender<String>,
) -> (AudioInputControl, MicStream, TtsOutStream) {
    let (exit_tx, exit_rx) = oneshot::channel();
    let config = AudioConfig::load();
    let (mic_audio_sink, mic_audio_src) = mpsc::channel::<Vec<f32>>(32);
    let (tts_audio_sink, mut tts_audio_src) = mpsc::channel::<Vec<f32>>(32);
    let (volume_sink, volume_src) = watch::channel(0.);

    let (client, _status) = jack::Client::new(
        "Eva-Cohost",
        ClientOptions::default() | ClientOptions::SESSION_ID,
    )
    .unwrap();
    let mic_port = client.register_port("microphone-in", AudioIn::default()).unwrap();
    let mut tts_port = client.register_port("tts-out", AudioOut::default()).unwrap();
    let rate = client.sample_rate();
    let mic_name = mic_port.name().unwrap();
    let tts_name = tts_port.name().unwrap();

    // Restore the connections remembered from the previous run.
    for port in &config.mic_in_connections {
        let report = if client.connect_ports_by_name(port, &mic_name).is_ok() {
            format!("Connected mic to {}", port)
        } else {
            format!("Failed to reconnect mic to {}.", port)
        };
        messages.send(report).await.unwrap();
    }
    for port in &config.tts_out_connections {
        let report = if client.connect_ports_by_name(&tts_name, port).is_ok() {
            format!("Connected TTS output to {}", port)
        } else {
            format!("Failed to reconnect TTS output to {}.", port)
        };
        messages.send(report).await.unwrap();
    }

    let notifier = Notify {
        config,
        mic_port: mic_port.clone_unowned(),
        tts_port: tts_port.clone_unowned(),
        log: messages.clone(),
    };

    let mut meter = VuMeter::new(rate.into(), 1, None);
    // Samples received from the TTS channel that have not been played yet.
    let mut tts_output_buf: Vec<f32> = Vec::with_capacity(1024);

    let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| {
        // This closure runs on JACK's process thread: it must not block and
        // must not panic, so errors are treated as "nothing to do".
        if mic_port.connected_count().unwrap_or(0) > 0 {
            let buf = mic_port.as_slice(scope).to_vec();
            meter.process_interleaved(&buf);
            // Non-blocking hand-off: if the consumer is lagging or gone, this
            // buffer is dropped rather than stalling the audio thread.
            let _ = mic_audio_sink.try_send(buf);
            volume_sink.send_if_modified(|v| {
                // NOTE(review): channel 0 is assumed to always exist because the
                // meter is built with a channel count of 1 - confirm against VuMeter.
                let next_vu = meter.channel_vu(0).unwrap();
                if *v != next_vu {
                    *v = next_vu;
                    true
                } else {
                    false
                }
            });
        }

        if let Ok(mut next_outbuf) = tts_audio_src.try_recv() {
            tts_output_buf.append(&mut next_outbuf);
        }
        if tts_port.connected_count().unwrap_or(0) > 0 {
            let outbuf = tts_port.as_mut_slice(scope);
            let available = outbuf.len().min(tts_output_buf.len());
            let (voiced, silent) = outbuf.split_at_mut(available);
            voiced.copy_from_slice(&tts_output_buf[..available]);
            // Always finish the period with silence. The output buffer is not
            // cleared for us, so leaving it untouched (including when nothing
            // is queued at all) can replay stale samples.
            silent.fill(0.0);
            tts_output_buf.drain(..available);
        }
        jack::Control::Continue
    });

    tokio::spawn(async move {
        let async_client = client.activate_async(notifier, handler).unwrap();
        // Shut down both when the kill switch fires and when its sender is
        // dropped without firing.
        let _ = exit_rx.await;
        async_client.deactivate().unwrap();
    });

    let control = AudioInputControl {
        volume_src,
        _jack_client: JackClientRef {
            killswitch: Some(exit_tx),
        },
    };
    let mic_stream = MicStream {
        sample_rate: rate,
        src: mic_audio_src,
    };
    let tts_stream = TtsOutStream {
        sample_rate: rate,
        sink: tts_audio_sink,
    };
    (control, mic_stream, tts_stream)
}