diff --git a/src/events.rs b/src/events.rs new file mode 100644 index 0000000..83edb11 --- /dev/null +++ b/src/events.rs @@ -0,0 +1,5 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum AudioRecordRequest { + Start, + Finish +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 2f3c73d..3458800 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use sqlite::OpenFlags; use tempfile::SpooledData; use throbber_widgets_tui::{Throbber, ThrobberState}; use crossterm::{event::{self, EventStream, KeyCode, KeyModifiers}}; -use tokio::{sync::watch, time::Instant}; +use tokio::{sync::{mpsc, watch}, time::Instant}; use tui_input::{Input, backend::crossterm::EventHandler}; use std::{cell::RefCell, io::Read, process::Command, rc::Rc, sync::{Arc, Mutex}}; use futures::{StreamExt, future::FutureExt}; @@ -48,9 +48,10 @@ use futures::{StreamExt, future::FutureExt}; use ratatui::prelude::*; -use crate::scene::{ConversationEntry, PlaylistEntry, Scene}; +use crate::{events::AudioRecordRequest, scene::{ConversationEntry, PlaylistEntry, Scene}}; mod scene; +mod events; #[derive(JsonSchema, Deserialize, Serialize, Debug, Clone)] struct PossibleResponse { @@ -95,8 +96,9 @@ struct App { is_requesting: bool, audio_level: f64, recording_audio: bool, - audio_control_sink: tokio::sync::mpsc::Sender, - focus_state: FocusState + audio_control_sink: watch::Sender, + focus_state: FocusState, + tts_request_sink: mpsc::Sender } #[derive(Debug)] @@ -106,7 +108,7 @@ enum FocusState { } impl App { - fn new(prediction_request_sink: watch::Sender, audio_control_sink: tokio::sync::mpsc::Sender) -> Self { + fn new(prediction_request_sink: watch::Sender, audio_control_sink: watch::Sender, tts_request_sink: mpsc::Sender) -> Self { Self { scene: Scene::default(), next_reply_options: Vec::new(), @@ -120,7 +122,8 @@ impl App { audio_level: -60., recording_audio: false, audio_control_sink, - focus_state: FocusState::UserInput + focus_state: FocusState::UserInput, + tts_request_sink } } @@ -283,14 +286,14 @@ impl App { frame.render_widget(gauge, area); } - fn insert_selected_prompt(&mut self) { + async fn insert_selected_prompt(&mut self) { let selected = self.next_reply_options[self.reply_state.selected().unwrap()].clone(); if let Some(direction) = &selected.stage_direction { self.scene.insert_conversation(ConversationEntry::StageDirection(direction.clone())); } self.scene.insert_conversation(ConversationEntry::Eva(selected.text.clone())); self.save(); - self.speak(&selected.text.as_str()); + self.speak(selected.text.clone()).await; self.regenerate_responses(); } @@ -312,7 +315,7 @@ impl App { KeyCode::Enter => { let row_num = self.conversation_state.selected().unwrap(); if let ConversationEntry::Eva(text) = &self.scene.conversation[self.scene.conversation.len() - 1 - row_num] { - self.speak(text.clone().as_str()); + self.speak(text.clone()).await; self.focus_state = FocusState::UserInput; self.conversation_state.select(None); } @@ -330,22 +333,22 @@ impl App { KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => { if self.recording_audio { self.recording_audio = false; - self.audio_control_sink.send(AudioRecordRequest::Finish).await.unwrap(); + self.audio_control_sink.send_replace(AudioRecordRequest::Finish); self.is_requesting = true; } else { self.recording_audio = true; - self.audio_control_sink.send(AudioRecordRequest::Start).await.unwrap(); + self.audio_control_sink.send_replace(AudioRecordRequest::Start); } }, KeyCode::Down => self.reply_state.select_next(), KeyCode::Up => self.reply_state.select_previous(), KeyCode::Enter if key.modifiers.contains(KeyModifiers::CONTROL) => { - self.insert_selected_prompt(); + self.insert_selected_prompt().await; }, KeyCode::Enter => { let next_msg = self.user_input.value_and_reset(); if next_msg.trim().is_empty() { - self.insert_selected_prompt(); + self.insert_selected_prompt().await; } else { if next_msg.starts_with("/") { let mut parts = next_msg.splitn(2, " "); @@ -457,10 +460,8 @@ impl App { } } - fn speak(&mut self, text: &str) { - // FIXME: Utterances should be handled in another task, so the UI doesn't lock up while waiting for speech to finish - // TODO: We should also have espeak pipe out to stdout, then we can apply some audio effects and write to our own jack port. - Command::new("espeak-ng").arg("-v").arg("en-us+f3").arg(text).spawn().unwrap().wait().unwrap(); + async fn speak(&mut self, text: String) { + self.tts_request_sink.send(text).await.unwrap(); } fn regenerate_responses(&mut self) { @@ -495,7 +496,7 @@ impl App { self.scene.insert_conversation(ConversationEntry::SystemMessage("Mixxx playlist reloaded.".into())); } - fn on_response(&mut self, response: CreateChatCompletionResponse) { + fn on_response(&mut self, response: &CreateChatCompletionResponse) { self.is_requesting = false; if let Some(calls) = &response.choices[0].message.tool_calls { for call in calls { @@ -524,11 +525,6 @@ impl App { } } -enum AudioRecordRequest { - Start, - Finish -} - struct RcFile(Arc>); impl std::io::Write for RcFile { @@ -547,7 +543,6 @@ impl std::io::Seek for RcFile { } } - #[tokio::main] async fn main() { color_eyre::install().unwrap(); @@ -559,18 +554,28 @@ async fn main() { let mut terminal: Terminal> = ratatui::init(); + let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3); + let (prediction_in, mut prediction_out) = tokio::sync::watch::channel(None); let (prediction_request_in, mut prediction_request_out) = tokio::sync::watch::channel(Scene::default()); let (audio_in, mut audio_out) = tokio::sync::mpsc::channel(32); - let (audio_level_in, mut audio_level_out) = tokio::sync::watch::channel(0.); + let (audio_state_sender, mut audio_state_receiver) = tokio::sync::watch::channel(0.); - let (audio_control_in, mut audio_control_out) = tokio::sync::mpsc::channel(1); + let (audio_control_in, mut audio_control_out) = tokio::sync::watch::channel(AudioRecordRequest::Finish); let (transcrption_in, mut transcription_out) = tokio::sync::mpsc::channel(1); - let mut app = App::new(prediction_request_in, audio_control_in); + let mut app = App::new(prediction_request_in, audio_control_in, tts_request_sender); app.load(); + // Set up the TTS task + tokio::spawn(async move { + while let Some(text) = tts_request_receiver.recv().await { + // TODO: We should also have espeak pipe out to stdout, then we can apply some audio effects and write to our own jack port. + Command::new("espeak-ng").arg("-v").arg("en-us+f3").arg(text).spawn().unwrap().wait().unwrap(); + } + }); + let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap(); let port = client.register_port("microphone-in", AudioIn::default()).unwrap(); let rate = client.sample_rate(); @@ -610,8 +615,8 @@ async fn main() { loop { tokio::select! { - maybe_audio_event = audio_control_out.recv() => { - let audio_event = maybe_audio_event.unwrap(); + _ = audio_control_out.changed() => { + let audio_event = *audio_control_out.borrow_and_update(); match audio_event { AudioRecordRequest::Start => { outfile = Some(Arc::new(Mutex::new(tempfile::spooled_tempfile(spool_size)))); @@ -629,12 +634,16 @@ async fn main() { }, SpooledData::InMemory(cursor) => cursor.into_inner().into(), }; - let response = client.audio().transcription().create(CreateTranscriptionRequest { - file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } }, - model: "gpt-4o-mini-transcribe".into(), - ..Default::default() - }).await.unwrap(); - transcrption_in.send(response.text).await.unwrap(); + let c = client.clone(); + let t = transcrption_in.clone(); + tokio::spawn(async move { + let response = c.audio().transcription().create(CreateTranscriptionRequest { + file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } }, + model: "gpt-4o-mini-transcribe".into(), + ..Default::default() + }).await.unwrap(); + t.send(response.text).await.unwrap(); + }); } } }, @@ -651,7 +660,7 @@ async fn main() { } w.flush().unwrap(); } - audio_level_in.send_if_modified(|v| { + audio_state_sender.send_if_modified(|v| { let next_vu = meter.channel_vu(0).unwrap(); if *v != next_vu { *v = next_vu; @@ -669,7 +678,7 @@ async fn main() { let client: Client = Client::default(); loop { if let Ok(_) = prediction_request_out.changed().await { - let request = prediction_request_out.borrow().clone(); + let request = prediction_request_out.borrow_and_update().clone(); let chat_request = CreateChatCompletionRequest { /*tools: Some(vec![ ChatCompletionTools::Function( @@ -710,10 +719,10 @@ async fn main() { tokio::select! { _ = delay => (), _ = prediction_out.changed() => { - app.on_response(prediction_out.borrow().clone().unwrap()); + app.on_response(prediction_out.borrow_and_update().as_ref().unwrap()); }, - _ = audio_level_out.changed() => { - app.audio_level = audio_level_out.borrow().clone(); + _ = audio_state_receiver.changed() => { + app.audio_level = *audio_state_receiver.borrow_and_update(); }, maybe_transcription = transcription_out.recv() => { app.scene.insert_conversation(ConversationEntry::User(maybe_transcription.unwrap()));