events: start moving towards a more control-handle based task architecture
This commit is contained in:
+22
-22
@@ -13,7 +13,7 @@ use futures::{StreamExt, future::FutureExt};
|
|||||||
use ratatui::prelude::*;
|
use ratatui::prelude::*;
|
||||||
use tui_skeleton::{AnimationMode, SkeletonText};
|
use tui_skeleton::{AnimationMode, SkeletonText};
|
||||||
|
|
||||||
use crate::{events::AudioRecordRequest, prediction::{BandcampResult, PossibleResponse}, scene::{ConversationEntry, Scene, StageActions, StageDirection}, tts::start_tts};
|
use crate::{prediction::{BandcampResult, PossibleResponse}, scene::{ConversationEntry, Scene, StageActions, StageDirection}, transcription::{AudioInputControl, TranscriptionControl, start_audio_input}, tts::{TtsControl, start_tts}};
|
||||||
|
|
||||||
mod scene;
|
mod scene;
|
||||||
mod events;
|
mod events;
|
||||||
@@ -79,9 +79,10 @@ struct App {
|
|||||||
recording_audio: bool,
|
recording_audio: bool,
|
||||||
focus_state: FocusState,
|
focus_state: FocusState,
|
||||||
|
|
||||||
audio_control_sink: watch::Sender<AudioRecordRequest>,
|
transcription: TranscriptionControl,
|
||||||
prediction_request_sink: watch::Sender<StageActions>,
|
prediction_request_sink: watch::Sender<StageActions>,
|
||||||
tts_request_sink: mpsc::Sender<String>,
|
audio: AudioInputControl,
|
||||||
|
tts: TtsControl,
|
||||||
sys_message_sink: mpsc::Sender<String>
|
sys_message_sink: mpsc::Sender<String>
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -110,7 +111,7 @@ impl From<bandcamp::Error> for BandcampError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl App {
|
impl App {
|
||||||
fn new(prediction_request_sink: watch::Sender<StageActions>, audio_control_sink: watch::Sender<AudioRecordRequest>, tts_request_sink: mpsc::Sender<String>, sys_message_sink: mpsc::Sender<String>, initial_direction: StageDirection) -> Self {
|
fn new(prediction_request_sink: watch::Sender<StageActions>, audio: AudioInputControl, transcription: TranscriptionControl, tts: TtsControl, sys_message_sink: mpsc::Sender<String>, initial_direction: StageDirection) -> Self {
|
||||||
Self {
|
Self {
|
||||||
scene: Default::default(),
|
scene: Default::default(),
|
||||||
direction: initial_direction,
|
direction: initial_direction,
|
||||||
@@ -123,10 +124,11 @@ impl App {
|
|||||||
prediction_request_sink,
|
prediction_request_sink,
|
||||||
is_requesting: false,
|
is_requesting: false,
|
||||||
audio_level: -60.,
|
audio_level: -60.,
|
||||||
|
audio,
|
||||||
recording_audio: false,
|
recording_audio: false,
|
||||||
audio_control_sink,
|
transcription,
|
||||||
focus_state: FocusState::UserInput,
|
focus_state: FocusState::UserInput,
|
||||||
tts_request_sink,
|
tts,
|
||||||
sys_message_sink
|
sys_message_sink
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -392,7 +394,7 @@ impl App {
|
|||||||
self.next_actions.push(ConversationEntry::StageDirection(direction.clone()));
|
self.next_actions.push(ConversationEntry::StageDirection(direction.clone()));
|
||||||
}
|
}
|
||||||
self.next_actions.push(ConversationEntry::Eva(selected.text.clone()));
|
self.next_actions.push(ConversationEntry::Eva(selected.text.clone()));
|
||||||
self.speak(selected.text.clone()).await;
|
self.tts.speak(selected.text.clone()).await;
|
||||||
self.regenerate_responses();
|
self.regenerate_responses();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -476,7 +478,7 @@ impl App {
|
|||||||
KeyCode::Enter => {
|
KeyCode::Enter => {
|
||||||
let row_num = self.conversation_state.selected().unwrap();
|
let row_num = self.conversation_state.selected().unwrap();
|
||||||
if let ConversationEntry::Eva(text) = &self.scene.conversation()[self.scene.conversation().len() - 1 - row_num] {
|
if let ConversationEntry::Eva(text) = &self.scene.conversation()[self.scene.conversation().len() - 1 - row_num] {
|
||||||
self.speak(text.clone()).await;
|
self.tts.speak(text.clone()).await;
|
||||||
self.focus_state = FocusState::UserInput;
|
self.focus_state = FocusState::UserInput;
|
||||||
self.conversation_state.select(None);
|
self.conversation_state.select(None);
|
||||||
}
|
}
|
||||||
@@ -494,11 +496,11 @@ impl App {
|
|||||||
KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => {
|
KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => {
|
||||||
if self.recording_audio {
|
if self.recording_audio {
|
||||||
self.recording_audio = false;
|
self.recording_audio = false;
|
||||||
self.audio_control_sink.send_replace(AudioRecordRequest::Finish);
|
self.transcription.stop();
|
||||||
self.is_requesting = true;
|
self.is_requesting = true;
|
||||||
} else {
|
} else {
|
||||||
self.recording_audio = true;
|
self.recording_audio = true;
|
||||||
self.audio_control_sink.send_replace(AudioRecordRequest::Start);
|
self.transcription.start();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
KeyCode::Down => self.reply_state.select_next(),
|
KeyCode::Down => self.reply_state.select_next(),
|
||||||
@@ -536,10 +538,6 @@ impl App {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn speak(&mut self, text: String) {
|
|
||||||
self.tts_request_sink.send(text).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
fn regenerate_responses(&mut self) {
|
fn regenerate_responses(&mut self) {
|
||||||
let actions = StageActions {
|
let actions = StageActions {
|
||||||
direction: self.direction.clone(),
|
direction: self.direction.clone(),
|
||||||
@@ -610,11 +608,13 @@ async fn main() {
|
|||||||
SaveData::default()
|
SaveData::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
let tts_request_sender = start_tts().await;
|
let (audio_ctrl, mic_stream) = start_audio_input(&sys_message_sink).await;
|
||||||
let (prediction_request_in, mut prediction_out) = prediction::start_prediction(sys_message_src, saved_session.messages).await;
|
|
||||||
let (mut audio_state_receiver, audio_control_in, mut transcription_out) = transcription::start_transcription(sys_message_sink.clone()).await;
|
|
||||||
|
|
||||||
let mut app = App::new(prediction_request_in, audio_control_in, tts_request_sender, sys_message_sink, saved_session.direction);
|
let tts_ctrl = start_tts().await;
|
||||||
|
let (prediction_request_in, mut prediction_out) = prediction::start_prediction(sys_message_src, saved_session.messages).await;
|
||||||
|
let transcription_ctrl = transcription::start_transcription(mic_stream).await;
|
||||||
|
|
||||||
|
let mut app = App::new(prediction_request_in, audio_ctrl, transcription_ctrl, tts_ctrl, sys_message_sink, saved_session.direction);
|
||||||
|
|
||||||
let mut events = EventStream::new();
|
let mut events = EventStream::new();
|
||||||
let mut last_tick = Instant::now();
|
let mut last_tick = Instant::now();
|
||||||
@@ -637,11 +637,11 @@ async fn main() {
|
|||||||
app.reply_state.select_first();
|
app.reply_state.select_first();
|
||||||
app.is_requesting = false;
|
app.is_requesting = false;
|
||||||
},
|
},
|
||||||
_ = audio_state_receiver.changed() => {
|
next_volume = app.audio.next() => {
|
||||||
app.audio_level = *audio_state_receiver.borrow();
|
app.audio_level = next_volume
|
||||||
},
|
},
|
||||||
maybe_transcription = transcription_out.recv() => {
|
transcription_result = app.transcription.next() => {
|
||||||
app.next_actions.push(ConversationEntry::User(maybe_transcription.unwrap()));
|
app.next_actions.push(ConversationEntry::User(transcription_result));
|
||||||
app.regenerate_responses();
|
app.regenerate_responses();
|
||||||
}
|
}
|
||||||
maybe_event = event => {
|
maybe_event = event => {
|
||||||
|
|||||||
+107
-39
@@ -4,10 +4,60 @@ use async_openai::{Client, config::OpenAIConfig, types::{InputSource, audio::{Au
|
|||||||
use jack::{AudioIn, ClientOptions};
|
use jack::{AudioIn, ClientOptions};
|
||||||
use oximedia_metering::vu_meter::VuMeter;
|
use oximedia_metering::vu_meter::VuMeter;
|
||||||
use tempfile::SpooledData;
|
use tempfile::SpooledData;
|
||||||
use tokio::sync::{mpsc, watch};
|
use tokio::sync::{mpsc, oneshot, watch};
|
||||||
|
|
||||||
use crate::events::AudioRecordRequest;
|
use crate::events::AudioRecordRequest;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TranscriptionControl {
|
||||||
|
transcription_result_src: mpsc::Receiver<String>,
|
||||||
|
record_state_sink: watch::Sender<AudioRecordRequest>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TranscriptionControl {
|
||||||
|
pub fn start(&mut self) {
|
||||||
|
self.record_state_sink.send(AudioRecordRequest::Start).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stop(&mut self) {
|
||||||
|
self.record_state_sink.send(AudioRecordRequest::Finish).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn next(&mut self) -> String {
|
||||||
|
self.transcription_result_src.recv().await.unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct JackClientRef {
|
||||||
|
killswitch: Option<oneshot::Sender<()>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for JackClientRef {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.killswitch.take().unwrap().send(()).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct AudioInputControl {
|
||||||
|
volume_src: watch::Receiver<f64>,
|
||||||
|
_jack_client: JackClientRef
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AudioInputControl {
|
||||||
|
pub async fn next(&mut self) -> f64 {
|
||||||
|
self.volume_src.changed().await.unwrap();
|
||||||
|
*self.volume_src.borrow_and_update()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct MicStream {
|
||||||
|
src: mpsc::Receiver<Vec<f32>>,
|
||||||
|
sample_rate: u32
|
||||||
|
}
|
||||||
|
|
||||||
struct RcFile<T>(Arc<Mutex<T>>);
|
struct RcFile<T>(Arc<Mutex<T>>);
|
||||||
|
|
||||||
impl<T: std::io::Write> std::io::Write for RcFile<T> {
|
impl<T: std::io::Write> std::io::Write for RcFile<T> {
|
||||||
@@ -26,25 +76,24 @@ impl<T: std::io::Seek> std::io::Seek for RcFile<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_transcription(messages: mpsc::Sender<String>) -> (watch::Receiver<f64>, watch::Sender<AudioRecordRequest>, mpsc::Receiver<String>) {
|
pub async fn start_transcription(mut mic_src: MicStream) -> TranscriptionControl {
|
||||||
let (audio_sink, mut audio_src) = mpsc::channel(32);
|
|
||||||
let (audio_state_sender, audio_state_receiver) = watch::channel(0.);
|
|
||||||
let (audio_control_in, mut audio_control_out) = watch::channel(AudioRecordRequest::Finish);
|
let (audio_control_in, mut audio_control_out) = watch::channel(AudioRecordRequest::Finish);
|
||||||
let (transcription_in, transcription_out) = mpsc::channel(1);
|
let (transcription_in, transcription_out) = mpsc::channel(1);
|
||||||
|
|
||||||
let rate = start_audio_input(&messages, audio_sink).await;
|
let ret = TranscriptionControl {
|
||||||
|
record_state_sink: audio_control_in,
|
||||||
|
transcription_result_src: transcription_out
|
||||||
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let spec = hound::WavSpec {
|
let spec = hound::WavSpec {
|
||||||
channels: 1,
|
channels: 1,
|
||||||
sample_rate: rate,
|
sample_rate: mic_src.sample_rate,
|
||||||
bits_per_sample: 16,
|
bits_per_sample: 16,
|
||||||
sample_format: hound::SampleFormat::Int
|
sample_format: hound::SampleFormat::Int
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut meter = VuMeter::new(rate.into(), 1, None);
|
let spool_size = 16 * (mic_src.sample_rate as usize) * 10;// 10 seconds of audio
|
||||||
|
|
||||||
let spool_size = 16 * (rate as usize) * 10;// 10 seconds of audio
|
|
||||||
let mut writer = None;
|
let mut writer = None;
|
||||||
let mut outfile = None;
|
let mut outfile = None;
|
||||||
|
|
||||||
@@ -85,10 +134,7 @@ pub async fn start_transcription(messages: mpsc::Sender<String>) -> (watch::Rece
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
maybe_audio_packet = audio_src.recv() => {
|
Some(buf) = mic_src.src.recv() => {
|
||||||
let buf = maybe_audio_packet.unwrap();
|
|
||||||
|
|
||||||
meter.process_interleaved(buf.as_slice());
|
|
||||||
if let Some(w) = writer.as_mut() {
|
if let Some(w) = writer.as_mut() {
|
||||||
for sample in buf.iter().copied() {
|
for sample in buf.iter().copied() {
|
||||||
let sample_i16 = (sample * 32768.0)
|
let sample_i16 = (sample * 32768.0)
|
||||||
@@ -98,7 +144,40 @@ pub async fn start_transcription(messages: mpsc::Sender<String>) -> (watch::Rece
|
|||||||
}
|
}
|
||||||
w.flush().unwrap();
|
w.flush().unwrap();
|
||||||
}
|
}
|
||||||
audio_state_sender.send_if_modified(|v| {
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_audio_input(messages: &mpsc::Sender<String>) -> (AudioInputControl, MicStream) {
|
||||||
|
|
||||||
|
let (exit_tx, exit_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let (mic_audio_sink, mic_audio_src) = mpsc::channel(32);
|
||||||
|
let (volume_sink, volume_src) = watch::channel(0.);
|
||||||
|
|
||||||
|
let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap();
|
||||||
|
let mic_port = client.register_port("microphone-in", AudioIn::default()).unwrap();
|
||||||
|
let rate = client.sample_rate();
|
||||||
|
|
||||||
|
if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", mic_port.name().unwrap().as_str()) {
|
||||||
|
messages.send("Connected to audio.".into()).await.unwrap();
|
||||||
|
} else {
|
||||||
|
messages.send("Failed to reconnect to audio.".into()).await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut meter = VuMeter::new(rate.into(), 1, None);
|
||||||
|
|
||||||
|
let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| {
|
||||||
|
if mic_port.connected_count().unwrap() > 0 {
|
||||||
|
let buf: Vec<_> = mic_port.as_slice(scope).iter().copied().collect();
|
||||||
|
meter.process_interleaved(&buf);
|
||||||
|
mic_audio_sink.blocking_send(buf).unwrap();
|
||||||
|
|
||||||
|
volume_sink.send_if_modified(|v| {
|
||||||
let next_vu = meter.channel_vu(0).unwrap();
|
let next_vu = meter.channel_vu(0).unwrap();
|
||||||
if *v != next_vu {
|
if *v != next_vu {
|
||||||
*v = next_vu;
|
*v = next_vu;
|
||||||
@@ -108,33 +187,22 @@ pub async fn start_transcription(messages: mpsc::Sender<String>) -> (watch::Rece
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
(audio_state_receiver, audio_control_in, transcription_out)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn start_audio_input(messages: &mpsc::Sender<String>, audio_sink: mpsc::Sender<Vec<f32>>) -> u32 {
|
|
||||||
let (client, _status) = jack::Client::new("Eva-Cohost", ClientOptions::default() | ClientOptions::SESSION_ID).unwrap();
|
|
||||||
let port = client.register_port("microphone-in", AudioIn::default()).unwrap();
|
|
||||||
let rate = client.sample_rate();
|
|
||||||
|
|
||||||
if let Ok(_) = client.connect_ports_by_name("mixxx-mic-1:capture_MONO", port.name().unwrap().as_str()) {
|
|
||||||
messages.send("Connected to audio.".into()).await.unwrap();
|
|
||||||
} else {
|
|
||||||
messages.send("Failed to reconnect to audio.".into()).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
let handler = jack::contrib::ClosureProcessHandler::new(move |_client, scope| {
|
|
||||||
if port.connected_count().unwrap() > 0 {
|
|
||||||
let buf: Vec<_> = port.as_slice(scope).iter().copied().collect();
|
|
||||||
audio_sink.blocking_send(buf).unwrap();
|
|
||||||
}
|
|
||||||
jack::Control::Continue
|
jack::Control::Continue
|
||||||
});
|
});
|
||||||
|
|
||||||
std::mem::forget(client.activate_async((), handler).unwrap());
|
tokio::spawn(async move {
|
||||||
|
let async_client = client.activate_async((), handler).unwrap();
|
||||||
|
|
||||||
rate
|
exit_rx.await.unwrap();
|
||||||
|
|
||||||
|
async_client.deactivate().unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
(AudioInputControl {
|
||||||
|
volume_src,
|
||||||
|
_jack_client: JackClientRef { killswitch: Some(exit_tx) }
|
||||||
|
}, MicStream {
|
||||||
|
sample_rate: rate,
|
||||||
|
src: mic_audio_src
|
||||||
|
})
|
||||||
}
|
}
|
||||||
+15
-2
@@ -1,6 +1,17 @@
|
|||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
|
|
||||||
pub async fn start_tts() -> tokio::sync::mpsc::Sender<String> {
|
#[derive(Debug)]
|
||||||
|
pub struct TtsControl {
|
||||||
|
request_sink: tokio::sync::mpsc::Sender<String>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TtsControl {
|
||||||
|
pub async fn speak(&self, text: String) {
|
||||||
|
self.request_sink.send(text).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start_tts() -> TtsControl {
|
||||||
|
|
||||||
let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3);
|
let (tts_request_sender, mut tts_request_receiver) = tokio::sync::mpsc::channel(3);
|
||||||
|
|
||||||
@@ -12,5 +23,7 @@ pub async fn start_tts() -> tokio::sync::mpsc::Sender<String> {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
tts_request_sender
|
TtsControl {
|
||||||
|
request_sink: tts_request_sender
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user