use jack::{AudioIn, AudioOut, ClientOptions, NotificationHandler}; use oximedia_metering::vu_meter::VuMeter; use serde::{Deserialize, Serialize}; use tokio::sync::*; #[derive(Debug)] pub struct JackClientRef { killswitch: Option> } impl Drop for JackClientRef { fn drop(&mut self) { self.killswitch.take().unwrap().send(()).unwrap(); } } #[derive(Debug)] pub struct AudioInputControl { volume_src: watch::Receiver, _jack_client: JackClientRef } impl AudioInputControl { pub async fn next(&mut self) -> f64 { self.volume_src.changed().await.unwrap(); *self.volume_src.borrow_and_update() } } #[derive(Debug)] pub struct MicStream { pub src: mpsc::Receiver>, pub sample_rate: u32 } #[derive(Debug)] pub struct TtsOutStream { pub sink: mpsc::Sender>, pub sample_rate: u32 } #[derive(Debug)] pub struct SfxOutStream { pub sink: mpsc::Sender>, pub sample_rate: u32 } #[derive(Debug, Default, Clone, Serialize, Deserialize)] struct AudioConfig { mic_in_connections: Vec, tts_out_connections: Vec, sfx_out_connections: Vec, } impl AudioConfig { pub fn load() -> Self { if let Ok(contents) = std::fs::read_to_string("audio.json") { serde_json::from_str(contents.as_str()).unwrap() } else { Default::default() } } } #[derive(Debug)] struct Notify { config: AudioConfig, mic_port: jack::Port, tts_port: jack::Port, sfx_port: jack::Port, } impl NotificationHandler for Notify { fn ports_connected( &mut self, client: &jack::Client, port_id_a: jack::PortId, port_id_b: jack::PortId, are_connected: bool, ) { let port_a = client.port_by_id(port_id_a).unwrap(); let port_b = client.port_by_id(port_id_b).unwrap(); let (stream_name, other_port, target_cfg) = if port_b == self.mic_port { ("Microphone input", port_a, &mut self.config.mic_in_connections) } else if port_a == self.tts_port { ("TTS output", port_b, &mut self.config.tts_out_connections) } else if port_a == self.sfx_port { ("SFX output", port_b, &mut self.config.sfx_out_connections) } else { return; }; if let Ok(port_name) = other_port.name() { if are_connected { log::info!("{} connected to {}", stream_name, port_name); target_cfg.push(port_name); } else { log::info!("{} disconnected from {}", stream_name, port_name); target_cfg.retain(|x| { x != &port_name} ); } let save_data = serde_json::to_string_pretty(&self.config).unwrap(); std::fs::write("audio.json", save_data).unwrap(); } } } pub async fn start_audio_input() -> (AudioInputControl, MicStream, TtsOutStream, SfxOutStream) { let (exit_tx, exit_rx) = oneshot::channel(); let config = AudioConfig::load(); let (mic_audio_sink, mic_audio_src) = mpsc::channel(32); let (tts_audio_sink, mut tts_audio_src) = mpsc::channel(32); let (sfx_audio_sink, mut sfx_audio_src) = mpsc::channel(32); let (volume_sink, volume_src) = watch::channel(0.); let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap(); let mic_port = client.register_port("microphone-in", AudioIn::default()).unwrap(); let mut tts_port = client.register_port("tts-out", AudioOut::default()).unwrap(); let mut sfx_port = client.register_port("sfx-out", AudioOut::default()).unwrap(); let rate = client.sample_rate(); let mic_name = mic_port.name().unwrap(); let tts_name = tts_port.name().unwrap(); for port in &config.mic_in_connections { if let Ok(_) = client.connect_ports_by_name(&port, &mic_name) { log::info!("Connected mic to {}", port); } else { log::warn!("Failed to reconnect mic to {}.", port); } } for port in &config.tts_out_connections { if let Ok(_) = client.connect_ports_by_name(&tts_name, &port) { log::info!("Connected TTS output to {}", port); } else { log::warn!("Failed to reconnect TTS output to {}.", port); } } let notifier = Notify { config, mic_port: mic_port.clone_unowned(), tts_port: tts_port.clone_unowned(), sfx_port: sfx_port.clone_unowned() }; let mut meter = VuMeter::new(rate.into(), 1, None); let mut tts_output_buf = vec![]; let mut sfx_output_buf = vec![]; tts_output_buf.reserve(1024); sfx_output_buf.reserve(1024); let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| { if mic_port.connected_count().unwrap() > 0 { let buf: Vec<_> = mic_port.as_slice(scope).iter().copied().collect(); meter.process_interleaved(&buf); mic_audio_sink.blocking_send(buf).unwrap(); volume_sink.send_if_modified(|v| { let next_vu = meter.channel_vu(0).unwrap(); let next_vu = (next_vu * 100.0).round() / 100.0; if *v != next_vu { *v = next_vu; true } else { false } }); } for (src, output, port) in [ (&mut tts_audio_src, &mut tts_output_buf, &mut tts_port), (&mut sfx_audio_src, &mut sfx_output_buf, &mut sfx_port) ] { if let Ok(mut next_outbuf) = src.try_recv() { output.append(&mut next_outbuf); } if port.connected_count().unwrap() > 0 && !output.is_empty() { let outbuf = port.as_mut_slice(scope); let mut next_segment: Vec = output.drain(0..(outbuf.len()).min(output.len())).collect(); let underrun = outbuf.len() - next_segment.len(); if underrun > 0 { for _ in 0..underrun { next_segment.push(0.); } } outbuf.copy_from_slice(&next_segment); } } /*if let Ok(mut next_outbuf) = tts_audio_src.try_recv() { tts_output_buf.append(&mut next_outbuf); } if tts_port.connected_count().unwrap() > 0 && !tts_output_buf.is_empty() { let outbuf = tts_port.as_mut_slice(scope); let mut next_segment: Vec = tts_output_buf.drain(0..(outbuf.len()).min(tts_output_buf.len())).collect(); let underrun = outbuf.len() - next_segment.len(); if underrun > 0 { for _ in 0..underrun { next_segment.push(0.); } } outbuf.copy_from_slice(&next_segment); }*/ jack::Control::Continue }); tokio::spawn(async move { let async_client = client.activate_async(notifier, handler).unwrap(); exit_rx.await.unwrap(); async_client.deactivate().unwrap(); }); (AudioInputControl { volume_src, _jack_client: JackClientRef { killswitch: Some(exit_tx) } }, MicStream { sample_rate: rate, src: mic_audio_src }, TtsOutStream { sample_rate: rate, sink: tts_audio_sink }, SfxOutStream { sample_rate: rate, sink: sfx_audio_sink }) }