diff --git a/src/bin/main.rs b/src/bin/main.rs index 20c4123..c0a60b3 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -118,22 +118,32 @@ async fn main(spawner: Spawner) { spawner.must_spawn(renderbug_embassy::tasks::oled_render::oled_render(output, oled_surfaces, oled_uniforms)); } + let mut storage = renderbug_bike::storage::SharedFlash::new(esp_storage::FlashStorage::new()); + let mut partition_buf = [8; 1024]; + let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap(); + #[cfg(feature="simulation")] { - use esp_storage::FlashStorage; - use renderbug_embassy::tasks::simulation::{SharedFlash, SimDataTable}; - let mut storage = SharedFlash::new(FlashStorage::new()); - let mut buf = [8; 1024]; - let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut buf).unwrap(); + use renderbug_bike::tasks::simulation::SimDataTable; for sim_data in SimDataTable::open(storage, partitions).expect("Could not find sim data!") { let srcid = sim_data.srcid(); info!("Found simulation data for {srcid:?}"); - if spawner.spawn(renderbug_embassy::tasks::simulation::simulation_task(sim_data, motion_bus.dyn_sender())).is_err() { + if spawner.spawn(renderbug_bike::tasks::simulation::simulation_task(sim_data, motion_bus.dyn_sender())).is_err() { error!("Unable to spawn simulation task for {srcid:?}! Increase the task pool size."); } } } + #[cfg(not(feature="simulation"))] + { + use renderbug_bike::storage::SimDataRecorder; + + let recorder = SimDataRecorder::open(storage, partitions).expect("Unable to open sim data partition for writing"); + //spawner.spawn(record_telemetry(recording_bus.dyn_receiver(), recorder)).unwrap(); + } + + spawner.spawn(print_sensor_readings(recording_bus.dyn_subscriber().unwrap())).unwrap(); + #[cfg(feature="radio")] let (wifi, network_device, ble) = { info!("Configuring wifi"); diff --git a/src/storage.rs b/src/storage.rs index 50f4ba1..ce9d07f 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -2,8 +2,12 @@ use core::{cell::RefCell, fmt::Formatter}; use alloc::rc::Rc; use embedded_storage::{ReadStorage, Storage}; +use esp_bootloader_esp_idf::partitions::PartitionTable; +use esp_hal::time::Instant; use log::*; -use rmp::decode::{RmpRead, RmpReadErr}; +use rmp::{decode::{RmpRead, RmpReadErr}, encode::{RmpWrite, RmpWriteErr, ValueWriteError}}; + +use crate::simdata::{BundleEventHeader, EventRecord, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}; #[derive(Debug)] pub struct SharedFlash { @@ -45,30 +49,30 @@ impl ReadStorage for SharedFlash { } #[derive(Debug)] -pub enum RangeReadError { +pub enum StorageRangeError { OutOfData, Storage(E) } -impl RmpReadErr for RangeReadError {} +impl RmpReadErr for StorageRangeError {} +impl RmpWriteErr for StorageRangeError {} -impl core::fmt::Display for RangeReadError { +impl core::fmt::Display for StorageRangeError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { f.write_str("RmpErr") } } #[derive(Debug)] -pub struct RangeReader { +pub struct StorageRange { storage: S, start: usize, end: usize, offset: usize } -impl RangeReader { +impl StorageRange { pub const fn new(storage: S, start: usize, end: usize) -> Self { assert!(start <= end); - // TODO: Should add bounds checking since we will know the size of the chunk already Self { storage, start, @@ -77,19 +81,27 @@ impl RangeReader { } } - pub fn seek(&mut self, offset: usize) -> Result<(), RangeReadError> { + pub const fn capacity(&self) -> usize { + self.end - self.start + } + + pub const fn pos(&self) -> usize { + self.offset + } + + pub fn seek(&mut self, offset: usize) -> Result<(), StorageRangeError> { self.offset += offset; if self.offset > self.end { - Err(RangeReadError::OutOfData) + Err(StorageRangeError::OutOfData) } else { Ok(()) } } - pub fn subset(&self, size: usize) -> Result> where S: Clone + core::fmt::Debug { + pub fn subset(&self, size: usize) -> Result> where S: Clone + core::fmt::Debug { trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size); if self.start + self.offset + size > self.end { - Err(RangeReadError::OutOfData) + Err(StorageRangeError::OutOfData) } else { Ok(Self { storage: self.storage.clone(), @@ -101,13 +113,13 @@ impl RangeReader { } } -impl RmpRead for RangeReader where S::Error: core::fmt::Debug + 'static { - type Error = RangeReadError; +impl RmpRead for StorageRange where S::Error: core::fmt::Debug + 'static { + type Error = StorageRangeError; - fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), RangeReadError> { + fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), StorageRangeError> { let pos = self.start + self.offset; if pos > self.end { - Err(RangeReadError::OutOfData) + Err(StorageRangeError::OutOfData) } else { assert!(pos + buf.len() <= self.end); match self.storage.read(pos as u32, buf) { @@ -115,8 +127,69 @@ impl RmpRead for RangeReader where S::Error: core::fmt::Debug self.offset += buf.len(); Ok(()) }, - Err(err) => Err(RangeReadError::Storage(err)) + Err(err) => Err(StorageRangeError::Storage(err)) } } } } + + +impl RmpWrite for StorageRange where S::Error: core::fmt::Debug + 'static { + type Error = StorageRangeError; + + fn write_bytes(&mut self, buf: &[u8]) -> Result<(), Self::Error> { + let pos = self.start + self.offset; + if pos > self.end { + Err(StorageRangeError::OutOfData) + } else { + assert!(pos + buf.len() <= self.end); + match self.storage.write(pos as u32, buf) { + Ok(_) => { + self.offset += buf.len(); + Ok(()) + }, + Err(err) => Err(StorageRangeError::Storage(err)) + } + } + } +} + +pub struct SimDataRecorder { + storage: StorageRange, + last_stamp: Instant +} + +impl SimDataRecorder where S::Error: core::fmt::Debug + 'static { + pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result>>> { + let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( + esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined, + ); + info!("Searching for sim data partition"); + let data_partition = partitions.iter().find(|partition| { + partition.partition_type() == partition_type && partition.label_as_str() == "sim" + }).ok_or(SimDataError::PartitionNotFound)?; + + let start = data_partition.offset() as usize; + let end = data_partition.len() as usize + start; + let mut writer = StorageRange::new(storage.clone(), start, end); + warn!("Writing new simulation data at {start:#02x}:{end:#02x}"); + StreamIndex { count: 1 }.write_rmp(&mut writer)?; + StreamHeader { id: StreamType::Bundle, size: 0 }.write_rmp(&mut writer)?; + Ok(Self { + storage: writer, + last_stamp: Instant::now() + }) + } + + pub fn write_next(&mut self, event: T) -> Result<(), SimDataError>>> { + BundleEventHeader { id: T::stream_id() }.write_rmp(&mut self.storage)?; + let now = Instant::now(); + let event = StreamEvent { + data: event, + timecode: (now - self.last_stamp).as_millis() as f64 / 1000.0 + }; + event.write_rmp(&mut self.storage)?; + self.last_stamp = now; + Ok(()) + } +} \ No newline at end of file diff --git a/src/tasks/simulation.rs b/src/tasks/simulation.rs index b4e77f7..d19dc71 100644 --- a/src/tasks/simulation.rs +++ b/src/tasks/simulation.rs @@ -1,23 +1,25 @@ +use core::usize; + use embassy_sync::channel::DynamicSender; use embassy_time::{Duration, Timer}; -use embedded_storage::ReadStorage; +use embedded_storage::{ReadStorage, Storage}; use esp_bootloader_esp_idf::partitions::PartitionTable; +use esp_hal::time::Instant; use esp_storage::FlashStorage; use figments::liber8tion::interpolate::Fract8; use nalgebra::{Vector2, Vector3}; use log::*; -use rmp::decode::ValueReadError; +use rmp::{decode::ValueReadError, encode::ValueWriteError}; -use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{RangeReadError, RangeReader, SharedFlash}}; +use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, BundleEventHeader, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{SharedFlash, StorageRange, StorageRangeError}}; pub struct SimDataTable { - reader: RangeReader, + reader: StorageRange, count: usize, index: usize } -impl SimDataTable where S::Error: core::fmt::Debug + 'static { - +impl SimDataTable where S::Error: core::fmt::Debug + 'static { pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result> { let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined, @@ -30,7 +32,7 @@ impl SimDataTable where S::Error: core::fmt::Debug + let start = data_partition.offset() as usize; let end = data_partition.len() as usize + start; info!("Opening simulation data at {start:#02x}:{end:#02x}"); - let mut reader = RangeReader::new(storage.clone(), start, end); + let mut reader = StorageRange::new(storage, start, end); if let Ok(index) = StreamIndex::from_rmp(&mut reader) { info!("Found stream index: {index:?}"); Ok(Self { @@ -55,9 +57,11 @@ impl Iterator for SimDataTable whe loop { match StreamHeader::from_rmp(&mut self.reader) { Ok(header) => { - match self.reader.subset(header.size) { + info!("Found stream header: {header:?}"); + let stream_size = if header.size == 0 { self.reader.capacity() - self.reader.pos() } else { header.size }; + match self.reader.subset(stream_size) { Ok(sensor_reader) => { - self.reader.seek(header.size).unwrap_or_else(|err| { + self.reader.seek(stream_size).unwrap_or_else(|err| { error!("Simulation data appears bigger than the storage capacity: {err:?}") }); self.index += 1; @@ -88,8 +92,8 @@ impl Iterator for SimDataTable whe } pub struct SimDataReader { - reader: RangeReader, - srcid: SensorSource, + reader: StorageRange, + srcid: StreamType, runtime: Duration, event_count: usize, index: usize @@ -118,39 +122,44 @@ impl From for Measurement { } impl SimDataReader where S::Error: core::fmt::Debug + 'static { - pub fn open(mut reader: RangeReader, stream_type: StreamType) -> Self { + pub fn open(mut reader: StorageRange, stream_type: StreamType) -> Self { debug!("Opening {stream_type:?} sim data chunk"); - let event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count; + let event_count = if stream_type != StreamType::Bundle { EventStreamHeader::from_rmp(&mut reader).unwrap().count } else { usize::MAX }; debug!("Found {event_count} events!"); Self { reader, - srcid: stream_type.into(), + srcid: stream_type, runtime: Default::default(), event_count, index: 0 } } - pub fn srcid(&self) -> SensorSource { + pub fn srcid(&self) -> StreamType { self.srcid } - async fn read_next_event>(&mut self) -> Result>>> { + async fn read_next_event>(&mut self) -> Result>>> { let event = StreamEvent::::from_rmp(&mut self.reader)?; let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64); self.runtime += delay; + info!("waiting {delay}"); Timer::after(delay).await; Ok(event.data.into()) } - pub async fn read_next(&mut self) -> Result, SimDataError>>> { + pub async fn read_next(&mut self) -> Result, SimDataError>>> { if self.index < self.event_count { self.index += 1; + let next_id = match self.srcid { + StreamType::Bundle => BundleEventHeader::from_rmp(&mut self.reader)?.id, + _ => self.srcid + }; // The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some() - Ok(Some(match self.srcid { - SensorSource::IMU => self.read_next_event::().await?, - SensorSource::GPS => self.read_next_event::().await?, - SensorSource::Annotations => self.read_next_event::().await?, + Ok(Some(match next_id { + StreamType::IMU => self.read_next_event::().await?, + StreamType::GPS => self.read_next_event::().await?, + StreamType::Annotations => self.read_next_event::().await?, srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!") })) } else { @@ -164,7 +173,7 @@ pub async fn simulation_task(mut reader: SimDataReader warn!("Starting simulation for {:?}", reader.srcid()); events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await; - events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await; + //events.send(Measurement::SensorHardwareStatus(reader.srcid().into(), SensorState::Online)).await; let mut idx = 0; let mut idx_breaker = Breaker::default(); @@ -177,14 +186,14 @@ pub async fn simulation_task(mut reader: SimDataReader let pct = (idx as f32) / (reader.event_count as f32); idx_breaker.set((pct * 255.0) as u8); if let Some(pct) = idx_breaker.read_tripped() { - events.send(Measurement::SimulationProgress(reader.srcid(), reader.runtime, Fract8::from_raw(pct))).await; + //events.send(Measurement::SimulationProgress(reader.srcid().into(), reader.runtime, Fract8::from_raw(pct))).await; } idx += 1; }, Ok(None) => { warn!("End of simulation data stream"); break - } + }, Err(err) => { warn!("Error during sensor stream: {err:?}"); break @@ -192,7 +201,7 @@ pub async fn simulation_task(mut reader: SimDataReader } } - events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await; + //events.send(Measurement::SensorHardwareStatus(reader.srcid().into(), SensorState::Offline)).await; events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::Degraded)).await; warn!("End of simulation for {:?}", reader.srcid());