src: adopt the log crate and feed logs into the UI
This commit is contained in:
+45
-19
@@ -1,9 +1,11 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_openai::{Client, config::OpenAIConfig, types::chat::{ChatCompletionMessageToolCalls, ChatCompletionRequestAssistantMessageArgs, ChatCompletionRequestMessage, ChatCompletionRequestSystemMessageArgs, ChatCompletionRequestToolMessageArgs, ChatCompletionTool, ChatCompletionTools, CreateChatCompletionRequestArgs, FinishReason, FunctionObjectArgs, ResponseFormat, ResponseFormatJsonSchema}};
|
||||
use bandcamp::SearchResultItem;
|
||||
use schemars::{JsonSchema, schema_for};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Serializer, ser::CompactFormatter};
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::sync::{RwLock, mpsc, watch};
|
||||
|
||||
use crate::{SaveData, archive::BeatsQueryArgs, artifacts::BandcampQueryArgs, scene::{PredictionAction, Scene, Scenery, StageDirection, conversation::ConversationEntry}};
|
||||
|
||||
@@ -31,7 +33,8 @@ struct Session {
|
||||
direction: StageDirection,
|
||||
scenery: Scenery,
|
||||
tokens_consumed: usize,
|
||||
activity_notify: watch::Sender<bool>
|
||||
activity_notify: watch::Sender<bool>,
|
||||
scene_sink: watch::Sender<Scene>
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
|
||||
@@ -52,7 +55,7 @@ struct ToolResults {
|
||||
}
|
||||
|
||||
impl Session {
|
||||
fn from_initial_messages(messages: Vec<ChatCompletionRequestMessage>, scenery: Scenery, direction: StageDirection, activity_notify: watch::Sender<bool>) -> Self {
|
||||
fn new(scene_sink: watch::Sender<Scene>, messages: Vec<ChatCompletionRequestMessage>, scenery: Scenery, direction: StageDirection, activity_notify: watch::Sender<bool>) -> Self {
|
||||
let mut conversation = vec![];
|
||||
for msg in &messages {
|
||||
if let Ok(conversation_msg) = msg.clone().try_into() {
|
||||
@@ -70,6 +73,7 @@ impl Session {
|
||||
direction,
|
||||
tokens_consumed: 0,
|
||||
activity_notify,
|
||||
scene_sink
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,6 +158,8 @@ impl Session {
|
||||
}
|
||||
|
||||
async fn regenerate_options(&mut self) {
|
||||
self.reply_options.responses.clear();
|
||||
self.refresh();
|
||||
self.activity_notify.send_if_modified(|x| { if !*x { *x = true; true } else { false }});
|
||||
loop {
|
||||
let full_conversation = self.generate_conversation(&self.direction);
|
||||
@@ -181,6 +187,7 @@ impl Session {
|
||||
.build().unwrap()
|
||||
})
|
||||
];
|
||||
self.log("Sending request..");
|
||||
let request = CreateChatCompletionRequestArgs::default()
|
||||
.messages(full_conversation)
|
||||
.model("gpt-5.4-mini")
|
||||
@@ -202,6 +209,7 @@ impl Session {
|
||||
|
||||
if let Some(usage) = response.usage {
|
||||
self.tokens_consumed += usage.total_tokens as usize;
|
||||
self.log(format!("{} tokens cast into the void", usage.total_tokens));
|
||||
}
|
||||
|
||||
if let Some(message) = response.choices.first() {
|
||||
@@ -260,26 +268,38 @@ impl Session {
|
||||
self.reply_options = options;
|
||||
break;
|
||||
} else {
|
||||
self.insert_conversation(ConversationEntry::SystemMessage("Received invalid JSON! Trying again.".into()));
|
||||
self.log("Received invalid JSON! Trying again.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.insert_conversation(ConversationEntry::SystemMessage("No messages were received! Trying again.".into()));
|
||||
self.log("No messages were received! Trying again.");
|
||||
}
|
||||
}
|
||||
self.activity_notify.send_if_modified(|x| { if *x { *x = false; true } else { false }});
|
||||
|
||||
self.refresh();
|
||||
}
|
||||
|
||||
fn as_scene(&self) -> Scene {
|
||||
Scene::new(self.reply_options.clone(), self.conversation.clone(), self.scenery.clone(), self.tokens_consumed, self.direction.clone())
|
||||
}
|
||||
|
||||
fn log<T: Into<String>>(&mut self, msg: T) {
|
||||
self.insert_conversation(ConversationEntry::SystemMessage(msg.into()));
|
||||
}
|
||||
|
||||
fn insert_conversation(&mut self, entry: ConversationEntry) {
|
||||
self.conversation.push(entry.clone());
|
||||
|
||||
if let Ok(next_msg) = entry.try_into() {
|
||||
self.messages.push(next_msg);
|
||||
}
|
||||
|
||||
self.refresh();
|
||||
}
|
||||
|
||||
fn refresh(&self) {
|
||||
self.scene_sink.send(self.as_scene()).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,20 +341,32 @@ impl SessionControl {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_prediction(saved_session: SaveData) -> SessionControl {
|
||||
pub async fn start_prediction(saved_session: SaveData, mut messages: tokio::sync::mpsc::UnboundedReceiver<String>) -> SessionControl {
|
||||
let (prediction_in, prediction_out) = tokio::sync::watch::channel(Scene::default());
|
||||
let (activity_notify_sink, activity_notify_src) = tokio::sync::watch::channel(false);
|
||||
|
||||
let (action_sink, mut action_src) = mpsc::channel(5);
|
||||
|
||||
let mut session = Session::from_initial_messages(saved_session.messages, saved_session.scenery, saved_session.direction, activity_notify_sink);
|
||||
let session = Session::new(prediction_in, saved_session.messages, saved_session.scenery, saved_session.direction, activity_notify_sink);
|
||||
|
||||
// Send the initial scene to the UI, after we have loaded the session from the first messages.
|
||||
prediction_in.send(session.as_scene()).unwrap();
|
||||
session.refresh();
|
||||
|
||||
let shared_session = Arc::new(RwLock::new(session));
|
||||
|
||||
let log_session = Arc::clone(&shared_session);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Some(msg) = messages.recv().await {
|
||||
log_session.write().await.insert_conversation(ConversationEntry::SystemMessage(msg));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
if let Some(evt) = action_src.recv().await {
|
||||
let mut session = shared_session.write().await;
|
||||
let do_regen = match evt {
|
||||
PredictionAction::ConversationAppend(msg) => {
|
||||
let do_regen = match msg {
|
||||
@@ -348,9 +380,9 @@ pub async fn start_prediction(saved_session: SaveData) -> SessionControl {
|
||||
PredictionAction::SetEpisodeNumber(num) => {
|
||||
session.direction.episode_number = num;
|
||||
if let Err(err) = session.direction.reload_mixxx_playlist() {
|
||||
session.insert_conversation(ConversationEntry::SystemMessage(format!("Failed to load mixxx playlist: {:?}.", err).into()));
|
||||
session.log(format!("Failed to load mixxx playlist: {:?}.", err));
|
||||
} else {
|
||||
session.insert_conversation(ConversationEntry::SystemMessage("Mixxx playlist reloaded.".into()));
|
||||
session.log("Mixxx playlist reloaded.");
|
||||
}
|
||||
false
|
||||
},
|
||||
@@ -359,7 +391,7 @@ pub async fn start_prediction(saved_session: SaveData) -> SessionControl {
|
||||
},
|
||||
PredictionAction::SetNarrative(narrative) => {
|
||||
session.direction.narrative = narrative;
|
||||
session.insert_conversation(ConversationEntry::SystemMessage("Updated stage direction narrative".into()));
|
||||
session.log("Updated stage direction narrative");
|
||||
true
|
||||
},
|
||||
PredictionAction::SetShowEndTime(end_time) => {
|
||||
@@ -377,14 +409,8 @@ pub async fn start_prediction(saved_session: SaveData) -> SessionControl {
|
||||
save_data.save();
|
||||
|
||||
if do_regen {
|
||||
session.reply_options.responses.clear();
|
||||
}
|
||||
|
||||
prediction_in.send(session.as_scene()).unwrap();
|
||||
|
||||
if do_regen {
|
||||
session.regenerate_options().await;
|
||||
prediction_in.send(session.as_scene()).unwrap();
|
||||
drop(session);
|
||||
shared_session.write().await.regenerate_options().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user