use core::cell::RefCell; use core::fmt::Formatter; use alloc::rc::Rc; use embassy_sync::channel::DynamicSender; use embassy_time::{Duration, Timer}; use embedded_storage::{ReadStorage, Storage}; use esp_bootloader_esp_idf::partitions::PartitionTable; use esp_storage::FlashStorage; use nalgebra::{Vector2, Vector3}; use log::*; use rmp::decode::{RmpRead, RmpReadErr, ValueReadError}; use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}}; #[derive(Debug)] pub struct SharedFlash { storage: Rc> } impl Clone for SharedFlash { fn clone(&self) -> Self { Self { storage: Rc::clone(&self.storage) } } } impl SharedFlash { pub fn new(storage: S) -> Self { Self { storage: Rc::new(RefCell::new(storage)) } } } impl Storage for SharedFlash { fn write(&mut self, offset: u32, bytes: &[u8]) -> Result<(), Self::Error> { self.storage.borrow_mut().write(offset, bytes) } } impl ReadStorage for SharedFlash { type Error = S::Error; fn read(&mut self, offset: u32, bytes: &mut [u8]) -> Result<(), Self::Error> { self.storage.borrow_mut().read(offset, bytes) } fn capacity(&self) -> usize { self.storage.borrow().capacity() } } pub struct SimDataTable { reader: RangeReader, count: usize, index: usize } impl SimDataTable where S::Error: core::fmt::Debug + 'static { pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result> { let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined, ); info!("Searching for sim data partition"); let data_partition = partitions.iter().find(|partition| { partition.partition_type() == partition_type && partition.label_as_str() == "sim" }).ok_or(SimDataError::PartitionNotFound)?; let start = data_partition.offset() as usize; let end = data_partition.len() as usize + start; info!("Opening simulation data at {start:#02x}:{end:#02x}"); let mut reader = RangeReader::new(storage.clone(), start, end); if let Ok(index) = StreamIndex::from_rmp(&mut reader) { info!("Found stream index: {index:?}"); Ok(Self { reader, count: index.count, index: 0 }) } else { error!("Stream index is missing! Have you flashed the partition yet?"); Err(SimDataError::StreamIndexMissing) } } } impl Iterator for SimDataTable where S::Error: core::fmt::Debug + 'static { type Item = SimDataReader; fn next(&mut self) -> Option { if self.index >= self.count { None } else { loop { match StreamHeader::from_rmp(&mut self.reader) { Ok(header) => { match self.reader.subset(header.size) { Ok(sensor_reader) => { self.reader.seek(header.size).unwrap_or_else(|err| { error!("Simulation data appears bigger than the storage capacity: {err:?}") }); self.index += 1; debug!("Found header={header:?}"); return Some(SimDataReader::open(sensor_reader, header.id)); }, err => { error!("Could not open the next simulation data chunk: {err:?}"); return None; } } }, Err(SimDataError::UnsupportedStreamType(meta)) => { error!("Found unknown simulation data chunk {meta:?}"); self.reader.seek(meta.size as usize).unwrap_or_else(|err| { error!("Simulation data appears bigger than the storage capacity: {err:?}") }); self.index += 1; } Err(err) => { error!("Read error while reading next chunk in simulation stream table {err:?}"); return None; } } } } } } #[derive(Debug)] pub struct RangeReader { storage: S, start: usize, end: usize, offset: usize } impl RangeReader { pub const fn new(storage: S, start: usize, end: usize) -> Self { assert!(start <= end); // TODO: Should add bounds checking since we will know the size of the chunk already Self { storage, start, end, offset: 0 } } pub fn seek(&mut self, offset: usize) -> Result<(), RangeReadError> { self.offset += offset; if self.offset > self.end { Err(RangeReadError::OutOfData) } else { Ok(()) } } pub fn subset(&self, size: usize) -> Result> where S: Clone + core::fmt::Debug { trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size); if self.start + self.offset + size > self.end { Err(SimDataError::EndOfStream) } else { Ok(Self { storage: self.storage.clone(), start: self.offset + self.start, end: self.start + self.offset + size, offset: 0 }) } } } impl RmpRead for RangeReader where S::Error: core::fmt::Debug + 'static { type Error = RangeReadError; fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> { let pos = self.start + self.offset; if pos > self.end { Err(RangeReadError::OutOfData) } else { assert!(pos + buf.len() <= self.end); match self.storage.read(pos as u32, buf) { Ok(_) => { self.offset += buf.len(); Ok(()) }, Err(err) => Err(RangeReadError::Storage(err)) } } } } pub struct SimDataReader { reader: RangeReader, srcid: SensorSource, runtime: Duration, event_count: usize, index: usize } impl From for Measurement { fn from(value: IMUReading) -> Self { Measurement::IMU { accel: Vector3::new(value.accel_x as f32, value.accel_y as f32, value.accel_z as f32), gyro: Vector3::new(value.gyro_x as f32, value.gyro_y as f32, value.gyro_z as f32) } } } impl From for Measurement { fn from(value: GPSReading) -> Self { Measurement::GPS(Some(Vector2::new(value.lat, value.lon))) } } impl From for Measurement { fn from(value: AnnotationReading) -> Self { warn!("ANNOTATION: {}", core::str::from_utf8(&value.buf).unwrap()); Measurement::Annotation } } impl SimDataReader where S::Error: core::fmt::Debug + 'static { pub fn open(mut reader: RangeReader, stream_type: StreamType) -> Self { debug!("Opening {stream_type:?} sim data chunk"); let event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count; debug!("Found {event_count} events!"); Self { reader, srcid: stream_type.into(), runtime: Default::default(), event_count, index: 0 } } pub fn srcid(&self) -> SensorSource { self.srcid } async fn read_next_event>(&mut self) -> Result>>> { let event = StreamEvent::::from_rmp(&mut self.reader)?; let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64); self.runtime += delay; Timer::after(delay).await; Ok(event.data.into()) } pub async fn read_next(&mut self) -> Result, SimDataError>>> { if self.index < self.event_count { self.index += 1; // The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some() Ok(Some(match self.srcid { SensorSource::IMU => self.read_next_event::().await?, SensorSource::GPS => self.read_next_event::().await?, SensorSource::Annotations => self.read_next_event::().await?, srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!") })) } else { Ok(None) } } } #[derive(Debug)] pub enum RangeReadError { OutOfData, Storage(E) } impl RmpReadErr for RangeReadError {} impl core::fmt::Display for RangeReadError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { f.write_str("RmpErr") } } #[embassy_executor::task(pool_size = 3)] pub async fn simulation_task(mut reader: SimDataReader>, events: DynamicSender<'static, Measurement>) { warn!("Starting simulation for {:?}", reader.srcid()); events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await; events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await; let mut idx = 0; let mut idx_breaker = Breaker::default(); // TODO: SimulationProgress updates loop { match reader.read_next().await { Ok(Some(next_evt)) => { events.send(next_evt).await; let pct = (idx as f32) / (reader.event_count as f32); idx_breaker.set((pct * 255.0) as u8); if let Some(pct) = idx_breaker.read_tripped() { events.send(Measurement::SimulationProgress(reader.srcid(), reader.runtime, pct)).await; } idx += 1; }, Ok(None) => { warn!("End of simulation data stream"); break } Err(err) => { warn!("Error during sensor stream: {err:?}"); break } } } events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await; events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::Degraded)).await; warn!("End of simulation for {:?}", reader.srcid()); }