ego: engine: move all the async timeouts into the engine's commit method

This commit is contained in:
2026-03-27 22:53:09 +01:00
parent 4977746af1
commit fc9bd0b7a7
2 changed files with 27 additions and 21 deletions

View File

@@ -1,5 +1,5 @@
use embassy_sync::pubsub::DynPublisher;
use embassy_time::{Duration, Instant};
use embassy_time::{Duration, Instant, WithTimeout};
use nalgebra::{Rotation3, Vector2, Vector3, ComplexField, RealField};
use log::*;
@@ -60,6 +60,14 @@ const ACCELERATION_NOISE_GATE: f32 = 1.0;
/// When we get a heading via GPS, this determines how much weight that value has when blended into the system.
const GPS_HEADING_ALPHA_CORRECTION: f32 = 0.9;
const TIMEOUT: Duration = Duration::from_millis(3);
macro_rules! publish {
($predictions:expr, $prediction:expr) => {
$predictions.publish($prediction).with_timeout(TIMEOUT).await.expect("Could not commit IMU data in time. Is the prediction bus stalled?")
}
}
impl Debug for BikeStates {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("BikeStates")
@@ -132,18 +140,18 @@ impl BikeStates {
let last_motion = self.motion_state.value;
if let Some(true) = self.acquiring_data.read_tripped() {
predictions.publish(Prediction::SensorStatus(SensorSource::MotionFrame, SensorState::AcquiringFix)).await;
predictions.publish(Prediction::SensorStatus(SensorSource::Location, SensorState::AcquiringFix)).await;
publish!(predictions, Prediction::SensorStatus(SensorSource::MotionFrame, SensorState::AcquiringFix));
publish!(predictions, Prediction::SensorStatus(SensorSource::Location, SensorState::AcquiringFix));
}
if let Some(true) = self.has_motion_frame.read_tripped() {
predictions.publish(Prediction::SensorStatus(SensorSource::MotionFrame, SensorState::Online)).await
publish!(predictions, Prediction::SensorStatus(SensorSource::MotionFrame, SensorState::Online));
}
match self.has_gps_fix.read_tripped() {
None => (),
Some(true) => predictions.publish(Prediction::SensorStatus(SensorSource::Location, SensorState::Online)).await,
Some(false) => predictions.publish(Prediction::SensorStatus(SensorSource::Location, SensorState::Degraded)).await,
Some(true) => publish!(predictions, Prediction::SensorStatus(SensorSource::Location, SensorState::Online)),
Some(false) => publish!(predictions, Prediction::SensorStatus(SensorSource::Location, SensorState::Degraded)),
}
let est = self.kf.x;
@@ -155,7 +163,7 @@ impl BikeStates {
if let Some(pos) = position {
self.predicted_location.set(pos);
if let Some(pos) = self.predicted_location.read_tripped() {
predictions.publish(Prediction::Location(pos)).await;
publish!(predictions, Prediction::Location(pos));
}
}
@@ -172,24 +180,24 @@ impl BikeStates {
}).sum::<f32>();
// Also grab the average velocity of the last few sample periods
let average_speed = self.speedo.mean();
info!("prediction={current_prediction:?} mean={average_speed}");
trace!("prediction={current_prediction:?} mean={average_speed}");
// Reported velocity is kept only to the first decimal, so we aren't spamming the system with floating point noise
self.reported_velocity.set((average_speed * 10.0).trunc() / 10.0);
if let Some(reported) = self.reported_velocity.read_tripped() {
predictions.publish(Prediction::Velocity(reported)).await;
publish!(predictions, Prediction::Velocity(reported));
}
// We only want to wake up from sleep if our current velocity is something like a bump
if average_speed > BUMP_SPEED_NOISE_GATE && self.sleep_timer.wake() {
warn!("Waking from sleep into idle mode");
predictions.publish(Prediction::SetPersonality(Personality::Waking)).await;
predictions.publish(Prediction::SetPersonality(Personality::Parked)).await;
publish!(predictions, Prediction::SetPersonality(Personality::Waking));
publish!(predictions, Prediction::SetPersonality(Personality::Parked));
}
// Here, we additionally release the parking brake if we are currently parked and reading substantial movement
if average_speed > WAKEUP_SPEED_NOISE_GATE && self.parking_timer.wake() {
warn!("Disengaging parking brake");
predictions.publish(Prediction::SetPersonality(Personality::Active)).await;
publish!(predictions, Prediction::SetPersonality(Personality::Active));
}
// If the total slope is more upwards than not, we are accelerating.
@@ -213,19 +221,19 @@ impl BikeStates {
//debug!("state={state:?} trend={trend} mean={mean} v={v}");
//if state != MotionState::Stationary {
// warn!("Active due to motion");
// predictions.publish(Prediction::SetPersonality(Personality::Active)).await;
// publish!(predictions, Prediction::SetPersonality(Personality::Active));
//}
predictions.publish(Prediction::Motion { prev: last_motion, next: state }).await;
publish!(predictions, Prediction::Motion { prev: last_motion, next: state });
} else if self.motion_state.value == MotionState::Stationary {
// Finally, if we are stationary, check our parking and sleep timers
if self.parking_timer.check() {
warn!("Engaging parking brake");
predictions.publish(Prediction::SetPersonality(Personality::Parked)).await;
publish!(predictions, Prediction::SetPersonality(Personality::Parked));
}
if self.sleep_timer.check() {
warn!("Sleeping!");
predictions.publish(Prediction::SetPersonality(Personality::Sleeping)).await;
publish!(predictions, Prediction::SetPersonality(Personality::Sleeping));
}
}
}

View File

@@ -4,8 +4,6 @@ use embassy_time::{Duration, WithTimeout};
use crate::{ego::engine::BikeStates, events::{Measurement, Prediction}};
const TIMEOUT: Duration = Duration::from_millis(3);
#[embassy_executor::task]
pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, prediction_sink: DynPublisher<'static, Prediction>, recording_sink: DynPublisher<'static, Measurement>) {
let mut states = BikeStates::default();
@@ -17,11 +15,11 @@ pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, prediction_
match next_measurement {
Measurement::IMU { accel, gyro } => {
states.insert_imu(accel, gyro);
states.commit(&prediction_sink).with_timeout(TIMEOUT).await.expect("Could not commit IMU data in time. Is the prediction bus stalled?");
states.commit(&prediction_sink).await;
},
Measurement::GPS(Some(gps_pos)) => {
states.insert_gps(gps_pos);
states.commit(&prediction_sink).with_timeout(TIMEOUT).await.expect("Could not commit GPS data in time. Is the prediction bus stalled?");
states.commit(&prediction_sink).await;
},
Measurement::GPS(None) => {
states.has_gps_fix.set(false);
@@ -29,7 +27,7 @@ pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, prediction_
// FIXME: This needs harmonized with the automatic data timeout from above, somehow?
Measurement::SensorHardwareStatus(source, state) => {
debug!("Sensor {source:?} reports {state:?}!");
prediction_sink.publish(Prediction::SensorStatus(source, state)).with_timeout(TIMEOUT).await.expect("Could not update sensor status in time. Is the prediction bus stalled?");
prediction_sink.publish(Prediction::SensorStatus(source, state)).await;
},
Measurement::ExternalPower(mw) => {
info!("Got external power change to {mw:?}");