From 42f764c91431d470936c667fc2fbf45253ccb28d Mon Sep 17 00:00:00 2001 From: Victoria Fischer Date: Tue, 23 Jun 2026 09:18:32 +0200 Subject: [PATCH] sfx: implement mixing multiple playbacks at once into the same output stream --- Cargo.lock | 16 ++++- Cargo.toml | 1 + src/sfx.rs | 202 ++++++++++++++++++++++++++++++++++------------------- 3 files changed, 144 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb754c2..86c8a60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,7 @@ dependencies = [ "futures-timer", "hound", "iref 4.0.0", + "itertools 0.15.0", "jack", "json-ld", "log", @@ -2069,6 +2070,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b4baf93f58d4425749ca49a51c50ebab072c5df6994d08fed93541c331481dc" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" @@ -3528,7 +3538,7 @@ dependencies = [ "compact_str", "hashbrown 0.16.1", "indoc", - "itertools", + "itertools 0.14.0", "kasuari", "lru", "strum", @@ -3580,7 +3590,7 @@ dependencies = [ "hashbrown 0.16.1", "indoc", "instability", - "itertools", + "itertools 0.14.0", "line-clipping", "ratatui-core", "strum", @@ -5303,7 +5313,7 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b380a1238663e5f8a691f9039c73e1cdae598a30e9855f541d29b08b53e9a5" dependencies = [ - "itertools", + "itertools 0.14.0", "unicode-segmentation", "unicode-width", ] diff --git a/Cargo.toml b/Cargo.toml index a280d86..7cc7f8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ futures = "0.3.32" futures-timer = "3.0.4" hound = "3.5.1" iref = { version = "4.0.0", features = ["url", "serde"] } +itertools = "0.15.0" jack = "0.13.5" json-ld = { version = "0.21.4", features = ["reqwest", "serde"] } log = "0.4.32" diff --git a/src/sfx.rs b/src/sfx.rs index d071b9e..d2fdb4b 100644 --- a/src/sfx.rs +++ b/src/sfx.rs @@ -1,6 +1,7 @@ use std::path::Path; use rand::seq::IteratorRandom; +use resampler::ResamplerFir; use symphonia::core::{codecs::audio::AudioDecoder, formats::{FormatReader, TrackType, probe::Hint}, io::MediaSourceStream}; use crate::audio::AudioOutStream; @@ -10,7 +11,7 @@ pub enum SfxRequest { RandomAmbient } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SfxControl { sink: tokio::sync::mpsc::Sender } @@ -21,77 +22,136 @@ impl SfxControl { } } +struct Sample { + format: Box, + decoder: Box, + channel_bufs: Vec, + sample_rate: u32, + channel_num: usize, + bitrate_resample: ResamplerFir +} + +impl Sample { + fn new(format: Box, decoder: Box, output_sample_rate: u32) -> Self { + let sample_rate = decoder.codec_params().sample_rate.unwrap(); + let channel_num = decoder.codec_params().channels.as_ref().unwrap().count(); + let bitrate_resample = resampler::ResamplerFir::new_from_hz(channel_num, sample_rate, output_sample_rate, Default::default(), Default::default()); + Self { + format, + decoder, + channel_bufs: vec![], + sample_rate, + channel_num, + bitrate_resample + } + } + + fn next(&mut self) -> Result>, symphonia::core::errors::Error> { + let packet = match self.format.next_packet()? { + Some(packet) => packet, + None => return Ok(None) + }; + + match self.decoder.decode_ref(&packet.as_packet_ref()) { + Ok(samples) => { + self.channel_bufs.resize(samples.samples_interleaved(), 0.); + samples.copy_to_slice_interleaved(&mut self.channel_bufs); + + let mut resampled = [0.; 4096]; + let (read_count, write_count) = self.bitrate_resample.resample(&self.channel_bufs, &mut resampled).unwrap(); + if read_count < self.channel_bufs.len() { + log::error!("Resampling buffer is too small for a {}Hz file! We need an additional {}", self.sample_rate, read_count); + } + + // First we convert the audio feed from stereo down to mono by simple average + // TODO: This should be something smarter, like a saturating add..? + let mono_stream = resampled[..write_count].chunks(self.channel_num).map(|channels| { + let total_volume = channels.iter().cloned().reduce(|a, b| a + b).unwrap_or_default(); + total_volume / (self.channel_num as f32) + }); + + Ok(Some(mono_stream.collect())) + }, + Err(err) => { + // Dump the audio buffer on failure + Err(err) + } + } + } +} + struct Player { audio_sink: AudioOutStream, - audio_out_buf: Vec + audio_out_buf: Vec, + playing_samples: Vec } impl Player { + async fn process(&mut self) { + if self.playing_samples.is_empty() { + return; + } + + 'out: loop { + let mut next_batch = vec![]; + + self.playing_samples.retain_mut(|sample| { + match sample.next() { + Ok(Some(buf)) => { + next_batch.push(buf); + true + }, + Ok(None) => { + false + }, + Err(err) => { + log::error!("Audio error: {:?}", err); + false + } + } + }); + + loop { + let mut this_sample = 0.; + let mut any_valid = false; + for stream in &mut next_batch { + if let Some(next_sample) = stream.pop() { + this_sample += next_sample; + any_valid = true; + } + } + + if !any_valid { + if self.audio_out_buf.is_empty() { + log::debug!("End of audio files"); + break 'out; + } + //log::debug!("No more valid files, but the buffer is still running!"); + break; + } + let mixed_sample = this_sample / next_batch.len() as f32; + self.audio_out_buf.push(mixed_sample); + if self.audio_out_buf.len() >= 1024 { + self.flush().await; + } + } + + if self.playing_samples.is_empty() { + break; + } + } + } + async fn submit_buffer(&mut self) { self.audio_sink.sink.send(std::mem::take(&mut self.audio_out_buf)).await.unwrap(); } - async fn play_stream(&mut self, mut format: Box, mut decoder: Box) -> Result<(), symphonia::core::errors::Error> { - let mut channel_bufs: Vec = vec![]; - let sample_rate = decoder.codec_params().sample_rate.unwrap(); - let channel_num = decoder.codec_params().channels.as_ref().unwrap().count(); - let mut bitrate_resample = resampler::ResamplerFir::new_from_hz(channel_num, sample_rate, self.audio_sink.sample_rate, Default::default(), Default::default()); - - log::debug!("Resampling {} -> {}", sample_rate, self.audio_sink.sample_rate); - - loop { - let packet = match format.next_packet() { - Ok(Some(packet)) => packet, - Ok(None) => break, - Err(err) => return Err(err) - }; - - match decoder.decode_ref(&packet.as_packet_ref()) { - Ok(samples) => { - channel_bufs.resize(samples.samples_interleaved(), 0.); - samples.copy_to_slice_interleaved(&mut channel_bufs); - - let mut resampled = [0.; 4096]; - let (read_count, write_count) = bitrate_resample.resample(&channel_bufs, &mut resampled).unwrap(); - if read_count < channel_bufs.len() { - log::error!("Resampling buffer is too small for a {}Hz file! We need an additional {}", sample_rate, read_count); - } - - // First we convert the audio feed from stereo down to mono by simple average - // TODO: This should be something smarter, like a saturating add..? - let mono_stream = resampled[..write_count].chunks(channel_num).map(|channels| { - let total_volume = channels.iter().cloned().reduce(|a, b| a + b).unwrap_or_default(); - total_volume / (channel_num as f32) - }); - - // Then we write out the resampled audio to our staging buffer - self.audio_out_buf.extend(mono_stream); - - // Once we have 1024 samples (jack default, I guess), we send it to the audio output - if self.audio_out_buf.len() >= 1024 { - self.submit_buffer().await; - } - }, - Err(err) => { - // Dump the audio buffer on failure - self.submit_buffer().await; - return Err(err) - } - } - } + async fn flush(&mut self) { if !self.audio_out_buf.is_empty() { self.submit_buffer().await; } - Ok(()) } - - // cw-m03 kwhz - morse code, 18:03 - // amateur radio station - spanish, 2:30 - // data transmission sound - 00:27 - // NOAA report - 00:50 - // sideband voice - 00:17 - async fn play_sample(&mut self, path: &Path) { log::debug!("Queuing sound playback for {:?}", path); let sfx_fd = std::fs::File::open(path).unwrap(); @@ -112,29 +172,27 @@ impl Player { &dec_opts ).expect("Unsupported audio codec"); - - log::debug!("Starting stream"); - if let Err(err) = self.play_stream(format, decoder).await { - log::error!("Audio playback error: {:?}", err); - } - log::debug!("Playback complete"); + self.playing_samples.push(Sample::new(format, decoder, self.audio_sink.sample_rate)); } } pub async fn start_sfx(audio_sink: AudioOutStream) -> SfxControl { let (event_sink, mut event_src) = tokio::sync::mpsc::channel(32); tokio::spawn(async move { - let mut player = Player { audio_sink, audio_out_buf: vec![] }; + let mut player = Player { audio_sink, audio_out_buf: vec![] , playing_samples: vec![]}; let sfx_dir = std::path::Path::new("./sfx"); loop { - while let Some(event) = event_src.recv().await { - match event { - SfxRequest::RandomAmbient => { - log::debug!("Playing random audio sample"); - let avail_files = std::fs::read_dir(sfx_dir.join("ambient")).unwrap(); - let chosen_file = avail_files.choose(&mut rand::rng()).unwrap().unwrap(); - player.play_sample(&chosen_file.path()).await; + tokio::select! { + _ = player.process(), if !player.playing_samples.is_empty() => {}, + Some(event) = event_src.recv() => { + match event { + SfxRequest::RandomAmbient => { + log::debug!("Playing random audio sample"); + let avail_files = std::fs::read_dir(sfx_dir.join("ambient")).unwrap(); + let chosen_file = avail_files.choose(&mut rand::rng()).unwrap().unwrap(); + player.play_sample(&chosen_file.path()).await; + } } } }