From aba21940329aca48a268edb3f504612ab29fff37 Mon Sep 17 00:00:00 2001 From: Victoria Fischer Date: Mon, 8 Jun 2026 14:57:54 +0200 Subject: [PATCH] events: start moving towards a more control-handle based task architecture --- src/main.rs | 44 +++++++-------- src/transcription.rs | 132 ++++++++++++++++++++++++++++++++----------- src/tts.rs | 17 +++++- 3 files changed, 137 insertions(+), 56 deletions(-) diff --git a/src/main.rs b/src/main.rs index 84ffcbd..1977949 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use futures::{StreamExt, future::FutureExt}; use ratatui::prelude::*; use tui_skeleton::{AnimationMode, SkeletonText}; -use crate::{events::AudioRecordRequest, prediction::{BandcampResult, PossibleResponse}, scene::{ConversationEntry, Scene, StageActions, StageDirection}, tts::start_tts}; +use crate::{prediction::{BandcampResult, PossibleResponse}, scene::{ConversationEntry, Scene, StageActions, StageDirection}, transcription::{AudioInputControl, TranscriptionControl, start_audio_input}, tts::{TtsControl, start_tts}}; mod scene; mod events; @@ -79,9 +79,10 @@ struct App { recording_audio: bool, focus_state: FocusState, - audio_control_sink: watch::Sender, + transcription: TranscriptionControl, prediction_request_sink: watch::Sender, - tts_request_sink: mpsc::Sender, + audio: AudioInputControl, + tts: TtsControl, sys_message_sink: mpsc::Sender } @@ -110,7 +111,7 @@ impl From for BandcampError { } impl App { - fn new(prediction_request_sink: watch::Sender, audio_control_sink: watch::Sender, tts_request_sink: mpsc::Sender, sys_message_sink: mpsc::Sender, initial_direction: StageDirection) -> Self { + fn new(prediction_request_sink: watch::Sender, audio: AudioInputControl, transcription: TranscriptionControl, tts: TtsControl, sys_message_sink: mpsc::Sender, initial_direction: StageDirection) -> Self { Self { scene: Default::default(), direction: initial_direction, @@ -123,10 +124,11 @@ impl App { prediction_request_sink, is_requesting: false, audio_level: -60., + audio, recording_audio: false, - audio_control_sink, + transcription, focus_state: FocusState::UserInput, - tts_request_sink, + tts, sys_message_sink } } @@ -392,7 +394,7 @@ impl App { self.next_actions.push(ConversationEntry::StageDirection(direction.clone())); } self.next_actions.push(ConversationEntry::Eva(selected.text.clone())); - self.speak(selected.text.clone()).await; + self.tts.speak(selected.text.clone()).await; self.regenerate_responses(); } @@ -476,7 +478,7 @@ impl App { KeyCode::Enter => { let row_num = self.conversation_state.selected().unwrap(); if let ConversationEntry::Eva(text) = &self.scene.conversation()[self.scene.conversation().len() - 1 - row_num] { - self.speak(text.clone()).await; + self.tts.speak(text.clone()).await; self.focus_state = FocusState::UserInput; self.conversation_state.select(None); } @@ -494,11 +496,11 @@ impl App { KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => { if self.recording_audio { self.recording_audio = false; - self.audio_control_sink.send_replace(AudioRecordRequest::Finish); + self.transcription.stop(); self.is_requesting = true; } else { self.recording_audio = true; - self.audio_control_sink.send_replace(AudioRecordRequest::Start); + self.transcription.start(); } }, KeyCode::Down => self.reply_state.select_next(), @@ -536,10 +538,6 @@ impl App { Ok(()) } - async fn speak(&mut self, text: String) { - self.tts_request_sink.send(text).await.unwrap(); - } - fn regenerate_responses(&mut self) { let actions = StageActions { direction: self.direction.clone(), @@ -610,11 +608,13 @@ async fn main() { SaveData::default() }; - let tts_request_sender = start_tts().await; - let (prediction_request_in, mut prediction_out) = prediction::start_prediction(sys_message_src, saved_session.messages).await; - let (mut audio_state_receiver, audio_control_in, mut transcription_out) = transcription::start_transcription(sys_message_sink.clone()).await; + let (audio_ctrl, mic_stream) = start_audio_input(&sys_message_sink).await; - let mut app = App::new(prediction_request_in, audio_control_in, tts_request_sender, sys_message_sink, saved_session.direction); + let tts_ctrl = start_tts().await; + let (prediction_request_in, mut prediction_out) = prediction::start_prediction(sys_message_src, saved_session.messages).await; + let transcription_ctrl = transcription::start_transcription(mic_stream).await; + + let mut app = App::new(prediction_request_in, audio_ctrl, transcription_ctrl, tts_ctrl, sys_message_sink, saved_session.direction); let mut events = EventStream::new(); let mut last_tick = Instant::now(); @@ -637,11 +637,11 @@ async fn main() { app.reply_state.select_first(); app.is_requesting = false; }, - _ = audio_state_receiver.changed() => { - app.audio_level = *audio_state_receiver.borrow(); + next_volume = app.audio.next() => { + app.audio_level = next_volume }, - maybe_transcription = transcription_out.recv() => { - app.next_actions.push(ConversationEntry::User(maybe_transcription.unwrap())); + transcription_result = app.transcription.next() => { + app.next_actions.push(ConversationEntry::User(transcription_result)); app.regenerate_responses(); } maybe_event = event => { diff --git a/src/transcription.rs b/src/transcription.rs index bdfd2a3..eab4aea 100644 --- a/src/transcription.rs +++ b/src/transcription.rs @@ -4,10 +4,60 @@ use async_openai::{Client, config::OpenAIConfig, types::{InputSource, audio::{Au use jack::{AudioIn, ClientOptions}; use oximedia_metering::vu_meter::VuMeter; use tempfile::SpooledData; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, oneshot, watch}; use crate::events::AudioRecordRequest; +#[derive(Debug)] +pub struct TranscriptionControl { + transcription_result_src: mpsc::Receiver, + record_state_sink: watch::Sender, +} + +impl TranscriptionControl { + pub fn start(&mut self) { + self.record_state_sink.send(AudioRecordRequest::Start).unwrap(); + } + + pub fn stop(&mut self) { + self.record_state_sink.send(AudioRecordRequest::Finish).unwrap(); + } + + pub async fn next(&mut self) -> String { + self.transcription_result_src.recv().await.unwrap() + } +} + +#[derive(Debug)] +pub struct JackClientRef { + killswitch: Option> +} + +impl Drop for JackClientRef { + fn drop(&mut self) { + self.killswitch.take().unwrap().send(()).unwrap(); + } +} + +#[derive(Debug)] +pub struct AudioInputControl { + volume_src: watch::Receiver, + _jack_client: JackClientRef +} + +impl AudioInputControl { + pub async fn next(&mut self) -> f64 { + self.volume_src.changed().await.unwrap(); + *self.volume_src.borrow_and_update() + } +} + +#[derive(Debug)] +pub struct MicStream { + src: mpsc::Receiver>, + sample_rate: u32 +} + struct RcFile(Arc>); impl std::io::Write for RcFile { @@ -26,25 +76,24 @@ impl std::io::Seek for RcFile { } } -pub async fn start_transcription(messages: mpsc::Sender) -> (watch::Receiver, watch::Sender, mpsc::Receiver) { - let (audio_sink, mut audio_src) = mpsc::channel(32); - let (audio_state_sender, audio_state_receiver) = watch::channel(0.); +pub async fn start_transcription(mut mic_src: MicStream) -> TranscriptionControl { let (audio_control_in, mut audio_control_out) = watch::channel(AudioRecordRequest::Finish); let (transcription_in, transcription_out) = mpsc::channel(1); - let rate = start_audio_input(&messages, audio_sink).await; + let ret = TranscriptionControl { + record_state_sink: audio_control_in, + transcription_result_src: transcription_out + }; tokio::spawn(async move { let spec = hound::WavSpec { channels: 1, - sample_rate: rate, + sample_rate: mic_src.sample_rate, bits_per_sample: 16, sample_format: hound::SampleFormat::Int }; - let mut meter = VuMeter::new(rate.into(), 1, None); - - let spool_size = 16 * (rate as usize) * 10;// 10 seconds of audio + let spool_size = 16 * (mic_src.sample_rate as usize) * 10;// 10 seconds of audio let mut writer = None; let mut outfile = None; @@ -85,10 +134,7 @@ pub async fn start_transcription(messages: mpsc::Sender) -> (watch::Rece } } }, - maybe_audio_packet = audio_src.recv() => { - let buf = maybe_audio_packet.unwrap(); - - meter.process_interleaved(buf.as_slice()); + Some(buf) = mic_src.src.recv() => { if let Some(w) = writer.as_mut() { for sample in buf.iter().copied() { let sample_i16 = (sample * 32768.0) @@ -98,43 +144,65 @@ pub async fn start_transcription(messages: mpsc::Sender) -> (watch::Rece } w.flush().unwrap(); } - audio_state_sender.send_if_modified(|v| { - let next_vu = meter.channel_vu(0).unwrap(); - if *v != next_vu { - *v = next_vu; - true - } else { - false - } - }); } }; } }); - (audio_state_receiver, audio_control_in, transcription_out) + ret } -async fn start_audio_input(messages: &mpsc::Sender, audio_sink: mpsc::Sender>) -> u32 { +pub async fn start_audio_input(messages: &mpsc::Sender) -> (AudioInputControl, MicStream) { + + let (exit_tx, exit_rx) = oneshot::channel(); + + let (mic_audio_sink, mic_audio_src) = mpsc::channel(32); + let (volume_sink, volume_src) = watch::channel(0.); + let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap(); - let port = client.register_port("microphone-in", AudioIn::default()).unwrap(); + let mic_port = client.register_port("microphone-in", AudioIn::default()).unwrap(); let rate = client.sample_rate(); - if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", port.name().unwrap().as_str()) { + if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", mic_port.name().unwrap().as_str()) { messages.send("Connected to audio.".into()).await.unwrap(); } else { messages.send("Failed to reconnect to audio.".into()).await.unwrap(); } + let mut meter = VuMeter::new(rate.into(), 1, None); + let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| { - if port.connected_count().unwrap() > 0 { - let buf: Vec<_> = port.as_slice(scope).iter().copied().collect(); - audio_sink.blocking_send(buf).unwrap(); + if mic_port.connected_count().unwrap() > 0 { + let buf: Vec<_> = mic_port.as_slice(scope).iter().copied().collect(); + meter.process_interleaved(&buf); + mic_audio_sink.blocking_send(buf).unwrap(); + + volume_sink.send_if_modified(|v| { + let next_vu = meter.channel_vu(0).unwrap(); + if *v != next_vu { + *v = next_vu; + true + } else { + false + } + }); } jack::Control::Continue }); - - std::mem::forget(client.activate_async((), handler).unwrap()); - rate + tokio::spawn(async move { + let async_client = client.activate_async((), handler).unwrap(); + + exit_rx.await.unwrap(); + + async_client.deactivate().unwrap(); + }); + + (AudioInputControl { + volume_src, + _jack_client: JackClientRef { killswitch: Some(exit_tx) } + }, MicStream { + sample_rate: rate, + src: mic_audio_src + }) } \ No newline at end of file diff --git a/src/tts.rs b/src/tts.rs index f2c6a45..d5578b2 100644 --- a/src/tts.rs +++ b/src/tts.rs @@ -1,6 +1,17 @@ use std::process::Command; -pub async fn start_tts() -> tokio::sync::mpsc::Sender { +#[derive(Debug)] +pub struct TtsControl { + request_sink: tokio::sync::mpsc::Sender +} + +impl TtsControl { + pub async fn speak(&self, text: String) { + self.request_sink.send(text).await.unwrap(); + } +} + +pub async fn start_tts() -> TtsControl { let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3); @@ -12,5 +23,7 @@ pub async fn start_tts() -> tokio::sync::mpsc::Sender { } }); - tts_request_sender + TtsControl { + request_sink: tts_request_sender + } } \ No newline at end of file