// Simulation-data playback: reads recorded sensor streams from a flash
// partition and replays them as `Measurement` events on a channel.
//
// NOTE(review): the text of this file looks damaged. The code below is left
// byte-for-byte untouched; only this header comment was added.
//   * Generic parameter lists appear to be missing (`Result>`, `Option {`,
//     `impl From for Measurement`, `StreamEvent::::from_rmp`,
//     `read_next_event::()`, `SimDataReader>`), so the text as it stands is
//     not valid Rust. Restore them from version control rather than by guess.
//   * The whole file is collapsed onto four physical lines. As written, the
//     two `//` comments embedded mid-line (`// The read_* functions ...` and
//     `// TODO: SimulationProgress updates`) comment out everything that
//     follows them on that physical line, and the "Stream index is missing!"
//     log message contains a line break.
//
// Contents
// --------
// `SimDataTable` -- cursor over the stream table (`reader`, `count`, `index`).
//   * `open(storage, partitions)` searches `partitions` for the entry whose
//     type is `Data(Undefined)` and whose label is "sim", builds a
//     `RangeReader` over `offset .. offset + len`, and decodes a
//     `StreamIndex` from it; `count` is taken from that index.
//     Errors: `SimDataError::PartitionNotFound` when no such partition
//     exists, `SimDataError::StreamIndexMissing` when the index cannot be
//     decoded.
//   * `Iterator::next` returns `None` once `index >= count`. Otherwise it
//     decodes a `StreamHeader`, takes a sub-reader of `header.size` bytes,
//     seeks the table reader past those bytes, bumps `index`, and yields
//     `SimDataReader::open(sub_reader, header.id)`. A header reported as
//     `UnsupportedStreamType(meta)` is logged, skipped by seeking
//     `meta.size`, and counted; any other failure is logged and ends
//     iteration with `None`. Seek failures are only logged.
//     NOTE(review): after skipping an unsupported chunk the loop decodes
//     another header without re-checking `index >= count`, so a table that
//     ends in an unsupported chunk triggers one extra header read.
//
// `From ... for Measurement` -- three conversions, named by the type of their
// `value` parameter:
//   * `IMUReading`        -> `Measurement::IMU`, accel/gyro components cast
//                            with `as f32` into `Vector3`s.
//   * `GPSReading`        -> `Measurement::GPS(Some(Vector2::new(lat, lon)))`.
//   * `AnnotationReading` -> logs `buf` as text at warn level and returns
//                            `Measurement::Annotation`.
//     NOTE(review): uses `core::str::from_utf8(..).unwrap()`, which panics
//     when `buf` is not valid UTF-8.
//
// `SimDataReader` -- one event stream (`reader`, `srcid`, `runtime`,
// `event_count`, `index`).
//   * `open(reader, stream_type)` decodes an `EventStreamHeader` to learn the
//     event count and converts `stream_type` into the `SensorSource` stored
//     in `srcid`.
//     NOTE(review): the header decode is `.unwrap()`ed, so an undecodable
//     header panics instead of surfacing an error.
//   * `srcid()` returns the stream's `SensorSource`.
//   * `read_next_event` decodes one `StreamEvent`, turns
//     `event.timecode * 1000.0` into whole milliseconds (`as u64`), adds that
//     delay to `runtime`, awaits `Timer::after(delay)`, and returns the event
//     payload converted into a `Measurement`.
//     (Presumably `timecode` is seconds since the previous event -- confirm
//     against the recorder.)
//   * `read_next` returns `Ok(None)` once `index` reaches `event_count`.
//     Otherwise it increments `index` and dispatches on `srcid` (`IMU`, `GPS`,
//     `Annotations`); any other source hits `unimplemented!` and panics.
//     NOTE(review): `index` is incremented before the read is attempted.
//
// `simulation_task` -- embassy task (`pool_size = 3`) that drives one reader.
//   Sends `SensorHardwareStatus(Simulation, AcquiringFix)` and
//   `SensorHardwareStatus(srcid, Online)`, then forwards every event from
//   `read_next()` to `events`. After each event it computes
//   `idx / event_count`, scales it by 255.0, casts to `u8`, feeds it to a
//   `Breaker`, and sends `SimulationProgress(srcid, runtime, pct)` whenever
//   `read_tripped()` yields a value (presumably only when the value changed
//   -- confirm against `Breaker`). The loop ends on `Ok(None)` or on the
//   first error (both logged at warn level), after which
//   `SensorHardwareStatus(srcid, Offline)` and
//   `SensorHardwareStatus(Simulation, Degraded)` are sent.
use embassy_sync::channel::DynamicSender; use embassy_time::{Duration, Timer}; use embedded_storage::ReadStorage; use esp_bootloader_esp_idf::partitions::PartitionTable; use esp_storage::FlashStorage; use nalgebra::{Vector2, Vector3}; use log::*; use rmp::decode::ValueReadError; use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{RangeReadError, RangeReader, SharedFlash}}; pub struct SimDataTable { reader: RangeReader, count: usize, index: usize } impl SimDataTable where S::Error: core::fmt::Debug + 'static { pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result> { let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined, ); info!("Searching for sim data partition"); let data_partition = partitions.iter().find(|partition| { partition.partition_type() == partition_type && partition.label_as_str() == "sim" }).ok_or(SimDataError::PartitionNotFound)?; let start = data_partition.offset() as usize; let end = data_partition.len() as usize + start; info!("Opening simulation data at {start:#02x}:{end:#02x}"); let mut reader = RangeReader::new(storage.clone(), start, end); if let Ok(index) = StreamIndex::from_rmp(&mut reader) { info!("Found stream index: {index:?}"); Ok(Self { reader, count: index.count, index: 0 }) } else { error!("Stream index is missing! 
Have you flashed the partition yet?"); Err(SimDataError::StreamIndexMissing) } } } impl Iterator for SimDataTable where S::Error: core::fmt::Debug + 'static { type Item = SimDataReader; fn next(&mut self) -> Option { if self.index >= self.count { None } else { loop { match StreamHeader::from_rmp(&mut self.reader) { Ok(header) => { match self.reader.subset(header.size) { Ok(sensor_reader) => { self.reader.seek(header.size).unwrap_or_else(|err| { error!("Simulation data appears bigger than the storage capacity: {err:?}") }); self.index += 1; debug!("Found header={header:?}"); return Some(SimDataReader::open(sensor_reader, header.id)); }, err => { error!("Could not open the next simulation data chunk: {err:?}"); return None; } } }, Err(SimDataError::UnsupportedStreamType(meta)) => { error!("Found unknown simulation data chunk {meta:?}"); self.reader.seek(meta.size as usize).unwrap_or_else(|err| { error!("Simulation data appears bigger than the storage capacity: {err:?}") }); self.index += 1; } Err(err) => { error!("Read error while reading next chunk in simulation stream table {err:?}"); return None; } } } } } } pub struct SimDataReader { reader: RangeReader, srcid: SensorSource, runtime: Duration, event_count: usize, index: usize } impl From for Measurement { fn from(value: IMUReading) -> Self { Measurement::IMU { accel: Vector3::new(value.accel_x as f32, value.accel_y as f32, value.accel_z as f32), gyro: Vector3::new(value.gyro_x as f32, value.gyro_y as f32, value.gyro_z as f32) } } } impl From for Measurement { fn from(value: GPSReading) -> Self { Measurement::GPS(Some(Vector2::new(value.lat, value.lon))) } } impl From for Measurement { fn from(value: AnnotationReading) -> Self { warn!("ANNOTATION: {}", core::str::from_utf8(&value.buf).unwrap()); Measurement::Annotation } } impl SimDataReader where S::Error: core::fmt::Debug + 'static { pub fn open(mut reader: RangeReader, stream_type: StreamType) -> Self { debug!("Opening {stream_type:?} sim data chunk"); let 
event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count; debug!("Found {event_count} events!"); Self { reader, srcid: stream_type.into(), runtime: Default::default(), event_count, index: 0 } } pub fn srcid(&self) -> SensorSource { self.srcid } async fn read_next_event>(&mut self) -> Result>>> { let event = StreamEvent::::from_rmp(&mut self.reader)?; let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64); self.runtime += delay; Timer::after(delay).await; Ok(event.data.into()) } pub async fn read_next(&mut self) -> Result, SimDataError>>> { if self.index < self.event_count { self.index += 1; // The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some() Ok(Some(match self.srcid { SensorSource::IMU => self.read_next_event::().await?, SensorSource::GPS => self.read_next_event::().await?, SensorSource::Annotations => self.read_next_event::().await?, srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!") })) } else { Ok(None) } } } #[embassy_executor::task(pool_size = 3)] pub async fn simulation_task(mut reader: SimDataReader>, events: DynamicSender<'static, Measurement>) { warn!("Starting simulation for {:?}", reader.srcid()); events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await; events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await; let mut idx = 0; let mut idx_breaker = Breaker::default(); // TODO: SimulationProgress updates loop { match reader.read_next().await { Ok(Some(next_evt)) => { events.send(next_evt).await; let pct = (idx as f32) / (reader.event_count as f32); idx_breaker.set((pct * 255.0) as u8); if let Some(pct) = idx_breaker.read_tripped() { events.send(Measurement::SimulationProgress(reader.srcid(), reader.runtime, pct)).await; } idx += 1; }, Ok(None) => { warn!("End of simulation data stream"); break } Err(err) => { warn!("Error during sensor stream: {err:?}"); 
break } } } events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await; events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::Degraded)).await; warn!("End of simulation for {:?}", reader.srcid()); }