Files
renderbug-bike/src/tasks/simulation.rs

334 lines
11 KiB
Rust

use core::cell::RefCell;
use core::fmt::Formatter;
use alloc::rc::Rc;
use embassy_sync::channel::DynamicSender;
use embassy_time::{Duration, Timer};
use embedded_storage::{ReadStorage, Storage};
use esp_bootloader_esp_idf::partitions::PartitionTable;
use esp_storage::FlashStorage;
use nalgebra::{Vector2, Vector3};
use log::*;
use rmp::decode::{DecodeStringError, RmpRead, RmpReadErr, ValueReadError};
use crate::events::{Measurement, SensorSource, SensorState};
#[derive(Debug)]
pub struct SharedFlash<S> {
storage: Rc<RefCell<S>>
}
impl<S> Clone for SharedFlash<S> {
fn clone(&self) -> Self {
Self {
storage: Rc::clone(&self.storage)
}
}
}
impl<S> SharedFlash<S> {
pub fn new(storage: S) -> Self {
Self {
storage: Rc::new(RefCell::new(storage))
}
}
}
impl<S: Storage> Storage for SharedFlash<S> {
fn write(&mut self, offset: u32, bytes: &[u8]) -> Result<(), Self::Error> {
self.storage.borrow_mut().write(offset, bytes)
}
}
impl<S: ReadStorage> ReadStorage for SharedFlash<S> {
type Error = S::Error;
fn read(&mut self, offset: u32, bytes: &mut [u8]) -> Result<(), Self::Error> {
self.storage.borrow_mut().read(offset, bytes)
}
fn capacity(&self) -> usize {
self.storage.borrow().capacity()
}
}
pub struct SimDataTable<S> {
reader: RangeReader<S>,
count: usize,
index: usize
}
impl<S: ReadStorage + Clone> SimDataTable<S> where S::Error: core::fmt::Debug {
pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result<Self, SimDataError<S>> {
let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data(
esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined,
);
info!("Searching for sim data partition");
let data_partition = partitions.iter().find(|partition| {
partition.partition_type() == partition_type && partition.label_as_str() == "sim"
}).expect("Unable to locate 'sim' data partition!");
let start = data_partition.offset() as usize;
let end = data_partition.len() as usize + start;
info!("Opening simulation data at {start:#02x}:{end:#02x}");
let mut reader = RangeReader::new(storage.clone(), start, end);
if let Ok(count) = rmp::decode::read_array_len(&mut reader) {
info!("Found {count} streams of simulation data");
Ok(Self {
reader,
count: count as usize,
index: 0
})
} else {
error!("Stream index is missing! Have you flashed the partition yet?");
Err(SimDataError::StreamIndexMissing)
}
}
}
impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> where S::Error: core::fmt::Debug + 'static {
type Item = SimDataReader<S>;
fn next(&mut self) -> Option<Self::Item> {
if self.index >= self.count {
None
} else {
loop {
match rmp::decode::read_ext_meta(&mut self.reader) {
Ok(this_type) => {
let sensor_reader = self.reader.subset(this_type.size as usize);
self.reader.seek(this_type.size as usize);
self.index += 1;
debug!("Found type={this_type:?}");
match this_type.typeid.try_into() {
Err(()) => error!("Found unknown simulation data chunk {this_type:?}"),
Ok(srcid) => return Some(SimDataReader::open(sensor_reader, srcid))
}
},
Err(err) => {
error!("Read error while decoding simulation data {err:?}");
return None;
}
}
}
}
}
}
#[derive(Debug)]
pub struct RangeReader<S> {
storage: S,
start: usize,
end: usize,
offset: usize
}
impl<S: ReadStorage> RangeReader<S> {
pub const fn new(storage: S, start: usize, end: usize) -> Self {
assert!(start <= end);
// TODO: Should add bounds checking since we will know the size of the chunk already
Self {
storage,
start,
end,
offset: 0
}
}
pub fn seek(&mut self, offset: usize) {
self.offset += offset;
}
pub fn subset(&self, size: usize) -> Self where S: Clone + core::fmt::Debug {
trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size);
assert!(self.start + self.offset + size < self.end);
Self {
storage: self.storage.clone(),
start: self.offset + self.start,
end: self.start + self.offset + size,
offset: 0
}
}
}
impl<S: ReadStorage> RmpRead for RangeReader<S> where S::Error: core::fmt::Debug + 'static {
type Error = RangeReadError<S::Error>;
fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
let pos = self.start + self.offset;
if pos > self.end {
Err(RangeReadError::OutOfData)
} else {
assert!(pos + buf.len() <= self.end);
match self.storage.read(pos as u32, buf) {
Ok(_) => {
self.offset += buf.len();
Ok(())
},
Err(err) => Err(RangeReadError::Storage(err))
}
}
}
}
pub struct SimDataReader<S> {
reader: RangeReader<S>,
srcid: SensorSource,
runtime: Duration,
event_count: usize,
index: usize
}
#[derive(Debug)]
pub enum SimDataError<S: ReadStorage> where S::Error: core::fmt::Debug + 'static {
StreamIndexMissing,
InvalidChunkSize { expected: usize, found: usize },
MissingTimecode,
BadString,
DecodeError(ValueReadError<RangeReadError<S::Error>>),
EndOfStream
}
impl<S: ReadStorage> From<ValueReadError<RangeReadError<S::Error>>> for SimDataError<S> where S::Error: core::fmt::Debug {
fn from(value: ValueReadError<RangeReadError<S::Error>>) -> Self {
SimDataError::DecodeError(value)
}
}
impl<S: ReadStorage> From<DecodeStringError<'_, RangeReadError<S::Error>>> for SimDataError<S> where S::Error: core::fmt::Debug {
fn from(value: DecodeStringError<'_, RangeReadError<S::Error>>) -> Self {
SimDataError::BadString
}
}
impl<S: ReadStorage> SimDataReader<S> where S::Error: core::fmt::Debug + 'static {
pub fn open(mut reader: RangeReader<S>, srcid: SensorSource) -> Self {
debug!("Opening {srcid:?} sim data chunk");
let event_count = rmp::decode::read_array_len(&mut reader).unwrap() as usize;
debug!("Found {event_count} events!");
Self {
reader,
srcid,
runtime: Default::default(),
event_count,
index: 0
}
}
pub fn srcid(&self) -> SensorSource {
self.srcid
}
pub async fn read_next(&mut self) -> Result<Option<Measurement>, SimDataError<S>> {
if self.index < self.event_count {
self.index += 1;
// The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some()
match self.srcid {
SensorSource::IMU => self.read_motion().await,
SensorSource::GPS => self.read_gps().await,
SensorSource::Annotations => self.read_annotation().await,
srcid => unimplemented!("{srcid:?} is not a simulatable sensor!")
}.map(|x| { Some(x) })
} else {
Ok(None)
}
}
fn verify_chunk_len(&mut self, length: u32) -> Result<(), SimDataError<S>> {
let chunk_len = rmp::decode::read_array_len(&mut self.reader)?;
if chunk_len != length {
Err(SimDataError::InvalidChunkSize { expected: length as usize, found: chunk_len as usize })
} else {
Ok(())
}
}
async fn read_delay_field(&mut self) -> Result<(), SimDataError<S>> {
let timecode = rmp::decode::read_f64(&mut self.reader)?;
let delay = embassy_time::Duration::from_millis((timecode * 1000.0) as u64);
self.runtime += delay;
Timer::after(delay).await;
Ok(())
}
async fn read_annotation(&mut self) -> Result<Measurement, SimDataError<S>> {
self.verify_chunk_len(3)?;
self.read_delay_field().await?;
let mut buf = [0; 256];
let msg = rmp::decode::read_str(&mut self.reader, &mut buf)?;
warn!("ANNOATION: {msg}");
Ok(Measurement::Annotation)
}
async fn read_motion(&mut self) -> Result<Measurement, SimDataError<S>> {
self.verify_chunk_len(7)?;
self.read_delay_field().await?;
let accel = Vector3::new(
rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader)? as f32,
);
let gyro = Vector3::new(
rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader)? as f32,
);
Ok(Measurement::IMU { accel, gyro })
}
async fn read_gps(&mut self) -> Result<Measurement, SimDataError<S>> {
self.verify_chunk_len(3)?;
self.read_delay_field().await?;
let coords = Vector2::new(
rmp::decode::read_f64(&mut self.reader)?,
rmp::decode::read_f64(&mut self.reader)?
);
Ok(Measurement::GPS(Some(coords)))
}
}
#[derive(Debug)]
pub enum RangeReadError<E> {
OutOfData,
Storage(E)
}
impl<E: core::fmt::Debug + 'static> RmpReadErr for RangeReadError<E> {}
impl<E> core::fmt::Display for RangeReadError<E> {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.write_str("RmpErr")
}
}
#[embassy_executor::task(pool_size = 3)]
pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>>, events: DynamicSender<'static, Measurement>) {
warn!("Starting simulation for {:?}", reader.srcid());
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await;
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await;
// TODO: SimulationProgress updates
loop {
match reader.read_next().await {
Ok(Some(next_evt)) => events.send(next_evt).await,
Ok(None) => {
warn!("End of simulation data stream");
break
}
Err(err) => {
warn!("Error during sensor stream: {err:?}");
break
}
}
}
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await;
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::Degraded)).await;
warn!("End of simulation for {:?}", reader.srcid());
}