events: rewrite some event buses to use multi-consumer pubsub instead of single-consumer channels
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
use embassy_sync::channel::DynamicSender;
|
use embassy_sync::{channel::DynamicSender, pubsub::DynPublisher};
|
||||||
use embassy_time::{Duration, Instant};
|
use embassy_time::{Duration, Instant};
|
||||||
use nalgebra::{Rotation3, Vector2, Vector3};
|
use nalgebra::{Rotation3, Vector2, Vector3};
|
||||||
use log::*;
|
use log::*;
|
||||||
@@ -7,7 +7,7 @@ use nalgebra::{ComplexField, RealField};
|
|||||||
|
|
||||||
use core::fmt::Debug;
|
use core::fmt::Debug;
|
||||||
|
|
||||||
use crate::{ego::{heading::HeadingEstimator, kalman::Ekf2D, orientation::OrientationEstimator}, events::{Notification, Prediction}, Breaker, CircularBuffer, idle::IdleClock};
|
use crate::{ego::{heading::HeadingEstimator, kalman::Ekf2D, orientation::OrientationEstimator}, events::{Notification, Prediction, Telemetry}, idle::IdleClock, Breaker, CircularBuffer};
|
||||||
|
|
||||||
#[derive(PartialEq, Debug, Default, Clone, Copy)]
|
#[derive(PartialEq, Debug, Default, Clone, Copy)]
|
||||||
pub enum MotionState {
|
pub enum MotionState {
|
||||||
@@ -104,15 +104,15 @@ impl BikeStates {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn commit(&mut self, predictions: &DynamicSender<'static, Prediction>, notifications: &DynamicSender<'static, Notification>) {
|
pub async fn commit(&mut self, predictions: &DynamicSender<'static, Prediction>, notifications: &DynPublisher<'static, Notification>) {
|
||||||
if let Some(true) = self.is_calibrated.read_tripped() {
|
if let Some(true) = self.is_calibrated.read_tripped() {
|
||||||
notifications.send(Notification::SensorOnline(crate::events::SensorSource::IMU)).await
|
notifications.publish(Notification::SensorOnline(crate::events::SensorSource::IMU)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.has_gps_fix.read_tripped() {
|
match self.has_gps_fix.read_tripped() {
|
||||||
None => (),
|
None => (),
|
||||||
Some(true) => notifications.send(Notification::SensorOnline(crate::events::SensorSource::GPS)).await,
|
Some(true) => notifications.publish(Notification::SensorOnline(crate::events::SensorSource::GPS)).await,
|
||||||
Some(false) => notifications.send(Notification::SensorOffline(crate::events::SensorSource::GPS)).await,
|
Some(false) => notifications.publish(Notification::SensorOffline(crate::events::SensorSource::GPS)).await,
|
||||||
}
|
}
|
||||||
|
|
||||||
let est = self.kf.x;
|
let est = self.kf.x;
|
||||||
@@ -167,7 +167,7 @@ impl BikeStates {
|
|||||||
// And if the motion status has changed, send it out
|
// And if the motion status has changed, send it out
|
||||||
if let Some(state) = self.motion_state.read_tripped() {
|
if let Some(state) = self.motion_state.read_tripped() {
|
||||||
debug!("state={state:?} trend={trend} mean={mean} v={v}");
|
debug!("state={state:?} trend={trend} mean={mean} v={v}");
|
||||||
predictions.send(Prediction::Motion(state)).await
|
predictions.send(Prediction::Motion(state)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
|
|
||||||
use core::sync::atomic::{AtomicBool, AtomicU8};
|
use embassy_sync::{blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}, channel::Channel, pubsub::PubSubChannel};
|
||||||
|
|
||||||
use embassy_sync::{blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}, channel::Channel, signal::Signal};
|
|
||||||
use embassy_time::Duration;
|
use embassy_time::Duration;
|
||||||
use nalgebra::{Vector2, Vector3};
|
use nalgebra::{Vector2, Vector3};
|
||||||
use alloc::sync::Arc;
|
use alloc::sync::Arc;
|
||||||
@@ -133,7 +131,7 @@ impl Default for DisplayControls {
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct BusGarage {
|
pub struct BusGarage {
|
||||||
pub motion: Channel<NoopRawMutex, Measurement, 5>,
|
pub motion: Channel<NoopRawMutex, Measurement, 5>,
|
||||||
pub notify: Channel<CriticalSectionRawMutex, Notification, 5>,
|
pub notify: PubSubChannel<CriticalSectionRawMutex, Notification, 5, 2, 4>,
|
||||||
pub predict: Channel<CriticalSectionRawMutex, Prediction, 15>,
|
pub predict: Channel<CriticalSectionRawMutex, Prediction, 15>,
|
||||||
pub display: Arc<DisplayControls>
|
pub display: Arc<DisplayControls>
|
||||||
}
|
}
|
||||||
@@ -142,7 +140,7 @@ impl Default for BusGarage {
|
|||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
motion: Channel::new(),
|
motion: Channel::new(),
|
||||||
notify: Channel::new(),
|
notify: PubSubChannel::new(),
|
||||||
predict: Channel::new(),
|
predict: Channel::new(),
|
||||||
display: Default::default()
|
display: Default::default()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,18 +1,20 @@
|
|||||||
use embassy_sync::channel::DynamicSender;
|
use embassy_sync::{channel::DynamicSender, pubsub::DynPublisher};
|
||||||
use embassy_time::Timer;
|
use embassy_time::Timer;
|
||||||
|
|
||||||
use crate::events::{Notification, Scene};
|
use crate::events::{Notification, Scene};
|
||||||
|
|
||||||
|
|
||||||
#[embassy_executor::task]
|
#[embassy_executor::task]
|
||||||
pub async fn demo_task(ui: DynamicSender<'static, Notification>) {
|
pub async fn demo_task(ui: DynPublisher<'static, Notification>) {
|
||||||
Timer::after_secs(10).await;
|
Timer::after_secs(10).await;
|
||||||
ui.send(Notification::SceneChange(Scene::Idle)).await;
|
ui.publish(Notification::SceneChange(Scene::Idle)).await;
|
||||||
|
ui.publish(Notification::SetBrakelight(true)).await;
|
||||||
|
ui.publish(Notification::SetHeadlight(true)).await;
|
||||||
Timer::after_secs(10).await;
|
Timer::after_secs(10).await;
|
||||||
loop {
|
loop {
|
||||||
for scene in [Scene::Accelerating, Scene::Ready, Scene::Decelerating, Scene::Ready] {
|
for scene in [Scene::Accelerating, Scene::Ready, Scene::Decelerating, Scene::Ready] {
|
||||||
Timer::after_secs(8).await;
|
Timer::after_secs(8).await;
|
||||||
ui.send(Notification::SceneChange(scene)).await
|
ui.publish(Notification::SceneChange(scene)).await
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,10 +1,10 @@
|
|||||||
use embassy_sync::channel::{DynamicReceiver, DynamicSender};
|
use embassy_sync::{channel::{DynamicReceiver, DynamicSender}, pubsub::DynPublisher};
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
use crate::{ego::engine::BikeStates, events::{Measurement, Notification, Prediction}};
|
use crate::{ego::engine::BikeStates, events::{Measurement, Notification, Prediction}};
|
||||||
|
|
||||||
#[embassy_executor::task]
|
#[embassy_executor::task]
|
||||||
pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, ui_sink: DynamicSender<'static, Notification>, prediction_sink: DynamicSender<'static, Prediction>) {
|
pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, ui_sink: DynPublisher<'static, Notification>, prediction_sink: DynamicSender<'static, Prediction>) {
|
||||||
let mut states = BikeStates::default();
|
let mut states = BikeStates::default();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
|
|
||||||
use embassy_sync::channel::{DynamicReceiver, DynamicSender};
|
use embassy_sync::{channel::DynamicReceiver, pubsub::DynPublisher};
|
||||||
use embassy_time::Duration;
|
use embassy_time::Duration;
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
use crate::{ego::engine::{gps_to_local_meters_haversine, MotionState}, events::{Notification, Prediction, Scene}, idle::IdleClock};
|
use crate::{ego::engine::{gps_to_local_meters_haversine, MotionState}, events::{Notification, Prediction, Scene, Telemetry}, idle::IdleClock};
|
||||||
|
|
||||||
#[embassy_executor::task]
|
#[embassy_executor::task]
|
||||||
pub async fn prediction_task(prediction_src: DynamicReceiver<'static, Prediction>, notify: DynamicSender<'static, Notification>) {
|
pub async fn prediction_task(prediction_src: DynamicReceiver<'static, Prediction>, notify: DynPublisher<'static, Notification>, telemetery: DynPublisher<'static, Telemetry>) {
|
||||||
let mut last_velocity = 0.0;
|
let mut last_velocity = 0.0;
|
||||||
let mut first_position = None;
|
let mut first_position = None;
|
||||||
let mut last_position = Default::default();
|
let mut last_position = Default::default();
|
||||||
@@ -18,28 +18,27 @@ pub async fn prediction_task(prediction_src: DynamicReceiver<'static, Prediction
|
|||||||
gps_to_local_meters_haversine(&x, &last_position).norm()
|
gps_to_local_meters_haversine(&x, &last_position).norm()
|
||||||
});
|
});
|
||||||
if let Ok(next_evt) = embassy_time::with_timeout(Duration::from_secs(1), prediction_src.receive()).await {
|
if let Ok(next_evt) = embassy_time::with_timeout(Duration::from_secs(1), prediction_src.receive()).await {
|
||||||
|
telemetery.publish(Telemetry::Prediction(next_evt)).await;
|
||||||
match next_evt {
|
match next_evt {
|
||||||
Prediction::WakeRequested => {
|
Prediction::WakeRequested => {
|
||||||
if sleep_timer.wake() {
|
if sleep_timer.wake() {
|
||||||
warn!("Wake requested during sleep");
|
warn!("Wake requested during sleep");
|
||||||
notify.send(Notification::WakeUp).await;
|
notify.publish(Notification::WakeUp).await;
|
||||||
|
notify.publish(Notification::SetHeadlight(true)).await;
|
||||||
|
notify.publish(Notification::SetBrakelight(true)).await;
|
||||||
|
|
||||||
// Also reset the parking timer
|
// Also reset the parking timer
|
||||||
parking_timer.wake();
|
parking_timer.wake();
|
||||||
} else if parking_timer.wake() {
|
} else if parking_timer.wake() {
|
||||||
info!("Wake requested while parked");
|
info!("Wake requested while parked");
|
||||||
// If we weren't asleep but we were parked, then switch back to the Ready state and turn on the lights
|
// If we weren't asleep but we were parked, then switch back to the Ready state and turn on the lights
|
||||||
notify.send(Notification::SetHeadlight(true)).await;
|
notify.publish(Notification::SetHeadlight(true)).await;
|
||||||
notify.send(Notification::SetBrakelight(true)).await;
|
notify.publish(Notification::SetBrakelight(true)).await;
|
||||||
notify.send(Notification::SceneChange(Scene::Ready)).await;
|
notify.publish(Notification::SceneChange(Scene::Ready)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Prediction::Velocity(v) => {
|
Prediction::Velocity(v) => {
|
||||||
last_velocity = v;
|
last_velocity = v;
|
||||||
|
|
||||||
if v > 5.0 && stationary {
|
|
||||||
notify.send(Notification::Beat).await;
|
|
||||||
}
|
|
||||||
// TODO: Probably makes sense to only print this based on an IdleTimer, so that a long period of slightly variable movement doesn't get lost, but we can still report values to the UI / telemetry outputs
|
// TODO: Probably makes sense to only print this based on an IdleTimer, so that a long period of slightly variable movement doesn't get lost, but we can still report values to the UI / telemetry outputs
|
||||||
//info!("Velocity predict: velocity={v}\tpos={last_position:?}\tdistance={d:?}");
|
//info!("Velocity predict: velocity={v}\tpos={last_position:?}\tdistance={d:?}");
|
||||||
},
|
},
|
||||||
@@ -53,38 +52,38 @@ pub async fn prediction_task(prediction_src: DynamicReceiver<'static, Prediction
|
|||||||
Prediction::Motion(motion) => {
|
Prediction::Motion(motion) => {
|
||||||
info!("Motion predict:\t{motion:?}\tvelocity={last_velocity}\tpos={last_position:?}\tdistance={d:?}");
|
info!("Motion predict:\t{motion:?}\tvelocity={last_velocity}\tpos={last_position:?}\tdistance={d:?}");
|
||||||
if sleep_timer.wake() {
|
if sleep_timer.wake() {
|
||||||
notify.send(Notification::WakeUp).await;
|
notify.publish(Notification::WakeUp).await;
|
||||||
notify.send(Notification::SetHeadlight(true)).await;
|
notify.publish(Notification::SetHeadlight(true)).await;
|
||||||
notify.send(Notification::SetBrakelight(true)).await
|
notify.publish(Notification::SetBrakelight(true)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
if parking_timer.wake() {
|
if parking_timer.wake() {
|
||||||
notify.send(Notification::SetHeadlight(true)).await;
|
notify.publish(Notification::SetHeadlight(true)).await;
|
||||||
notify.send(Notification::SetBrakelight(true)).await
|
notify.publish(Notification::SetBrakelight(true)).await
|
||||||
}
|
}
|
||||||
match motion {
|
match motion {
|
||||||
MotionState::Accelerating => {
|
MotionState::Accelerating => {
|
||||||
if stationary {
|
if stationary {
|
||||||
// If we are going from standing still to immediately accelerating, first transition to the 'ready' scene
|
// If we are going from standing still to immediately accelerating, first transition to the 'ready' scene
|
||||||
notify.send(Notification::SceneChange(Scene::Ready)).await;
|
notify.publish(Notification::SceneChange(Scene::Ready)).await;
|
||||||
}
|
}
|
||||||
notify.send(Notification::SceneChange(Scene::Accelerating)).await;
|
notify.publish(Notification::SceneChange(Scene::Accelerating)).await;
|
||||||
stationary = false;
|
stationary = false;
|
||||||
},
|
},
|
||||||
MotionState::Decelerating => {
|
MotionState::Decelerating => {
|
||||||
if stationary {
|
if stationary {
|
||||||
// If we are going from standing still to immediately accelerating, first transition to the 'ready' scene
|
// If we are going from standing still to immediately accelerating, first transition to the 'ready' scene
|
||||||
notify.send(Notification::SceneChange(Scene::Ready)).await;
|
notify.publish(Notification::SceneChange(Scene::Ready)).await;
|
||||||
}
|
}
|
||||||
notify.send(Notification::SceneChange(Scene::Decelerating)).await;
|
notify.publish(Notification::SceneChange(Scene::Decelerating)).await;
|
||||||
stationary = false;
|
stationary = false;
|
||||||
},
|
},
|
||||||
MotionState::Steady => {
|
MotionState::Steady => {
|
||||||
notify.send(Notification::SceneChange(Scene::Ready)).await;
|
notify.publish(Notification::SceneChange(Scene::Ready)).await;
|
||||||
stationary = false;
|
stationary = false;
|
||||||
},
|
},
|
||||||
MotionState::Stationary => {
|
MotionState::Stationary => {
|
||||||
notify.send(Notification::SceneChange(Scene::Ready)).await;
|
notify.publish(Notification::SceneChange(Scene::Ready)).await;
|
||||||
stationary = true
|
stationary = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -95,13 +94,15 @@ pub async fn prediction_task(prediction_src: DynamicReceiver<'static, Prediction
|
|||||||
|
|
||||||
if stationary {
|
if stationary {
|
||||||
if parking_timer.check() {
|
if parking_timer.check() {
|
||||||
notify.send(Notification::SceneChange(Scene::Idle)).await;
|
warn!("Engaging parking brake");
|
||||||
notify.send(Notification::SetHeadlight(false)).await;
|
notify.publish(Notification::SceneChange(Scene::Idle)).await;
|
||||||
notify.send(Notification::SetBrakelight(false)).await
|
notify.publish(Notification::SetHeadlight(false)).await;
|
||||||
|
notify.publish(Notification::SetBrakelight(false)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
if sleep_timer.check() {
|
if sleep_timer.check() {
|
||||||
notify.send(Notification::Sleep).await;
|
warn!("Sleeping!");
|
||||||
|
notify.publish(Notification::Sleep).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user