audio: rewrite the audio stack with a more modular architecture and less code

This commit is contained in:
2026-06-17 20:00:01 +02:00
parent e78a2c3215
commit cbf7cbd1dd
5 changed files with 166 additions and 114 deletions
-1
View File
@@ -1,4 +1,3 @@
use log::Record;
use musicbrainz_rs::entity::artist_credit::ArtistCredit; use musicbrainz_rs::entity::artist_credit::ArtistCredit;
use musicbrainz_rs::entity::release::Release; use musicbrainz_rs::entity::release::Release;
use musicbrainz_rs::{ApiEndpointError, entity::recording::Recording}; use musicbrainz_rs::{ApiEndpointError, entity::recording::Recording};
+157 -104
View File
@@ -1,4 +1,6 @@
use jack::{AudioIn, AudioOut, ClientOptions, NotificationHandler}; use std::{collections::HashMap, fmt::Display};
use jack::{AudioIn, AudioOut, ClientOptions, NotificationHandler, Port, ProcessScope};
use oximedia_metering::vu_meter::VuMeter; use oximedia_metering::vu_meter::VuMeter;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::*; use tokio::sync::*;
@@ -27,29 +29,123 @@ impl AudioInputControl {
} }
} }
/// Identifies one of the audio ports this client exposes.
///
/// Used as a hash-map key (hence `Eq` + `Hash`) and serialized as part of the
/// saved connection configuration (hence `Serialize` + `Deserialize`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Hash)]
enum Role {
// Microphone capture port.
Mic,
// Text-to-speech playback port.
Tts,
// Sound-effects playback port.
Sfx
}
impl Display for Role {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let val = match self {
Self::Mic => "Microphone Input",
Self::Tts => "TTS Output",
Self::Sfx => "SFX Output"
};
f.write_str(val)
}
}
impl Role {
    /// Returns `true` when this role captures audio (data flows from a peer
    /// port into ours) and `false` when it plays audio out to a peer.
    ///
    /// Only [`Role::Mic`] is an input; every other role is an output.
    fn is_input(&self) -> bool {
        matches!(self, Role::Mic)
    }
}
#[derive(Debug)] #[derive(Debug)]
pub struct MicStream { pub struct AudioInStream {
pub src: mpsc::Receiver<Vec<f32>>, pub src: mpsc::Receiver<Vec<f32>>,
pub sample_rate: u32 pub sample_rate: u32
} }
#[derive(Debug)] #[derive(Debug)]
pub struct TtsOutStream { pub struct AudioOutStream {
pub sink: mpsc::Sender<Vec<f32>>, pub sink: mpsc::Sender<Vec<f32>>,
pub sample_rate: u32 pub sample_rate: u32
} }
/// Capture side of the audio stack: a JACK input port together with the
/// channel its samples are forwarded to and a VU meter fed with those samples.
struct AudioSource {
// JACK input port the samples are read from.
port: Port<jack::AudioIn>,
// Sending half of the channel that receives one `Vec<f32>` per processed buffer.
sample_sink: mpsc::Sender<Vec<f32>>,
// Level meter updated with every captured buffer.
meter: VuMeter
}
impl AudioSource {
    /// Registers a JACK audio input port called `name` on `client`.
    ///
    /// Returns the source (to be driven from the process callback) together
    /// with the [`AudioInStream`] consumers read captured buffers from.
    ///
    /// # Panics
    /// Panics if the port cannot be registered.
    fn new(client: &jack::Client, name: &str) -> (Self, AudioInStream) {
        let (sample_sink, receiver) = mpsc::channel(32);
        let port = client.register_port(name, AudioIn::default()).unwrap();
        let sample_rate = client.sample_rate();
        (
            AudioSource {
                port,
                sample_sink,
                meter: VuMeter::new(sample_rate.into(), 1, None),
            },
            AudioInStream {
                sample_rate,
                src: receiver,
            },
        )
    }

    /// Captures one period of audio from the port.
    ///
    /// When the port has at least one connection the samples are fed to the
    /// meter, forwarded to the stream and the current VU reading of channel 0
    /// is returned. Returns `None` when the port is unconnected (or its
    /// connection count cannot be queried).
    fn process(&mut self, scope: &ProcessScope) -> Option<f64> {
        // A failed query is treated like "not connected" rather than panicking
        // inside the audio callback.
        if !matches!(self.port.connected_count(), Ok(count) if count > 0) {
            return None;
        }

        let samples = self.port.as_slice(scope).to_vec();
        self.meter.process_interleaved(&samples);

        // This runs on the JACK process thread, which must never block.
        // `try_send` returns immediately: if the consumer has fallen behind
        // (channel full) or has gone away (channel closed) this period is
        // dropped instead of stalling or panicking the audio thread.
        let _ = self.sample_sink.try_send(samples);

        self.meter.channel_vu(0)
    }
}
#[derive(Debug)] #[derive(Debug)]
pub struct SfxOutStream { struct AudioSink {
pub sink: mpsc::Sender<Vec<f32>>, output_buf: Vec<f32>,
pub sample_rate: u32 port: Port<jack::AudioOut>,
sample_src: mpsc::Receiver<Vec<f32>>
}
impl AudioSink {
/// Registers a JACK audio output port called `name` on `client`.
///
/// Returns the sink (to be driven from the process callback) together with
/// the [`AudioOutStream`] producers push sample buffers into.
///
/// # Panics
/// Panics if the port cannot be registered.
fn new(client: &jack::Client, name: &str) -> (Self, AudioOutStream) {
    let (sink, sample_src) = mpsc::channel(32);
    let port = client.register_port(name, AudioOut::default()).unwrap();

    let playback = AudioSink {
        output_buf: Vec::with_capacity(1024),
        port,
        sample_src,
    };
    let stream = AudioOutStream {
        sample_rate: client.sample_rate(),
        sink,
    };

    (playback, stream)
}
/// Plays back one period of queued audio on the port.
///
/// At most one pending buffer is pulled from the channel per call and
/// appended to the internal queue. If the port is connected, the period is
/// filled from the front of the queue and any shortfall is written as
/// silence, so an idle or underrunning sink never emits stale buffer
/// contents. An unconnected port is left untouched and keeps its queue.
fn process(&mut self, scope: &ProcessScope) {
    if let Ok(mut next_outbuf) = self.sample_src.try_recv() {
        self.output_buf.append(&mut next_outbuf);
    }

    // A failed query is treated like "not connected" rather than panicking
    // inside the audio callback.
    if !matches!(self.port.connected_count(), Ok(count) if count > 0) {
        return;
    }

    let outbuf = self.port.as_mut_slice(scope);
    let available = outbuf.len().min(self.output_buf.len());

    // Copy straight into the port buffer (no temporary allocation on the
    // audio thread), then zero whatever the queue could not cover.
    let (filled, silence) = outbuf.split_at_mut(available);
    filled.copy_from_slice(&self.output_buf[..available]);
    silence.fill(0.0);

    self.output_buf.drain(..available);
}
} }
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct AudioConfig { struct AudioConfig {
mic_in_connections: Vec<String>, connections: HashMap<Role, Vec<String>>
tts_out_connections: Vec<String>,
sfx_out_connections: Vec<String>,
} }
impl AudioConfig { impl AudioConfig {
@@ -65,9 +161,7 @@ impl AudioConfig {
#[derive(Debug)] #[derive(Debug)]
struct Notify { struct Notify {
config: AudioConfig, config: AudioConfig,
mic_port: jack::Port<jack::Unowned>, ports: HashMap<Role, jack::Port<jack::Unowned>>
tts_port: jack::Port<jack::Unowned>,
sfx_port: jack::Port<jack::Unowned>,
} }
impl NotificationHandler for Notify { impl NotificationHandler for Notify {
@@ -78,26 +172,31 @@ impl NotificationHandler for Notify {
port_id_b: jack::PortId, port_id_b: jack::PortId,
are_connected: bool, are_connected: bool,
) { ) {
let port_a = client.port_by_id(port_id_a).unwrap(); let port_src = client.port_by_id(port_id_a).unwrap();
let port_b = client.port_by_id(port_id_b).unwrap(); let port_dst = client.port_by_id(port_id_b).unwrap();
let (stream_name, other_port, target_cfg) = if port_b == self.mic_port { let port_match = self.ports.iter().filter_map(|(role, local_port)| {
("Microphone input", port_a, &mut self.config.mic_in_connections) if role.is_input() && *local_port == port_dst {
} else if port_a == self.tts_port { Some((role, port_src.name()))
("TTS output", port_b, &mut self.config.tts_out_connections) } else if *local_port == port_src {
} else if port_a == self.sfx_port { Some((role, port_dst.name()))
("SFX output", port_b, &mut self.config.sfx_out_connections)
} else { } else {
return; None
}; }
}).next();
if let Some((role, Ok(target_port))) = port_match {
if !self.config.connections.contains_key(role) {
self.config.connections.insert(*role, Default::default());
}
let cfg_slot = self.config.connections.get_mut(role).unwrap();
if let Ok(port_name) = other_port.name() {
if are_connected { if are_connected {
log::info!("{} connected to {}", stream_name, port_name); log::info!("{} connected to {}", role, target_port);
target_cfg.push(port_name); cfg_slot.push(target_port);
} else { } else {
log::info!("{} disconnected from {}", stream_name, port_name); log::info!("{} disconnected from {}", role, target_port);
target_cfg.retain(|x| { x != &port_name} ); cfg_slot.retain(|x| { x != &target_port} );
} }
let save_data = serde_json::to_string_pretty(&self.config).unwrap(); let save_data = serde_json::to_string_pretty(&self.config).unwrap();
@@ -106,69 +205,51 @@ impl NotificationHandler for Notify {
} }
} }
pub async fn start_audio_input() -> (AudioInputControl, MicStream, TtsOutStream, SfxOutStream) { pub async fn start_audio_input() -> (AudioInputControl, AudioInStream, AudioOutStream, AudioOutStream) {
let (exit_tx, exit_rx) = oneshot::channel(); let (exit_tx, exit_rx) = oneshot::channel();
let config = AudioConfig::load(); let config = AudioConfig::load();
let (mic_audio_sink, mic_audio_src) = mpsc::channel(32);
let (sfx_audio_sink, mut sfx_audio_src) = mpsc::channel(32);
let (tts_audio_sink, mut tts_audio_src) = mpsc::channel(32);
let (volume_sink, volume_src) = watch::channel(0.); let (volume_sink, volume_src) = watch::channel(0.);
let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap(); let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap();
let mic_port = client.register_port("microphone-in", AudioIn::default()).unwrap();
let mut tts_port = client.register_port("tts-out", AudioOut::default()).unwrap();
let mut sfx_port = client.register_port("sfx-out", AudioOut::default()).unwrap();
let rate = client.sample_rate();
for (port, connections) in [ let (mut tts_sink, tts_stream) = AudioSink::new(&client, "tts-out");
(&tts_port, &config.tts_out_connections), let (mut sfx_sink, sfx_stream) = AudioSink::new(&client, "sfx-out");
(&sfx_port, &config.sfx_out_connections), let (mut mic_src, mic_stream) = AudioSource::new(&client, "microphone-in");
] {
for peer_name in connections {
if let Some(peer) = client.port_by_name(peer_name) {
if let Err(err) = client.connect_ports(port, &peer) {
log::error!("Failed to reconnect {} to {}", port.name().unwrap(), peer_name);
} else {
log::info!("Reconnected {} to {}", port.name().unwrap(), peer_name);
}
}
}
}
for (port, connections) in [
(&mic_port, &config.mic_in_connections)
] {
for peer_name in connections {
if let Some(peer) = client.port_by_name(peer_name) {
client.connect_ports(&peer, port).unwrap();
}
}
}
let notifier = Notify { let notifier = Notify {
config, config,
mic_port: mic_port.clone_unowned(), ports: HashMap::from_iter([
tts_port: tts_port.clone_unowned(), (Role::Mic, mic_src.port.clone_unowned()),
sfx_port: sfx_port.clone_unowned() (Role::Tts, tts_sink.port.clone_unowned()),
(Role::Sfx, sfx_sink.port.clone_unowned())
])
}; };
let mut meter = VuMeter::new(rate.into(), 1, None); for (role, local_port) in &notifier.ports {
let mut tts_output_buf = vec![]; if let Some(targets) = notifier.config.connections.get(role) {
let mut sfx_output_buf = vec![]; for peer_name in targets {
tts_output_buf.reserve(1024); if let Some(peer) = client.port_by_name(peer_name) {
sfx_output_buf.reserve(1024); let (src, dst) = if role.is_input() {
(&peer, local_port)
} else {
(local_port, &peer)
};
if let Err(err) = client.connect_ports(&src, &dst) {
log::error!("Failed to reconnect {} to {}: {:?}", role, peer_name, err);
} else {
log::info!("Reconnected {} to {}", role, peer_name);
}
}
}
}
}
let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| { let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| {
if mic_port.connected_count().unwrap() > 0 { if let Some(next_vu) = mic_src.process(scope) {
let buf: Vec<_> = mic_port.as_slice(scope).iter().copied().collect();
meter.process_interleaved(&buf);
mic_audio_sink.blocking_send(buf).unwrap();
volume_sink.send_if_modified(|v| { volume_sink.send_if_modified(|v| {
let next_vu = meter.channel_vu(0).unwrap();
let next_vu = (next_vu * 100.0).round() / 100.0; let next_vu = (next_vu * 100.0).round() / 100.0;
if *v != next_vu { if *v != next_vu {
*v = next_vu; *v = next_vu;
@@ -179,27 +260,8 @@ pub async fn start_audio_input() -> (AudioInputControl, MicStream, TtsOutStream,
}); });
} }
for (src, output, port) in [ for sink in [&mut tts_sink, &mut sfx_sink] {
(&mut tts_audio_src, &mut tts_output_buf, &mut tts_port), sink.process(scope);
(&mut sfx_audio_src, &mut sfx_output_buf, &mut sfx_port)
] {
if let Ok(mut next_outbuf) = src.try_recv() {
output.append(&mut next_outbuf);
}
if port.connected_count().unwrap() > 0 && !output.is_empty() {
let outbuf = port.as_mut_slice(scope);
let mut next_segment: Vec<f32> = output.drain(0..(outbuf.len()).min(output.len())).collect();
let underrun = outbuf.len() - next_segment.len();
if underrun > 0 {
for _ in 0..underrun {
next_segment.push(0.);
}
}
outbuf.copy_from_slice(&next_segment);
}
} }
jack::Control::Continue jack::Control::Continue
}); });
@@ -215,14 +277,5 @@ pub async fn start_audio_input() -> (AudioInputControl, MicStream, TtsOutStream,
(AudioInputControl { (AudioInputControl {
volume_src, volume_src,
_jack_client: JackClientRef { killswitch: Some(exit_tx) } _jack_client: JackClientRef { killswitch: Some(exit_tx) }
}, MicStream { }, mic_stream, tts_stream, sfx_stream)
sample_rate: rate,
src: mic_audio_src
}, TtsOutStream {
sample_rate: rate,
sink: tts_audio_sink
}, SfxOutStream {
sample_rate: rate,
sink: sfx_audio_sink
})
} }
+1 -1
View File
@@ -130,7 +130,7 @@ async fn main() {
}; };
let prediction_ctrl = prediction::start_prediction(saved_session, sys_message_src).await; let prediction_ctrl = prediction::start_prediction(saved_session, sys_message_src).await;
let (audio_ctrl, mic_stream, tts_output, sfx_output) = start_audio_input().await; let (audio_ctrl, mic_stream, tts_output, _sfx_output) = start_audio_input().await;
let tts_ctrl = start_tts(tts_output).await; let tts_ctrl = start_tts(tts_output).await;
let transcription_ctrl = transcription::start_transcription(mic_stream).await; let transcription_ctrl = transcription::start_transcription(mic_stream).await;
+2 -2
View File
@@ -4,7 +4,7 @@ use async_openai::{Client, config::OpenAIConfig, types::{InputSource, audio::{Au
use tempfile::SpooledData; use tempfile::SpooledData;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use crate::{audio::MicStream, events::AudioRecordRequest}; use crate::{audio::AudioInStream, events::AudioRecordRequest};
#[derive(Debug)] #[derive(Debug)]
pub struct TranscriptionControl { pub struct TranscriptionControl {
@@ -44,7 +44,7 @@ impl<T: std::io::Seek> std::io::Seek for RcFile<T> {
} }
} }
pub async fn start_transcription(mut mic_src: MicStream) -> TranscriptionControl { pub async fn start_transcription(mut mic_src: AudioInStream) -> TranscriptionControl {
let (audio_control_in, mut audio_control_out) = watch::channel(AudioRecordRequest::Finish); let (audio_control_in, mut audio_control_out) = watch::channel(AudioRecordRequest::Finish);
let (transcription_in, transcription_out) = mpsc::channel(1); let (transcription_in, transcription_out) = mpsc::channel(1);
+2 -2
View File
@@ -1,6 +1,6 @@
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use crate::audio::TtsOutStream; use crate::audio::AudioOutStream;
#[derive(Debug)] #[derive(Debug)]
pub struct TtsControl { pub struct TtsControl {
@@ -13,7 +13,7 @@ impl TtsControl {
} }
} }
pub async fn start_tts(audio_sink: TtsOutStream) -> TtsControl { pub async fn start_tts(audio_sink: AudioOutStream) -> TtsControl {
let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3); let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3);