transcription: split out transcription task into separate module
This commit is contained in:
+12
-128
@@ -1,21 +1,17 @@
|
|||||||
use async_openai::{Client, config::OpenAIConfig, types::{InputSource, audio::{AudioInput, CreateTranscriptionRequest}, chat::{ChatCompletionMessageToolCalls, ChatCompletionTool, ChatCompletionTools, CreateChatCompletionRequest, CreateChatCompletionResponse, FunctionObject}}};
|
use async_openai::{Client, config::OpenAIConfig, types::chat::{ChatCompletionMessageToolCalls, CreateChatCompletionRequest, CreateChatCompletionResponse}};
|
||||||
use chrono::{DateTime, Duration, Utc};
|
use chrono::{DateTime, Duration, Utc};
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
use jack::{AudioIn, ClientOptions};
|
use schemars::JsonSchema;
|
||||||
use oximedia_metering::vu_meter::VuMeter;
|
|
||||||
use schemars::{JsonSchema, schema_for};
|
|
||||||
use scraper::{Html, Selector};
|
use scraper::{Html, Selector};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use ratatui::{Frame, layout::{Constraint, Direction, Layout}, widgets::{Block, BorderType, Clear, Gauge, List, ListDirection, ListItem, ListState, Paragraph, Wrap}};
|
use ratatui::{Frame, layout::{Constraint, Direction, Layout}, widgets::{Block, BorderType, Clear, Gauge, List, ListDirection, ListItem, ListState, Paragraph, Wrap}};
|
||||||
use serde_json::Value;
|
|
||||||
use sqlite::OpenFlags;
|
use sqlite::OpenFlags;
|
||||||
use tempfile::SpooledData;
|
|
||||||
use throbber_widgets_tui::{Throbber, ThrobberState};
|
use throbber_widgets_tui::{Throbber, ThrobberState};
|
||||||
use crossterm::{event::{self, EventStream, KeyCode, KeyModifiers}};
|
use crossterm::{event::{self, EventStream, KeyCode, KeyModifiers}};
|
||||||
use tokio::{sync::{mpsc, watch}, time::Instant};
|
use tokio::{sync::{mpsc, watch}, time::Instant};
|
||||||
use tui_input::{Input, backend::crossterm::EventHandler};
|
use tui_input::{Input, backend::crossterm::EventHandler};
|
||||||
use std::{cell::RefCell, io::Read, process::Command, rc::Rc, sync::{Arc, Mutex}};
|
use std::process::Command ;
|
||||||
use futures::{StreamExt, future::FutureExt};
|
use futures::{StreamExt, future::FutureExt};
|
||||||
|
|
||||||
// TODO: We should have a separate 'state.json' file, which remembers jack connections, and the world time for the show to end. Then we only update the 'time remaining' field in the scene and only deal with relative durations inside the scene data
|
// TODO: We should have a separate 'state.json' file, which remembers jack connections, and the world time for the show to end. Then we only update the 'time remaining' field in the scene and only deal with relative durations inside the scene data
|
||||||
@@ -52,6 +48,7 @@ use crate::{events::AudioRecordRequest, scene::{ConversationEntry, PlaylistEntry
|
|||||||
|
|
||||||
mod scene;
|
mod scene;
|
||||||
mod events;
|
mod events;
|
||||||
|
mod transcription;
|
||||||
|
|
||||||
#[derive(JsonSchema, Deserialize, Serialize, Debug, Clone)]
|
#[derive(JsonSchema, Deserialize, Serialize, Debug, Clone)]
|
||||||
struct PossibleResponse {
|
struct PossibleResponse {
|
||||||
@@ -525,24 +522,6 @@ impl App {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RcFile<T>(Arc<Mutex<T>>);
|
|
||||||
|
|
||||||
impl<T: std::io::Write> std::io::Write for RcFile<T> {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
||||||
self.0.lock().unwrap().write(buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(&mut self) -> std::io::Result<()> {
|
|
||||||
self.0.lock().unwrap().flush()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: std::io::Seek> std::io::Seek for RcFile<T> {
|
|
||||||
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
|
|
||||||
self.0.lock().unwrap().seek(pos)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
color_eyre::install().unwrap();
|
color_eyre::install().unwrap();
|
||||||
@@ -554,16 +533,14 @@ async fn main() {
|
|||||||
|
|
||||||
let mut terminal: Terminal<CrosstermBackend<std::io::Stdout>> = ratatui::init();
|
let mut terminal: Terminal<CrosstermBackend<std::io::Stdout>> = ratatui::init();
|
||||||
|
|
||||||
|
let (sys_message_sender, mut sys_message_receiver) = tokio::sync::mpsc::channel(5);
|
||||||
|
|
||||||
let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3);
|
let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3);
|
||||||
|
|
||||||
let (prediction_in, mut prediction_out) = tokio::sync::watch::channel(None);
|
let (prediction_in, mut prediction_out) = tokio::sync::watch::channel(None);
|
||||||
let (prediction_request_in, mut prediction_request_out) = tokio::sync::watch::channel(Scene::default());
|
let (prediction_request_in, mut prediction_request_out) = tokio::sync::watch::channel(Scene::default());
|
||||||
|
|
||||||
let (audio_in, mut audio_out) = tokio::sync::mpsc::channel(32);
|
let (mut audio_state_receiver, audio_control_in, mut transcription_out) = transcription::start_transcription(sys_message_sender).await;
|
||||||
let (audio_state_sender, mut audio_state_receiver) = tokio::sync::watch::channel(0.);
|
|
||||||
|
|
||||||
let (audio_control_in, mut audio_control_out) = tokio::sync::watch::channel(AudioRecordRequest::Finish);
|
|
||||||
let (transcrption_in, mut transcription_out) = tokio::sync::mpsc::channel(1);
|
|
||||||
|
|
||||||
let mut app = App::new(prediction_request_in, audio_control_in, tts_request_sender);
|
let mut app = App::new(prediction_request_in, audio_control_in, tts_request_sender);
|
||||||
app.load();
|
app.load();
|
||||||
@@ -576,104 +553,6 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap();
|
|
||||||
let port = client.register_port("microphone-in", AudioIn::default()).unwrap();
|
|
||||||
let rate = client.sample_rate();
|
|
||||||
|
|
||||||
|
|
||||||
if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", port.name().unwrap().as_str()) {
|
|
||||||
app.scene.insert_conversation(ConversationEntry::SystemMessage("Connected to audio.".into()));
|
|
||||||
} else {
|
|
||||||
app.scene.insert_conversation(ConversationEntry::SystemMessage("Failed to reconnect to audio.".into()));
|
|
||||||
}
|
|
||||||
|
|
||||||
let handler = jack::contrib::ClosureProcessHandler::new(move |client, scope| {
|
|
||||||
if port.connected_count().unwrap() > 0 {
|
|
||||||
let buf: Vec<_> = port.as_slice(scope).iter().copied().collect();
|
|
||||||
audio_in.blocking_send(buf).unwrap();
|
|
||||||
}
|
|
||||||
jack::Control::Continue
|
|
||||||
});
|
|
||||||
|
|
||||||
std::mem::forget(client.activate_async((), handler).unwrap());
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let spec = hound::WavSpec {
|
|
||||||
channels: 1,
|
|
||||||
sample_rate: rate,
|
|
||||||
bits_per_sample: 16,
|
|
||||||
sample_format: hound::SampleFormat::Int
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut meter = VuMeter::new(rate.into(), 1, None);
|
|
||||||
|
|
||||||
let spool_size = 16 * (rate as usize) * 10;// 10 seconds of audio
|
|
||||||
let mut writer = None;
|
|
||||||
let mut outfile = None;
|
|
||||||
|
|
||||||
let client: Client<OpenAIConfig> = Client::default();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
_ = audio_control_out.changed() => {
|
|
||||||
let audio_event = *audio_control_out.borrow_and_update();
|
|
||||||
match audio_event {
|
|
||||||
AudioRecordRequest::Start => {
|
|
||||||
outfile = Some(Arc::new(Mutex::new(tempfile::spooled_tempfile(spool_size))));
|
|
||||||
writer = Some(hound::WavWriter::new(RcFile(outfile.as_ref().unwrap().clone()), spec).unwrap());
|
|
||||||
},
|
|
||||||
AudioRecordRequest::Finish => {
|
|
||||||
writer = None;
|
|
||||||
|
|
||||||
let final_audio = outfile.take().unwrap();
|
|
||||||
let bytes = match Arc::into_inner(final_audio).unwrap().into_inner().unwrap().into_inner() {
|
|
||||||
SpooledData::OnDisk(mut file) => {
|
|
||||||
let mut bytes = Vec::new();
|
|
||||||
file.read_to_end(&mut bytes).unwrap();
|
|
||||||
bytes.into()
|
|
||||||
},
|
|
||||||
SpooledData::InMemory(cursor) => cursor.into_inner().into(),
|
|
||||||
};
|
|
||||||
let c = client.clone();
|
|
||||||
let t = transcrption_in.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let response = c.audio().transcription().create(CreateTranscriptionRequest {
|
|
||||||
file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } },
|
|
||||||
model: "gpt-4o-mini-transcribe".into(),
|
|
||||||
..Default::default()
|
|
||||||
}).await.unwrap();
|
|
||||||
t.send(response.text).await.unwrap();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
maybe_audio_packet = audio_out.recv() => {
|
|
||||||
let buf = maybe_audio_packet.unwrap();
|
|
||||||
|
|
||||||
meter.process_interleaved(buf.as_slice());
|
|
||||||
if let Some(w) = writer.as_mut() {
|
|
||||||
for sample in buf.iter().copied() {
|
|
||||||
let sample_i16 = (sample * 32768.0)
|
|
||||||
.clamp(i16::MIN as f32, i16::MAX as f32)
|
|
||||||
as i16;
|
|
||||||
w.write_sample(sample_i16).unwrap();
|
|
||||||
}
|
|
||||||
w.flush().unwrap();
|
|
||||||
}
|
|
||||||
audio_state_sender.send_if_modified(|v| {
|
|
||||||
let next_vu = meter.channel_vu(0).unwrap();
|
|
||||||
if *v != next_vu {
|
|
||||||
*v = next_vu;
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let client: Client<OpenAIConfig> = Client::default();
|
let client: Client<OpenAIConfig> = Client::default();
|
||||||
loop {
|
loop {
|
||||||
@@ -724,6 +603,11 @@ async fn main() {
|
|||||||
_ = audio_state_receiver.changed() => {
|
_ = audio_state_receiver.changed() => {
|
||||||
app.audio_level = *audio_state_receiver.borrow_and_update();
|
app.audio_level = *audio_state_receiver.borrow_and_update();
|
||||||
},
|
},
|
||||||
|
maybe_message = sys_message_receiver.recv() => {
|
||||||
|
if let Some(message) = maybe_message {
|
||||||
|
app.scene.insert_conversation(ConversationEntry::SystemMessage(message));
|
||||||
|
}
|
||||||
|
},
|
||||||
maybe_transcription = transcription_out.recv() => {
|
maybe_transcription = transcription_out.recv() => {
|
||||||
app.scene.insert_conversation(ConversationEntry::User(maybe_transcription.unwrap()));
|
app.scene.insert_conversation(ConversationEntry::User(maybe_transcription.unwrap()));
|
||||||
app.regenerate_responses();
|
app.regenerate_responses();
|
||||||
|
|||||||
@@ -1,10 +1,8 @@
|
|||||||
use async_openai::types::chat::*;
|
use async_openai::types::chat::*;
|
||||||
use chrono::Duration;
|
use chrono::Duration;
|
||||||
use crossterm::event::MediaKeyCode::Play;
|
|
||||||
use schemars::schema_for;
|
use schemars::schema_for;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use sqlite::OpenFlags;
|
|
||||||
|
|
||||||
use crate::GeneratedResponses;
|
use crate::GeneratedResponses;
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,139 @@
|
|||||||
|
use std::{io::Read, sync::{Arc, Mutex}};
|
||||||
|
|
||||||
|
use async_openai::{Client, config::OpenAIConfig, types::{InputSource, audio::{AudioInput, CreateTranscriptionRequest}}};
|
||||||
|
use jack::{AudioIn, ClientOptions};
|
||||||
|
use oximedia_metering::vu_meter::VuMeter;
|
||||||
|
use tempfile::SpooledData;
|
||||||
|
use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
|
use crate::events::AudioRecordRequest;
|
||||||
|
|
||||||
|
/// Cloneable handle to a file-like value shared behind an `Arc<Mutex<_>>`.
///
/// Implements `std::io::Write` and `std::io::Seek` by locking the shared value
/// and forwarding the call, so one handle can be given to a writer while the
/// owner keeps another reference to the same underlying value.
struct RcFile<T>(Arc<Mutex<T>>);

impl<T> RcFile<T> {
    /// Locks the shared value.
    ///
    /// A poisoned lock (another holder panicked) is reported as an
    /// `std::io::Error` instead of panicking, so callers of the I/O traits
    /// see an ordinary I/O failure.
    fn guard(&self) -> std::io::Result<std::sync::MutexGuard<'_, T>> {
        self.0.lock().map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                "RcFile: shared file lock is poisoned",
            )
        })
    }
}

impl<T: std::io::Write> std::io::Write for RcFile<T> {
    /// Forwards the write to the shared value while holding the lock.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.guard()?.write(buf)
    }

    /// Forwards the flush to the shared value while holding the lock.
    fn flush(&mut self) -> std::io::Result<()> {
        self.guard()?.flush()
    }
}

impl<T: std::io::Seek> std::io::Seek for RcFile<T> {
    /// Forwards the seek to the shared value while holding the lock.
    fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
        self.guard()?.seek(pos)
    }
}
|
||||||
|
|
||||||
|
pub async fn start_transcription(messages: mpsc::Sender<String>) -> (watch::Receiver<f64>, watch::Sender<AudioRecordRequest>, mpsc::Receiver<String>) {
|
||||||
|
let (audio_sink, mut audio_src) = mpsc::channel(32);
|
||||||
|
let (audio_state_sender, audio_state_receiver) = watch::channel(0.);
|
||||||
|
let (audio_control_in, mut audio_control_out) = watch::channel(AudioRecordRequest::Finish);
|
||||||
|
let (transcription_in, transcription_out) = mpsc::channel(1);
|
||||||
|
|
||||||
|
let rate = start_audio_input(&messages, audio_sink).await;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let spec = hound::WavSpec {
|
||||||
|
channels: 1,
|
||||||
|
sample_rate: rate,
|
||||||
|
bits_per_sample: 16,
|
||||||
|
sample_format: hound::SampleFormat::Int
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut meter = VuMeter::new(rate.into(), 1, None);
|
||||||
|
|
||||||
|
let spool_size = 16 * (rate as usize) * 10;// 10 seconds of audio
|
||||||
|
let mut writer = None;
|
||||||
|
let mut outfile = None;
|
||||||
|
|
||||||
|
let client: Client<OpenAIConfig> = Client::default();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = audio_control_out.changed() => {
|
||||||
|
let audio_event = *audio_control_out.borrow_and_update();
|
||||||
|
match audio_event {
|
||||||
|
AudioRecordRequest::Start => {
|
||||||
|
outfile = Some(Arc::new(Mutex::new(tempfile::spooled_tempfile(spool_size))));
|
||||||
|
writer = Some(hound::WavWriter::new(RcFile(outfile.as_ref().unwrap().clone()), spec).unwrap());
|
||||||
|
},
|
||||||
|
AudioRecordRequest::Finish => {
|
||||||
|
writer = None;
|
||||||
|
|
||||||
|
let final_audio = outfile.take().unwrap();
|
||||||
|
let bytes = match Arc::into_inner(final_audio).unwrap().into_inner().unwrap().into_inner() {
|
||||||
|
SpooledData::OnDisk(mut file) => {
|
||||||
|
let mut bytes = Vec::new();
|
||||||
|
file.read_to_end(&mut bytes).unwrap();
|
||||||
|
bytes.into()
|
||||||
|
},
|
||||||
|
SpooledData::InMemory(cursor) => cursor.into_inner().into(),
|
||||||
|
};
|
||||||
|
let c = client.clone();
|
||||||
|
let t = transcription_in.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let response = c.audio().transcription().create(CreateTranscriptionRequest {
|
||||||
|
file: AudioInput { source: InputSource::Bytes { filename: "transcription.wav".into(), bytes } },
|
||||||
|
model: "gpt-4o-mini-transcribe".into(),
|
||||||
|
..Default::default()
|
||||||
|
}).await.unwrap();
|
||||||
|
t.send(response.text).await.unwrap();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
maybe_audio_packet = audio_src.recv() => {
|
||||||
|
let buf = maybe_audio_packet.unwrap();
|
||||||
|
|
||||||
|
meter.process_interleaved(buf.as_slice());
|
||||||
|
if let Some(w) = writer.as_mut() {
|
||||||
|
for sample in buf.iter().copied() {
|
||||||
|
let sample_i16 = (sample * 32768.0)
|
||||||
|
.clamp(i16::MIN as f32, i16::MAX as f32)
|
||||||
|
as i16;
|
||||||
|
w.write_sample(sample_i16).unwrap();
|
||||||
|
}
|
||||||
|
w.flush().unwrap();
|
||||||
|
}
|
||||||
|
audio_state_sender.send_if_modified(|v| {
|
||||||
|
let next_vu = meter.channel_vu(0).unwrap();
|
||||||
|
if *v != next_vu {
|
||||||
|
*v = next_vu;
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
(audio_state_receiver, audio_control_in, transcription_out)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_audio_input(messages: &mpsc::Sender<String>, audio_sink: mpsc::Sender<Vec<f32>>) -> u32 {
|
||||||
|
let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap();
|
||||||
|
let port = client.register_port("microphone-in", AudioIn::default()).unwrap();
|
||||||
|
let rate = client.sample_rate();
|
||||||
|
|
||||||
|
if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", port.name().unwrap().as_str()) {
|
||||||
|
messages.send("Connected to audio.".into()).await.unwrap();
|
||||||
|
} else {
|
||||||
|
messages.send("Failed to reconnect to audio.".into()).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| {
|
||||||
|
if port.connected_count().unwrap() > 0 {
|
||||||
|
let buf: Vec<_> = port.as_slice(scope).iter().copied().collect();
|
||||||
|
audio_sink.blocking_send(buf).unwrap();
|
||||||
|
}
|
||||||
|
jack::Control::Continue
|
||||||
|
});
|
||||||
|
|
||||||
|
std::mem::forget(client.activate_async((), handler).unwrap());
|
||||||
|
|
||||||
|
rate
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user