use std::{collections::HashMap, fmt::Display}; use jack::{AudioIn, AudioOut, ClientOptions, NotificationHandler, Port, ProcessScope}; use oximedia_metering::vu_meter::VuMeter; use serde::{Deserialize, Serialize}; use tokio::sync::*; use crate::events::AudioRecordRequest; #[derive(Debug)] #[allow(unused)] pub enum AudioError { Jack(jack::Error), AudioBufferSend(mpsc::error::SendError>), AudioBufferRecv(mpsc::error::TryRecvError), AudioRequestSend(watch::error::SendError) } impl From for AudioError { fn from(value: jack::Error) -> Self { Self::Jack(value) } } impl From>> for AudioError { fn from(value: mpsc::error::SendError>) -> Self { Self::AudioBufferSend(value) } } impl From for AudioError { fn from(value: mpsc::error::TryRecvError) -> Self { Self::AudioBufferRecv(value) } } impl From> for AudioError { fn from(value: watch::error::SendError) -> Self { Self::AudioRequestSend(value) } } #[derive(Debug)] pub struct JackClientRef { killswitch: Option> } impl Drop for JackClientRef { fn drop(&mut self) { self.killswitch.take().expect("Killswitch was already dropped!").send(()).expect("Cannot fire Jack killswitch"); } } #[derive(Debug)] pub struct AudioInputControl { volume_src: watch::Receiver, _jack_client: JackClientRef } impl AudioInputControl { pub async fn next(&mut self) -> Result { self.volume_src.changed().await?; Ok(*self.volume_src.borrow_and_update()) } } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Hash)] enum Role { Mic, Tts, Sfx } impl Display for Role { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let val = match self { Self::Mic => "Microphone Input", Self::Tts => "TTS Output", Self::Sfx => "SFX Output" }; f.write_str(val) } } impl Role { fn is_input(&self) -> bool { match self { Role::Mic => true, _ => false } } } #[derive(Debug)] pub struct AudioInStream { pub src: mpsc::Receiver>, pub sample_rate: u32 } #[derive(Debug)] pub struct AudioOutStream { pub sink: mpsc::Sender>, pub sample_rate: u32 } struct AudioSource { port: Port, sample_sink: mpsc::Sender>, meter: VuMeter } impl AudioSource { fn new(client: &jack::Client, name: &str) -> Result<(Self, AudioInStream), AudioError> { let (sample_sink, receiver) = mpsc::channel(32); let port = client.register_port(name, AudioIn::default())?; Ok((AudioSource { port, sample_sink, meter: VuMeter::new(client.sample_rate().into(), 1, None) }, AudioInStream { sample_rate: client.sample_rate(), src: receiver })) } fn process(&mut self, scope: &ProcessScope) -> Result, AudioError> { if self.port.connected_count()? > 0 { let buf: Vec<_> = self.port.as_slice(scope).iter().copied().collect(); self.meter.process_interleaved(&buf); self.sample_sink.blocking_send(buf)?; Ok(self.meter.channel_vu(0)) } else { Ok(None) } } } #[derive(Debug)] struct AudioSink { output_buf: Vec, port: Port, sample_src: mpsc::Receiver> } impl AudioSink { fn new(client: &jack::Client, name: &str) -> Result<(Self, AudioOutStream), AudioError> { let (sender, sample_src) = mpsc::channel(32); let port = client.register_port(name, AudioOut::default())?; Ok((AudioSink { output_buf: Vec::with_capacity(1024), port, sample_src }, AudioOutStream { sample_rate: client.sample_rate(), sink: sender, })) } fn process(&mut self, scope: &ProcessScope) -> Result<(), AudioError> { let mut next_outbuf = self.sample_src.try_recv()?; self.output_buf.append(&mut next_outbuf); if self.port.connected_count()? > 0 && !self.output_buf.is_empty() { let outbuf = self.port.as_mut_slice(scope); let mut next_segment: Vec = self.output_buf.drain(0..(outbuf.len()).min(self.output_buf.len())).collect(); let underrun = outbuf.len() - next_segment.len(); if underrun > 0 { for _ in 0..underrun { next_segment.push(0.); } } outbuf.copy_from_slice(&next_segment); } Ok(()) } } #[derive(Debug, Default, Clone, Serialize, Deserialize)] struct AudioConfig { connections: HashMap> } impl AudioConfig { pub fn load() -> Self { if let Ok(contents) = std::fs::read_to_string("audio.json") { match serde_json::from_str(contents.as_str()) { Err(err) => { log::error!("Failed to load audio.json: {:?}", err); Default::default() }, Ok(ret) => ret } } else { Default::default() } } } #[derive(Debug)] struct Notify { config: AudioConfig, ports: HashMap> } impl NotificationHandler for Notify { fn ports_connected( &mut self, client: &jack::Client, port_id_a: jack::PortId, port_id_b: jack::PortId, are_connected: bool, ) { let port_src = client.port_by_id(port_id_a).unwrap(); let port_dst = client.port_by_id(port_id_b).unwrap(); let port_match = self.ports.iter().filter_map(|(role, local_port)| { if role.is_input() && *local_port == port_dst { Some((role, port_src.name())) } else if *local_port == port_src { Some((role, port_dst.name())) } else { None } }).next(); if let Some((role, Ok(target_port))) = port_match { let cfg_slot = self.config.connections.entry(*role).or_insert_with(|| Default::default()); if are_connected { log::info!("{} connected to {}", role, target_port); cfg_slot.push(target_port); } else { log::info!("{} disconnected from {}", role, target_port); cfg_slot.retain(|x| { x != &target_port} ); } let save_data = serde_json::to_string_pretty(&self.config).unwrap(); if let Err(err) = std::fs::write("audio.json", save_data) { log::error!("Failed to write audio.json: {:?}", err); } } } } pub async fn start_audio_input() -> (AudioInputControl, AudioInStream, AudioOutStream, AudioOutStream) { let (exit_tx, exit_rx) = oneshot::channel(); let config = AudioConfig::load(); let (volume_sink, volume_src) = watch::channel(0.); let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).expect("Could not create JACK client!"); let (mut tts_sink, tts_stream) = AudioSink::new(&client, "tts-out").expect("Could not create TTS sink"); let (mut sfx_sink, sfx_stream) = AudioSink::new(&client, "sfx-out").expect("Could not create SFX sink"); let (mut mic_src, mic_stream) = AudioSource::new(&client, "microphone-in").expect("Could not create microphone source"); let notifier = Notify { config, ports: HashMap::from_iter([ (Role::Mic, mic_src.port.clone_unowned()), (Role::Tts, tts_sink.port.clone_unowned()), (Role::Sfx, sfx_sink.port.clone_unowned()) ]) }; for (role, local_port) in ¬ifier.ports { if let Some(targets) = notifier.config.connections.get(role) { for peer_name in targets { if let Some(peer) = client.port_by_name(peer_name) { let (src, dst) = if role.is_input() { (&peer, local_port) } else { (local_port, &peer) }; if let Err(err) = client.connect_ports(&src, &dst) { log::error!("Failed to reconnect {} to {}: {:?}", role, peer_name, err); } else { log::info!("Reconnected {} to {}", role, peer_name); } } } } } let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| { match mic_src.process(scope) { Ok(Some(next_vu)) => { volume_sink.send_if_modified(|v| { let next_vu = (next_vu * 100.0).round() / 100.0; if *v != next_vu { *v = next_vu; true } else { false } }); }, Ok(None) => (), Err(err) => { log::error!("Error while processing mic source: {:?}", err); return jack::Control::Quit } } for sink in [&mut tts_sink, &mut sfx_sink] { if let Err(err) = sink.process(scope) { log::error!("Error while processing {:?} audio sink: {:?}", sink, err); } } jack::Control::Continue }); tokio::spawn(async move { let async_client = client.activate_async(notifier, handler).expect("Unable to start jack client!"); if let Err(err) = exit_rx.await { log::warn!("Premature killswitch triggered: {:?}", err); } async_client.deactivate().expect("Unable to stop the jack client"); }); (AudioInputControl { volume_src, _jack_client: JackClientRef { killswitch: Some(exit_tx) } }, mic_stream, tts_stream, sfx_stream) }