Files
eva-pwm-cohost/src/audio.rs
T

330 lines
10 KiB
Rust

use std::{collections::HashMap, fmt::Display};
use jack::{AudioIn, AudioOut, ClientOptions, NotificationHandler, Port, ProcessScope};
use oximedia_metering::vu_meter::VuMeter;
use serde::{Deserialize, Serialize};
use tokio::sync::*;
use crate::events::AudioRecordRequest;
#[derive(Debug)]
#[allow(unused)]
pub enum AudioError {
Jack(jack::Error),
AudioBufferSend(mpsc::error::SendError<Vec<f32>>),
AudioBufferRecv(mpsc::error::TryRecvError),
AudioRequestSend(watch::error::SendError<AudioRecordRequest>)
}
impl From<jack::Error> for AudioError {
fn from(value: jack::Error) -> Self {
Self::Jack(value)
}
}
impl From<mpsc::error::SendError<Vec<f32>>> for AudioError {
fn from(value: mpsc::error::SendError<Vec<f32>>) -> Self {
Self::AudioBufferSend(value)
}
}
impl From<mpsc::error::TryRecvError> for AudioError {
fn from(value: mpsc::error::TryRecvError) -> Self {
Self::AudioBufferRecv(value)
}
}
impl From<watch::error::SendError<AudioRecordRequest>> for AudioError {
fn from(value: watch::error::SendError<AudioRecordRequest>) -> Self {
Self::AudioRequestSend(value)
}
}
#[derive(Debug)]
pub struct JackClientRef {
killswitch: Option<oneshot::Sender<()>>
}
impl Drop for JackClientRef {
fn drop(&mut self) {
self.killswitch.take().expect("Killswitch was already dropped!").send(()).expect("Cannot fire Jack killswitch");
}
}
#[derive(Debug)]
pub struct AudioInputControl {
volume_src: watch::Receiver<f64>,
_jack_client: JackClientRef
}
impl AudioInputControl {
pub async fn next(&mut self) -> Result<f64, watch::error::RecvError> {
self.volume_src.changed().await?;
Ok(*self.volume_src.borrow_and_update())
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy, Hash)]
enum Role {
Mic,
Tts,
Sfx
}
impl Display for Role {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let val = match self {
Self::Mic => "Microphone Input",
Self::Tts => "TTS Output",
Self::Sfx => "SFX Output"
};
f.write_str(val)
}
}
impl Role {
fn is_input(&self) -> bool {
matches!(self, Role::Mic)
}
}
#[derive(Debug)]
pub struct AudioInStream {
pub src: mpsc::Receiver<Vec<f32>>,
pub sample_rate: u32
}
#[derive(Debug)]
pub struct AudioOutStream {
pub sink: mpsc::Sender<Vec<f32>>,
pub sample_rate: u32
}
struct AudioSource {
port: Port<jack::AudioIn>,
sample_sink: mpsc::Sender<Vec<f32>>,
meter: VuMeter
}
impl AudioSource {
fn new(client: &jack::Client, name: &str) -> Result<(Self, AudioInStream), AudioError> {
let (sample_sink, receiver) = mpsc::channel(32);
let port = client.register_port(name, AudioIn::default())?;
Ok((AudioSource {
port,
sample_sink,
meter: VuMeter::new(client.sample_rate().into(), 1, None)
}, AudioInStream {
sample_rate: client.sample_rate(),
src: receiver
}))
}
fn process(&mut self, scope: &ProcessScope) -> Result<Option<f64>, AudioError> {
if self.port.connected_count()? > 0 {
let buf: Vec<_> = self.port.as_slice(scope).to_vec();
self.meter.process_interleaved(&buf);
self.sample_sink.blocking_send(buf)?;
Ok(self.meter.channel_vu(0))
} else {
Ok(None)
}
}
}
#[derive(Debug)]
struct AudioSink {
output_buf: Vec<f32>,
port: Port<jack::AudioOut>,
sample_src: mpsc::Receiver<Vec<f32>>
}
impl AudioSink {
fn new(client: &jack::Client, name: &str) -> Result<(Self, AudioOutStream), AudioError> {
let (sender, sample_src) = mpsc::channel(32);
let port = client.register_port(name, AudioOut::default())?;
Ok((AudioSink {
output_buf: Vec::with_capacity(1024),
port,
sample_src
}, AudioOutStream {
sample_rate: client.sample_rate(),
sink: sender,
}))
}
fn process(&mut self, scope: &ProcessScope) -> Result<(), AudioError> {
if let Ok(buf) = self.sample_src.try_recv() {
self.output_buf.extend(buf);
}
if self.port.connected_count()? > 0 && !self.output_buf.is_empty() {
let outbuf = self.port.as_mut_slice(scope);
let mut next_segment: Vec<f32> = self.output_buf.drain(..(outbuf.len()).min(self.output_buf.len())).collect();
let underrun = outbuf.len() - next_segment.len();
if underrun > 0 {
log::warn!("Audio stream underrun: {} samples", underrun);
next_segment.extend(std::iter::repeat_n(0., underrun));
}
outbuf.copy_from_slice(&next_segment);
}
Ok(())
}
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct AudioConfig {
connections: HashMap<Role, Vec<String>>
}
impl AudioConfig {
pub fn load() -> Self {
if let Ok(contents) = std::fs::read_to_string("audio.json") {
match serde_json::from_str(contents.as_str()) {
Err(err) => {
log::error!("Failed to load audio.json: {:?}", err);
Default::default()
},
Ok(ret) => ret
}
} else {
Default::default()
}
}
}
#[derive(Debug)]
struct Notify {
config: AudioConfig,
ports: HashMap<Role, jack::Port<jack::Unowned>>
}
impl NotificationHandler for Notify {
fn ports_connected(
&mut self,
client: &jack::Client,
port_id_a: jack::PortId,
port_id_b: jack::PortId,
are_connected: bool,
) {
let port_src = client.port_by_id(port_id_a).unwrap();
let port_dst = client.port_by_id(port_id_b).unwrap();
let port_match = self.ports.iter().filter_map(|(role, local_port)| {
if role.is_input() && *local_port == port_dst {
Some((role, port_src.name()))
} else if *local_port == port_src {
Some((role, port_dst.name()))
} else {
None
}
}).next();
if let Some((role, Ok(target_port))) = port_match {
let cfg_slot = self.config.connections.entry(*role).or_default();
if are_connected {
log::info!("{} connected to {}", role, target_port);
cfg_slot.push(target_port);
} else {
log::info!("{} disconnected from {}", role, target_port);
cfg_slot.retain(|x| { x != &target_port} );
}
let save_data = serde_json::to_string_pretty(&self.config).unwrap();
if let Err(err) = std::fs::write("audio.json", save_data) {
log::error!("Failed to write audio.json: {:?}", err);
}
}
}
}
pub async fn start_audio_input() -> (AudioInputControl, AudioInStream, AudioOutStream, AudioOutStream) {
let (exit_tx, exit_rx) = oneshot::channel();
let config = AudioConfig::load();
let (volume_sink, volume_src) = watch::channel(0.);
let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).expect("Could not create JACK client!");
let (mut tts_sink, tts_stream) = AudioSink::new(&client, "tts-out").expect("Could not create TTS sink");
let (mut sfx_sink, sfx_stream) = AudioSink::new(&client, "sfx-out").expect("Could not create SFX sink");
let (mut mic_src, mic_stream) = AudioSource::new(&client, "microphone-in").expect("Could not create microphone source");
let notifier = Notify {
config,
ports: HashMap::from_iter([
(Role::Mic, mic_src.port.clone_unowned()),
(Role::Tts, tts_sink.port.clone_unowned()),
(Role::Sfx, sfx_sink.port.clone_unowned())
])
};
for (role, local_port) in &notifier.ports {
if let Some(targets) = notifier.config.connections.get(role) {
for peer_name in targets {
if let Some(peer) = client.port_by_name(peer_name) {
let (src, dst) = if role.is_input() {
(&peer, local_port)
} else {
(local_port, &peer)
};
if let Err(err) = client.connect_ports(src, dst) {
log::error!("Failed to reconnect {} to {}: {:?}", role, peer_name, err);
} else {
log::info!("Reconnected {} to {}", role, peer_name);
}
}
}
}
}
let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| {
match mic_src.process(scope) {
Ok(Some(next_vu)) => {
volume_sink.send_if_modified(|v| {
let next_vu = (next_vu * 100.0).round() / 100.0;
if *v != next_vu {
*v = next_vu;
true
} else {
false
}
});
},
Ok(None) => (),
Err(err) => {
log::error!("Error while processing mic source: {:?}", err);
return jack::Control::Quit
}
}
for sink in [&mut tts_sink, &mut sfx_sink] {
if let Err(err) = sink.process(scope) {
log::error!("Error while processing {:?} audio sink: {:?}", sink, err);
}
}
jack::Control::Continue
});
tokio::spawn(async move {
let async_client = client.activate_async(notifier, handler).expect("Unable to start jack client!");
if let Err(err) = exit_rx.await {
log::warn!("Premature killswitch triggered: {:?}", err);
}
async_client.deactivate().expect("Unable to stop the jack client");
});
(AudioInputControl {
volume_src,
_jack_client: JackClientRef { killswitch: Some(exit_tx) }
}, mic_stream, tts_stream, sfx_stream)
}