From f1e6684d9c529c73a99476e6e73ce96b13015e91 Mon Sep 17 00:00:00 2001 From: Victoria Fischer Date: Wed, 3 Jun 2026 19:15:37 +0200 Subject: [PATCH] transcription: split out transcription task into separate module --- src/main.rs | 140 ++++--------------------------------------- src/scene.rs | 2 - src/transcription.rs | 139 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 130 deletions(-) create mode 100644 src/transcription.rs diff --git a/src/main.rs b/src/main.rs index 3458800..fd7dce7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,17 @@ -use async_openai::{Client, config::OpenAIConfig, types::{InputSource, audio::{AudioInput, CreateTranscriptionRequest}, chat::{ChatCompletionMessageToolCalls, ChatCompletionTool, ChatCompletionTools, CreateChatCompletionRequest, CreateChatCompletionResponse, FunctionObject}}}; +use async_openai::{Client, config::OpenAIConfig, types::chat::{ChatCompletionMessageToolCalls, CreateChatCompletionRequest, CreateChatCompletionResponse}}; use chrono::{DateTime, Duration, Utc}; use futures_timer::Delay; -use jack::{AudioIn, ClientOptions}; -use oximedia_metering::vu_meter::VuMeter; -use schemars::{JsonSchema, schema_for}; +use schemars::JsonSchema; use scraper::{Html, Selector}; use serde::{Deserialize, Serialize}; use ratatui::{Frame, layout::{Constraint, Direction, Layout}, widgets::{Block, BorderType, Clear, Gauge, List, ListDirection, ListItem, ListState, Paragraph, Wrap}}; -use serde_json::Value; use sqlite::OpenFlags; -use tempfile::SpooledData; use throbber_widgets_tui::{Throbber, ThrobberState}; use crossterm::{event::{self, EventStream, KeyCode, KeyModifiers}}; use tokio::{sync::{mpsc, watch}, time::Instant}; use tui_input::{Input, backend::crossterm::EventHandler}; -use std::{cell::RefCell, io::Read, process::Command, rc::Rc, sync::{Arc, Mutex}}; +use std::process::Command ; use futures::{StreamExt, future::FutureExt}; // TODO: We should have a separate 'state.json' file, which remembers jack connections, and the world time for the show to end. Then we only update the 'time remaining' field in the scene and only deal with relative durations inside the scene data @@ -52,6 +48,7 @@ use crate::{events::AudioRecordRequest, scene::{ConversationEntry, PlaylistEntry mod scene; mod events; +mod transcription; #[derive(JsonSchema, Deserialize, Serialize, Debug, Clone)] struct PossibleResponse { @@ -525,24 +522,6 @@ impl App { } } -struct RcFile(Arc>); - -impl std::io::Write for RcFile { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.0.lock().unwrap().write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.0.lock().unwrap().flush() - } -} - -impl std::io::Seek for RcFile { - fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result { - self.0.lock().unwrap().seek(pos) - } -} - #[tokio::main] async fn main() { color_eyre::install().unwrap(); @@ -554,16 +533,14 @@ async fn main() { let mut terminal: Terminal> = ratatui::init(); + let (sys_message_sender, mut sys_message_receiver) = tokio::sync::mpsc::channel(5); + let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3); let (prediction_in, mut prediction_out) = tokio::sync::watch::channel(None); let (prediction_request_in, mut prediction_request_out) = tokio::sync::watch::channel(Scene::default()); - let (audio_in, mut audio_out) = tokio::sync::mpsc::channel(32); - let (audio_state_sender, mut audio_state_receiver) = tokio::sync::watch::channel(0.); - - let (audio_control_in, mut audio_control_out) = tokio::sync::watch::channel(AudioRecordRequest::Finish); - let (transcrption_in, mut transcription_out) = tokio::sync::mpsc::channel(1); + let (mut audio_state_receiver, audio_control_in, mut transcription_out) = transcription::start_transcription(sys_message_sender).await; let mut app = App::new(prediction_request_in, audio_control_in, tts_request_sender); app.load(); @@ -576,104 +553,6 @@ async fn main() { } }); - let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap(); - let port = client.register_port("microphone-in", AudioIn::default()).unwrap(); - let rate = client.sample_rate(); - - - if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", port.name().unwrap().as_str()) { - app.scene.insert_conversation(ConversationEntry::SystemMessage("Connected to audio.".into())); - } else { - app.scene.insert_conversation(ConversationEntry::SystemMessage("Failed to reconnect to audio.".into())); - } - - let handler = jack::contrib::ClosureProcessHandler::new(move |client, scope| { - if port.connected_count().unwrap() > 0 { - let buf: Vec<_> = port.as_slice(scope).iter().copied().collect(); - audio_in.blocking_send(buf).unwrap(); - } - jack::Control::Continue - }); - - std::mem::forget(client.activate_async((), handler).unwrap()); - - tokio::spawn(async move { - let spec = hound::WavSpec { - channels: 1, - sample_rate: rate, - bits_per_sample: 16, - sample_format: hound::SampleFormat::Int - }; - - let mut meter = VuMeter::new(rate.into(), 1, None); - - let spool_size = 16 * (rate as usize) * 10;// 10 seconds of audio - let mut writer = None; - let mut outfile = None; - - let client: Client = Client::default(); - - loop { - tokio::select! { - _ = audio_control_out.changed() => { - let audio_event = *audio_control_out.borrow_and_update(); - match audio_event { - AudioRecordRequest::Start => { - outfile = Some(Arc::new(Mutex::new(tempfile::spooled_tempfile(spool_size)))); - writer = Some(hound::WavWriter::new(RcFile(outfile.as_ref().unwrap().clone()), spec).unwrap()); - }, - AudioRecordRequest::Finish => { - writer = None; - - let final_audio = outfile.take().unwrap(); - let bytes = match Arc::into_inner(final_audio).unwrap().into_inner().unwrap().into_inner() { - SpooledData::OnDisk(mut file) => { - let mut bytes = Vec::new(); - file.read_to_end(&mut bytes).unwrap(); - bytes.into() - }, - SpooledData::InMemory(cursor) => cursor.into_inner().into(), - }; - let c = client.clone(); - let t = transcrption_in.clone(); - tokio::spawn(async move { - let response = c.audio().transcription().create(CreateTranscriptionRequest { - file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } }, - model: "gpt-4o-mini-transcribe".into(), - ..Default::default() - }).await.unwrap(); - t.send(response.text).await.unwrap(); - }); - } - } - }, - maybe_audio_packet = audio_out.recv() => { - let buf = maybe_audio_packet.unwrap(); - - meter.process_interleaved(buf.as_slice()); - if let Some(w) = writer.as_mut() { - for sample in buf.iter().copied() { - let sample_i16 = (sample * 32768.0) - .clamp(i16::MIN as f32, i16::MAX as f32) - as i16; - w.write_sample(sample_i16).unwrap(); - } - w.flush().unwrap(); - } - audio_state_sender.send_if_modified(|v| { - let next_vu = meter.channel_vu(0).unwrap(); - if *v != next_vu { - *v = next_vu; - true - } else { - false - } - }); - } - }; - } - }); - tokio::spawn(async move { let client: Client = Client::default(); loop { @@ -724,6 +603,11 @@ async fn main() { _ = audio_state_receiver.changed() => { app.audio_level = *audio_state_receiver.borrow_and_update(); }, + maybe_message = sys_message_receiver.recv() => { + if let Some(message) = maybe_message { + app.scene.insert_conversation(ConversationEntry::SystemMessage(message)); + } + }, maybe_transcription = transcription_out.recv() => { app.scene.insert_conversation(ConversationEntry::User(maybe_transcription.unwrap())); app.regenerate_responses(); diff --git a/src/scene.rs b/src/scene.rs index a179cb1..aba4743 100644 --- a/src/scene.rs +++ b/src/scene.rs @@ -1,10 +1,8 @@ use async_openai::types::chat::*; use chrono::Duration; -use crossterm::event::MediaKeyCode::Play; use schemars::schema_for; use serde::{Deserialize, Serialize}; use serde_json::Value; -use sqlite::OpenFlags; use crate::GeneratedResponses; diff --git a/src/transcription.rs b/src/transcription.rs new file mode 100644 index 0000000..9ba2ff8 --- /dev/null +++ b/src/transcription.rs @@ -0,0 +1,139 @@ +use std::{io::Read, sync::{Arc, Mutex}}; + +use async_openai::{Client, config::OpenAIConfig, types::{InputSource, audio::{AudioInput, CreateTranscriptionRequest}}}; +use jack::{AudioIn, ClientOptions}; +use oximedia_metering::vu_meter::VuMeter; +use tempfile::SpooledData; +use tokio::sync::{mpsc, watch}; + +use crate::events::AudioRecordRequest; + +struct RcFile(Arc>); + +impl std::io::Write for RcFile { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.lock().unwrap().flush() + } +} + +impl std::io::Seek for RcFile { + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result { + self.0.lock().unwrap().seek(pos) + } +} + +pub async fn start_transcription(messages: mpsc::Sender) -> (watch::Receiver, watch::Sender, mpsc::Receiver) { + let (audio_sink, mut audio_src) = mpsc::channel(32); + let (audio_state_sender, audio_state_receiver) = watch::channel(0.); + let (audio_control_in, mut audio_control_out) = watch::channel(AudioRecordRequest::Finish); + let (transcription_in, transcription_out) = mpsc::channel(1); + + let rate = start_audio_input(&messages, audio_sink).await; + + tokio::spawn(async move { + let spec = hound::WavSpec { + channels: 1, + sample_rate: rate, + bits_per_sample: 16, + sample_format: hound::SampleFormat::Int + }; + + let mut meter = VuMeter::new(rate.into(), 1, None); + + let spool_size = 16 * (rate as usize) * 10;// 10 seconds of audio + let mut writer = None; + let mut outfile = None; + + let client: Client = Client::default(); + + loop { + tokio::select! { + _ = audio_control_out.changed() => { + let audio_event = *audio_control_out.borrow_and_update(); + match audio_event { + AudioRecordRequest::Start => { + outfile = Some(Arc::new(Mutex::new(tempfile::spooled_tempfile(spool_size)))); + writer = Some(hound::WavWriter::new(RcFile(outfile.as_ref().unwrap().clone()), spec).unwrap()); + }, + AudioRecordRequest::Finish => { + writer = None; + + let final_audio = outfile.take().unwrap(); + let bytes = match Arc::into_inner(final_audio).unwrap().into_inner().unwrap().into_inner() { + SpooledData::OnDisk(mut file) => { + let mut bytes = Vec::new(); + file.read_to_end(&mut bytes).unwrap(); + bytes.into() + }, + SpooledData::InMemory(cursor) => cursor.into_inner().into(), + }; + let c = client.clone(); + let t = transcription_in.clone(); + tokio::spawn(async move { + let response = c.audio().transcription().create(CreateTranscriptionRequest { + file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } }, + model: "gpt-4o-mini-transcribe".into(), + ..Default::default() + }).await.unwrap(); + t.send(response.text).await.unwrap(); + }); + } + } + }, + maybe_audio_packet = audio_src.recv() => { + let buf = maybe_audio_packet.unwrap(); + + meter.process_interleaved(buf.as_slice()); + if let Some(w) = writer.as_mut() { + for sample in buf.iter().copied() { + let sample_i16 = (sample * 32768.0) + .clamp(i16::MIN as f32, i16::MAX as f32) + as i16; + w.write_sample(sample_i16).unwrap(); + } + w.flush().unwrap(); + } + audio_state_sender.send_if_modified(|v| { + let next_vu = meter.channel_vu(0).unwrap(); + if *v != next_vu { + *v = next_vu; + true + } else { + false + } + }); + } + }; + } + }); + + (audio_state_receiver, audio_control_in, transcription_out) +} + +async fn start_audio_input(messages: &mpsc::Sender, audio_sink: mpsc::Sender>) -> u32 { + let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap(); + let port = client.register_port("microphone-in", AudioIn::default()).unwrap(); + let rate = client.sample_rate(); + + if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", port.name().unwrap().as_str()) { + messages.send("Connected to audio.".into()).await.unwrap(); + } else { + messages.send("Failed to reconnect to audio.".into()).await.unwrap(); + } + + let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| { + if port.connected_count().unwrap() > 0 { + let buf: Vec<_> = port.as_slice(scope).iter().copied().collect(); + audio_sink.blocking_send(buf).unwrap(); + } + jack::Control::Continue + }); + + std::mem::forget(client.activate_async((), handler).unwrap()); + + rate +} \ No newline at end of file