main: make the audio file processing and tts speaking async

This commit is contained in:
2026-06-02 22:50:07 +02:00
parent 2e880ca552
commit 1e6f88ab87
2 changed files with 54 additions and 40 deletions
+5
View File
@@ -0,0 +1,5 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AudioRecordRequest {
Start,
Finish
}
+45 -36
View File
@@ -13,7 +13,7 @@ use sqlite::OpenFlags;
use tempfile::SpooledData; use tempfile::SpooledData;
use throbber_widgets_tui::{Throbber, ThrobberState}; use throbber_widgets_tui::{Throbber, ThrobberState};
use crossterm::{event::{self, EventStream, KeyCode, KeyModifiers}}; use crossterm::{event::{self, EventStream, KeyCode, KeyModifiers}};
use tokio::{sync::watch, time::Instant}; use tokio::{sync::{mpsc, watch}, time::Instant};
use tui_input::{Input, backend::crossterm::EventHandler}; use tui_input::{Input, backend::crossterm::EventHandler};
use std::{cell::RefCell, io::Read, process::Command, rc::Rc, sync::{Arc, Mutex}}; use std::{cell::RefCell, io::Read, process::Command, rc::Rc, sync::{Arc, Mutex}};
use futures::{StreamExt, future::FutureExt}; use futures::{StreamExt, future::FutureExt};
@@ -48,9 +48,10 @@ use futures::{StreamExt, future::FutureExt};
use ratatui::prelude::*; use ratatui::prelude::*;
use crate::scene::{ConversationEntry, PlaylistEntry, Scene}; use crate::{events::AudioRecordRequest, scene::{ConversationEntry, PlaylistEntry, Scene}};
mod scene; mod scene;
mod events;
#[derive(JsonSchema, Deserialize, Serialize, Debug, Clone)] #[derive(JsonSchema, Deserialize, Serialize, Debug, Clone)]
struct PossibleResponse { struct PossibleResponse {
@@ -95,8 +96,9 @@ struct App {
is_requesting: bool, is_requesting: bool,
audio_level: f64, audio_level: f64,
recording_audio: bool, recording_audio: bool,
audio_control_sink: tokio::sync::mpsc::Sender<AudioRecordRequest>, audio_control_sink: watch::Sender<AudioRecordRequest>,
focus_state: FocusState focus_state: FocusState,
tts_request_sink: mpsc::Sender<String>
} }
#[derive(Debug)] #[derive(Debug)]
@@ -106,7 +108,7 @@ enum FocusState {
} }
impl App { impl App {
fn new(prediction_request_sink: watch::Sender<Scene>, audio_control_sink: tokio::sync::mpsc::Sender<AudioRecordRequest>) -> Self { fn new(prediction_request_sink: watch::Sender<Scene>, audio_control_sink: watch::Sender<AudioRecordRequest>, tts_request_sink: mpsc::Sender<String>) -> Self {
Self { Self {
scene: Scene::default(), scene: Scene::default(),
next_reply_options: Vec::new(), next_reply_options: Vec::new(),
@@ -120,7 +122,8 @@ impl App {
audio_level: -60., audio_level: -60.,
recording_audio: false, recording_audio: false,
audio_control_sink, audio_control_sink,
focus_state: FocusState::UserInput focus_state: FocusState::UserInput,
tts_request_sink
} }
} }
@@ -283,14 +286,14 @@ impl App {
frame.render_widget(gauge, area); frame.render_widget(gauge, area);
} }
fn insert_selected_prompt(&mut self) { async fn insert_selected_prompt(&mut self) {
let selected = self.next_reply_options[self.reply_state.selected().unwrap()].clone(); let selected = self.next_reply_options[self.reply_state.selected().unwrap()].clone();
if let Some(direction) = &selected.stage_direction { if let Some(direction) = &selected.stage_direction {
self.scene.insert_conversation(ConversationEntry::StageDirection(direction.clone())); self.scene.insert_conversation(ConversationEntry::StageDirection(direction.clone()));
} }
self.scene.insert_conversation(ConversationEntry::Eva(selected.text.clone())); self.scene.insert_conversation(ConversationEntry::Eva(selected.text.clone()));
self.save(); self.save();
self.speak(&selected.text.as_str()); self.speak(selected.text.clone()).await;
self.regenerate_responses(); self.regenerate_responses();
} }
@@ -312,7 +315,7 @@ impl App {
KeyCode::Enter => { KeyCode::Enter => {
let row_num = self.conversation_state.selected().unwrap(); let row_num = self.conversation_state.selected().unwrap();
if let ConversationEntry::Eva(text) = &self.scene.conversation[self.scene.conversation.len() - 1 - row_num] { if let ConversationEntry::Eva(text) = &self.scene.conversation[self.scene.conversation.len() - 1 - row_num] {
self.speak(text.clone().as_str()); self.speak(text.clone()).await;
self.focus_state = FocusState::UserInput; self.focus_state = FocusState::UserInput;
self.conversation_state.select(None); self.conversation_state.select(None);
} }
@@ -330,22 +333,22 @@ impl App {
KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => { KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => {
if self.recording_audio { if self.recording_audio {
self.recording_audio = false; self.recording_audio = false;
self.audio_control_sink.send(AudioRecordRequest::Finish).await.unwrap(); self.audio_control_sink.send_replace(AudioRecordRequest::Finish);
self.is_requesting = true; self.is_requesting = true;
} else { } else {
self.recording_audio = true; self.recording_audio = true;
self.audio_control_sink.send(AudioRecordRequest::Start).await.unwrap(); self.audio_control_sink.send_replace(AudioRecordRequest::Start);
} }
}, },
KeyCode::Down => self.reply_state.select_next(), KeyCode::Down => self.reply_state.select_next(),
KeyCode::Up => self.reply_state.select_previous(), KeyCode::Up => self.reply_state.select_previous(),
KeyCode::Enter if key.modifiers.contains(KeyModifiers::CONTROL) => { KeyCode::Enter if key.modifiers.contains(KeyModifiers::CONTROL) => {
self.insert_selected_prompt(); self.insert_selected_prompt().await;
}, },
KeyCode::Enter => { KeyCode::Enter => {
let next_msg = self.user_input.value_and_reset(); let next_msg = self.user_input.value_and_reset();
if next_msg.trim().is_empty() { if next_msg.trim().is_empty() {
self.insert_selected_prompt(); self.insert_selected_prompt().await;
} else { } else {
if next_msg.starts_with("/") { if next_msg.starts_with("/") {
let mut parts = next_msg.splitn(2, " "); let mut parts = next_msg.splitn(2, " ");
@@ -457,10 +460,8 @@ impl App {
} }
} }
fn speak(&mut self, text: &str) { async fn speak(&mut self, text: String) {
// FIXME: Utterances should be handled in another task, so the UI doesn't lock up while waiting for speech to finish self.tts_request_sink.send(text).await.unwrap();
// TODO: We should also have espeak pipe out to stdout, then we can apply some audio effects and write to our own jack port.
Command::new("espeak-ng").arg("-v").arg("en-us+f3").arg(text).spawn().unwrap().wait().unwrap();
} }
fn regenerate_responses(&mut self) { fn regenerate_responses(&mut self) {
@@ -495,7 +496,7 @@ impl App {
self.scene.insert_conversation(ConversationEntry::SystemMessage("Mixxx playlist reloaded.".into())); self.scene.insert_conversation(ConversationEntry::SystemMessage("Mixxx playlist reloaded.".into()));
} }
fn on_response(&mut self, response: CreateChatCompletionResponse) { fn on_response(&mut self, response: &CreateChatCompletionResponse) {
self.is_requesting = false; self.is_requesting = false;
if let Some(calls) = &response.choices[0].message.tool_calls { if let Some(calls) = &response.choices[0].message.tool_calls {
for call in calls { for call in calls {
@@ -524,11 +525,6 @@ impl App {
} }
} }
enum AudioRecordRequest {
Start,
Finish
}
struct RcFile<T>(Arc<Mutex<T>>); struct RcFile<T>(Arc<Mutex<T>>);
impl<T: std::io::Write> std::io::Write for RcFile<T> { impl<T: std::io::Write> std::io::Write for RcFile<T> {
@@ -547,7 +543,6 @@ impl<T: std::io::Seek> std::io::Seek for RcFile<T> {
} }
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
color_eyre::install().unwrap(); color_eyre::install().unwrap();
@@ -559,18 +554,28 @@ async fn main() {
let mut terminal: Terminal<CrosstermBackend<std::io::Stdout>> = ratatui::init(); let mut terminal: Terminal<CrosstermBackend<std::io::Stdout>> = ratatui::init();
let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3);
let (prediction_in, mut prediction_out) = tokio::sync::watch::channel(None); let (prediction_in, mut prediction_out) = tokio::sync::watch::channel(None);
let (prediction_request_in, mut prediction_request_out) = tokio::sync::watch::channel(Scene::default()); let (prediction_request_in, mut prediction_request_out) = tokio::sync::watch::channel(Scene::default());
let (audio_in, mut audio_out) = tokio::sync::mpsc::channel(32); let (audio_in, mut audio_out) = tokio::sync::mpsc::channel(32);
let (audio_level_in, mut audio_level_out) = tokio::sync::watch::channel(0.); let (audio_state_sender, mut audio_state_receiver) = tokio::sync::watch::channel(0.);
let (audio_control_in, mut audio_control_out) = tokio::sync::mpsc::channel(1); let (audio_control_in, mut audio_control_out) = tokio::sync::watch::channel(AudioRecordRequest::Finish);
let (transcrption_in, mut transcription_out) = tokio::sync::mpsc::channel(1); let (transcrption_in, mut transcription_out) = tokio::sync::mpsc::channel(1);
let mut app = App::new(prediction_request_in, audio_control_in); let mut app = App::new(prediction_request_in, audio_control_in, tts_request_sender);
app.load(); app.load();
// Set up the TTS task
tokio::spawn(async move {
while let Some(text) = tts_request_receiver.recv().await {
// TODO: We should also have espeak pipe out to stdout, then we can apply some audio effects and write to our own jack port.
Command::new("espeak-ng").arg("-v").arg("en-us+f3").arg(text).spawn().unwrap().wait().unwrap();
}
});
let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap(); let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap();
let port = client.register_port("microphone-in", AudioIn::default()).unwrap(); let port = client.register_port("microphone-in", AudioIn::default()).unwrap();
let rate = client.sample_rate(); let rate = client.sample_rate();
@@ -610,8 +615,8 @@ async fn main() {
loop { loop {
tokio::select! { tokio::select! {
maybe_audio_event = audio_control_out.recv() => { _ = audio_control_out.changed() => {
let audio_event = maybe_audio_event.unwrap(); let audio_event = *audio_control_out.borrow_and_update();
match audio_event { match audio_event {
AudioRecordRequest::Start => { AudioRecordRequest::Start => {
outfile = Some(Arc::new(Mutex::new(tempfile::spooled_tempfile(spool_size)))); outfile = Some(Arc::new(Mutex::new(tempfile::spooled_tempfile(spool_size))));
@@ -629,12 +634,16 @@ async fn main() {
}, },
SpooledData::InMemory(cursor) => cursor.into_inner().into(), SpooledData::InMemory(cursor) => cursor.into_inner().into(),
}; };
let response = client.audio().transcription().create(CreateTranscriptionRequest { let c = client.clone();
let t = transcrption_in.clone();
tokio::spawn(async move {
let response = c.audio().transcription().create(CreateTranscriptionRequest {
file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } }, file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } },
model: "gpt-4o-mini-transcribe".into(), model: "gpt-4o-mini-transcribe".into(),
..Default::default() ..Default::default()
}).await.unwrap(); }).await.unwrap();
transcrption_in.send(response.text).await.unwrap(); t.send(response.text).await.unwrap();
});
} }
} }
}, },
@@ -651,7 +660,7 @@ async fn main() {
} }
w.flush().unwrap(); w.flush().unwrap();
} }
audio_level_in.send_if_modified(|v| { audio_state_sender.send_if_modified(|v| {
let next_vu = meter.channel_vu(0).unwrap(); let next_vu = meter.channel_vu(0).unwrap();
if *v != next_vu { if *v != next_vu {
*v = next_vu; *v = next_vu;
@@ -669,7 +678,7 @@ async fn main() {
let client: Client<OpenAIConfig> = Client::default(); let client: Client<OpenAIConfig> = Client::default();
loop { loop {
if let Ok(_) = prediction_request_out.changed().await { if let Ok(_) = prediction_request_out.changed().await {
let request = prediction_request_out.borrow().clone(); let request = prediction_request_out.borrow_and_update().clone();
let chat_request = CreateChatCompletionRequest { let chat_request = CreateChatCompletionRequest {
/*tools: Some(vec![ /*tools: Some(vec![
ChatCompletionTools::Function( ChatCompletionTools::Function(
@@ -710,10 +719,10 @@ async fn main() {
tokio::select! { tokio::select! {
_ = delay => (), _ = delay => (),
_ = prediction_out.changed() => { _ = prediction_out.changed() => {
app.on_response(prediction_out.borrow().clone().unwrap()); app.on_response(prediction_out.borrow_and_update().as_ref().unwrap());
}, },
_ = audio_level_out.changed() => { _ = audio_state_receiver.changed() => {
app.audio_level = audio_level_out.borrow().clone(); app.audio_level = *audio_state_receiver.borrow_and_update();
}, },
maybe_transcription = transcription_out.recv() => { maybe_transcription = transcription_out.recv() => {
app.scene.insert_conversation(ConversationEntry::User(maybe_transcription.unwrap())); app.scene.insert_conversation(ConversationEntry::User(maybe_transcription.unwrap()));