diff --git a/build.rs b/build.rs index df8020a..850c71d 100644 --- a/build.rs +++ b/build.rs @@ -1,5 +1,5 @@ use std::fs; -use std::io::{Read, Seek, Write}; +use std::io::{Read, Write}; use std::path::Path; use std::fs::File; use image::GenericImageView; @@ -78,7 +78,10 @@ fn write_sim_data() { println!("cargo::rerun-if-changed={}", gps_input.to_str().unwrap()); if !gps_output.exists() || gps_output.metadata().unwrap().modified().unwrap() < gps_input.metadata().unwrap().modified().unwrap() { + let mut gps_data = Reader::from_reader(File::open(gps_input.clone()).unwrap()); + let record_count = gps_data.records().count() as u32; let mut gps_data = Reader::from_reader(File::open(gps_input).unwrap()); + let headers = gps_data.headers().unwrap(); let (timestamp_idx, lat_idx, lon_idx) = ( headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(), @@ -86,6 +89,7 @@ fn write_sim_data() { headers.iter().position(|x| { x == "latitude" }).unwrap(), ); let mut gps_output = File::create(gps_output.clone()).unwrap(); + rmp::encode::write_array_len(&mut gps_output, record_count).unwrap(); let mut last_stamp = 0.0; for record in gps_data.records().flatten() { let (timestamp, lat, lon) = ( @@ -113,8 +117,12 @@ fn write_sim_data() { } }; if rebuild_motion { - let mut accel_data = Reader::from_reader(File::open(accel_input).unwrap()); + let mut accel_data = Reader::from_reader(File::open(accel_input.clone()).unwrap()); let mut gyro_data = Reader::from_reader(File::open(gyro_input).unwrap()); + + let record_count = accel_data.records().count() as u32; + let mut accel_data = Reader::from_reader(File::open(accel_input).unwrap()); + let headers = accel_data.headers().unwrap(); let (timestamp_idx, accel_x_idx, accel_y_idx, accel_z_idx) = ( headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(), @@ -131,6 +139,7 @@ fn write_sim_data() { ); let mut motion_output = File::create(motion_output.clone()).unwrap(); + rmp::encode::write_array_len(&mut motion_output, record_count).unwrap(); let mut last_stamp = 0.0; for (accel_record, gyro_record) in accel_data.records().flatten().zip(gyro_data.records().flatten()) { let (timestamp, accel_x, accel_y, accel_z) = ( @@ -162,6 +171,10 @@ fn write_sim_data() { // GPS data = 2, motion data = 1 let mut unified_fd = File::create(unified_output.clone()).unwrap(); + + // Write out the stream index, which will be 2 (motion + gps) + rmp::encode::write_array_len(&mut unified_fd, 2).unwrap(); + let mut motion_output = File::open(motion_output).unwrap(); let mut gps_output = File::open(gps_output).unwrap(); rmp::encode::write_ext_meta(&mut unified_fd, motion_output.metadata().unwrap().len() as u32, 1).unwrap(); diff --git a/src/bin/main.rs b/src/bin/main.rs index 2d8cd0b..de55a67 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -130,15 +130,9 @@ async fn main(spawner: Spawner) { let mut storage = SharedFlash::new(FlashStorage::new()); let mut buf = [8; 1024]; let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut buf).unwrap(); - let data_partition = partitions.find_partition( - esp_bootloader_esp_idf::partitions::PartitionType::Data( - esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined - )).expect("Unable to read partition table").expect("Could not find data partition!"); - let data_offset = data_partition.offset() as usize; - info!("Loading simulation data starting at {data_offset:#02x}"); - for sim_data in SimDataTable::open(storage.clone(), data_offset) { + for sim_data in SimDataTable::open(storage, partitions).expect("Could not find partition for sim data!") { info!("Found simulation data for {:?}", sim_data.srcid()); - if spawner.spawn(renderbug_embassy::tasks::simulation::simulation_task(sim_data, garage.motion.dyn_sender())).is_err() { + if spawner.spawn(renderbug_embassy::tasks::simulation::simulation_task(sim_data, measurements.dyn_sender())).is_err() { error!("Unable to spawn simulation task! Increase the task pool size."); } } diff --git a/src/events.rs b/src/events.rs index 65a2596..dd6feca 100644 --- a/src/events.rs +++ b/src/events.rs @@ -43,7 +43,8 @@ pub enum Measurement { SensorHardwareStatus(SensorSource, SensorState), // Simulation metadata updates - SimulationProgress(SensorSource, Duration, f32) + SimulationProgress(SensorSource, Duration, f32), + Annotation } #[derive(Clone, Copy, Debug)] @@ -83,27 +84,30 @@ pub enum Telemetry { // GPS data = 2, motion data = 1 #[derive(Debug, EnumSetType, Enum)] pub enum SensorSource { - Unknown = 0, // Real hardware IMU = 1, GPS = 2, // Fusion outputs - GravityReference, + GravityReference = 100, ForwardsReference, Location, // Simulated sensors Demo, - Simulation + Simulation, + Annotations = 3 } -impl From for SensorSource { - fn from(value: i8) -> Self { +impl TryFrom for SensorSource { + type Error = (); + + fn try_from(value: i8) -> Result { match value { - 1 => SensorSource::IMU, - 2 => SensorSource::GPS, - _ => SensorSource::Unknown + 1 => Ok(SensorSource::IMU), + 2 => Ok(SensorSource::GPS), + 3 => Ok(SensorSource::Annotations), + _ => Err(()) } } } diff --git a/src/tasks/motion.rs b/src/tasks/motion.rs index 54683fc..c0f85d1 100644 --- a/src/tasks/motion.rs +++ b/src/tasks/motion.rs @@ -29,6 +29,7 @@ pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, ui_sink: Dy ui_sink.publish(Notification::SensorStatus(source, state)).await; }, Measurement::SimulationProgress(source, duration, _pct) => debug!("{source:?} simulation time: {}", duration.as_secs()), + Measurement::Annotation => () } } } \ No newline at end of file diff --git a/src/tasks/simulation.rs b/src/tasks/simulation.rs index efe61cd..7be9f26 100644 --- a/src/tasks/simulation.rs +++ b/src/tasks/simulation.rs @@ -1,17 +1,19 @@ use core::cell::RefCell; use core::fmt::Formatter; -use alloc::rc::Rc; +use alloc::{collections::btree_map::Range, rc::Rc}; use embassy_sync::channel::DynamicSender; use embassy_time::{Duration, Timer}; use embedded_storage::{ReadStorage, Storage}; +use esp_bootloader_esp_idf::partitions::{PartitionEntry, PartitionTable}; use esp_storage::FlashStorage; use nalgebra::{Vector2, Vector3}; use log::*; -use rmp::decode::{RmpRead, RmpReadErr}; +use rmp::decode::{DecodeStringError, RmpRead, RmpReadErr, ValueReadError}; use crate::events::{Measurement, SensorSource, SensorState}; +#[derive(Debug)] pub struct SharedFlash { storage: Rc> } @@ -51,85 +53,168 @@ impl ReadStorage for SharedFlash { } pub struct SimDataTable { - storage: S, - reader: OffsetReader, + reader: RangeReader, + count: usize, + index: usize } -impl SimDataTable { - pub fn open(storage: S, offset: usize) -> Self { - debug!("Opening simulation data at {offset:#02x}"); - Self { - reader: OffsetReader::new(storage.clone(), offset), - storage +impl SimDataTable where S::Error: core::fmt::Debug { + + pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result> { + let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( + esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined, + ); + info!("Searching for sim data partition"); + let data_partition = partitions.iter().find(|partition| { + partition.partition_type() == partition_type && partition.label_as_str() == "sim" + }).expect("Unable to locate 'sim' data partition!"); + + let start = data_partition.offset() as usize; + let end = data_partition.len() as usize + start; + info!("Opening simulation data at {start:#02x}:{end:#02x}"); + let mut reader = RangeReader::new(storage.clone(), start, end); + if let Ok(count) = rmp::decode::read_array_len(&mut reader) { + info!("Found {count} streams of simulation data"); + Ok(Self { + reader, + count: count as usize, + index: 0 + }) + } else { + error!("Stream index is missing! Have you flashed the partition yet?"); + Err(SimDataError::StreamIndexMissing) } } } -impl Iterator for SimDataTable { +impl Iterator for SimDataTable where S::Error: core::fmt::Debug + 'static { type Item = SimDataReader; fn next(&mut self) -> Option { - if let Ok(this_type) = rmp::decode::read_ext_meta(&mut self.reader) { - debug!("Found type={this_type:?} offset={:#02x}", self.reader.pos()); - let start_pos = self.reader.pos(); - self.reader.offset += this_type.size as usize; - match this_type.typeid.into() { - SensorSource::Unknown => panic!("Unsupported sensor type"), - srcid => Some(SimDataReader::open(self.storage.clone(), start_pos, srcid)) - } - } else { + if self.index >= self.count { None + } else { + loop { + match rmp::decode::read_ext_meta(&mut self.reader) { + Ok(this_type) => { + let sensor_reader = self.reader.subset(this_type.size as usize); + self.reader.seek(this_type.size as usize); + self.index += 1; + debug!("Found type={this_type:?}"); + match this_type.typeid.try_into() { + Err(()) => error!("Found unknown simulation data chunk {this_type:?}"), + Ok(srcid) => return Some(SimDataReader::open(sensor_reader, srcid)) + } + }, + Err(err) => { + error!("Read error while decoding simulation data {err:?}"); + return None; + } + } + } } } } -struct OffsetReader { +#[derive(Debug)] +pub struct RangeReader { storage: S, + start: usize, + end: usize, offset: usize } -impl OffsetReader { - pub const fn new(storage: S, offset: usize) -> Self { +impl RangeReader { + pub const fn new(storage: S, start: usize, end: usize) -> Self { + assert!(start <= end); // TODO: Should add bounds checking since we will know the size of the chunk already Self { storage, - offset + start, + end, + offset: 0 } } - pub const fn pos(&self) -> usize { - self.offset + pub fn seek(&mut self, offset: usize) { + self.offset += offset; + } + + pub fn subset(&self, size: usize) -> Self where S: Clone + core::fmt::Debug { + trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size); + assert!(self.start + self.offset + size < self.end); + Self { + storage: self.storage.clone(), + start: self.offset + self.start, + end: self.start + self.offset + size, + offset: 0 + } } } -impl RmpRead for OffsetReader { - type Error = RmpErr; +impl RmpRead for RangeReader where S::Error: core::fmt::Debug + 'static { + type Error = RangeReadError; fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> { - match self.storage.read(self.offset as u32, buf) { - Ok(_) => { - self.offset += buf.len(); - Ok(()) - }, - _ => Err(RmpErr{}) + let pos = self.start + self.offset; + if pos > self.end { + Err(RangeReadError::OutOfData) + } else { + assert!(pos + buf.len() <= self.end); + match self.storage.read(pos as u32, buf) { + Ok(_) => { + self.offset += buf.len(); + Ok(()) + }, + Err(err) => Err(RangeReadError::Storage(err)) + } } } } pub struct SimDataReader { - reader: OffsetReader, + reader: RangeReader, srcid: SensorSource, - runtime: Duration + runtime: Duration, + event_count: usize, + index: usize } -impl SimDataReader { - pub fn open(storage: S, offset: usize, srcid: SensorSource) -> Self { - debug!("Opening {srcid:?} sim data chunk at {offset:#02x}"); +#[derive(Debug)] +pub enum SimDataError where S::Error: core::fmt::Debug + 'static { + StreamIndexMissing, + InvalidChunkSize { expected: usize, found: usize }, + MissingTimecode, + BadString, + DecodeError(ValueReadError>), + EndOfStream +} + +impl From>> for SimDataError where S::Error: core::fmt::Debug { + fn from(value: ValueReadError>) -> Self { + SimDataError::DecodeError(value) + } +} + + +impl From>> for SimDataError where S::Error: core::fmt::Debug { + fn from(value: DecodeStringError<'_, RangeReadError>) -> Self { + SimDataError::BadString + } +} + +impl SimDataReader where S::Error: core::fmt::Debug + 'static { + pub fn open(mut reader: RangeReader, srcid: SensorSource) -> Self { + debug!("Opening {srcid:?} sim data chunk"); + let event_count = rmp::decode::read_array_len(&mut reader).unwrap() as usize; + debug!("Found {event_count} events!"); Self { - reader: OffsetReader::new(storage, offset), + reader, srcid, - runtime: Default::default() + runtime: Default::default(), + event_count, + index: 0 } } @@ -137,48 +222,70 @@ impl SimDataReader { self.srcid } - pub async fn read_next(&mut self) -> Result { - match self.srcid { - SensorSource::IMU => self.read_motion().await, - SensorSource::GPS => self.read_gps().await, - srcid => unimplemented!("{srcid:?} is not a simulatable sensor!") + pub async fn read_next(&mut self) -> Result, SimDataError> { + if self.index < self.event_count { + self.index += 1; + // The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some() + match self.srcid { + SensorSource::IMU => self.read_motion().await, + SensorSource::GPS => self.read_gps().await, + SensorSource::Annotations => self.read_annotation().await, + srcid => unimplemented!("{srcid:?} is not a simulatable sensor!") + }.map(|x| { Some(x) }) + } else { + Ok(None) } } - fn verify_chunk_len(&mut self, length: u32) { - let chunk_len = rmp::decode::read_array_len(&mut self.reader).expect("Could not find the chunk length!"); - assert_eq!(chunk_len, length, "Expected {length} fields but instead found {chunk_len}"); + fn verify_chunk_len(&mut self, length: u32) -> Result<(), SimDataError> { + let chunk_len = rmp::decode::read_array_len(&mut self.reader)?; + if chunk_len != length { + Err(SimDataError::InvalidChunkSize { expected: length as usize, found: chunk_len as usize }) + } else { + Ok(()) + } } - async fn read_delay_field(&mut self) { - let delay = embassy_time::Duration::from_millis((rmp::decode::read_f64(&mut self.reader).expect("Expected to find timestamp") * 1000.0) as u64); + async fn read_delay_field(&mut self) -> Result<(), SimDataError> { + let timecode = rmp::decode::read_f64(&mut self.reader)?; + let delay = embassy_time::Duration::from_millis((timecode * 1000.0) as u64); self.runtime += delay; - Timer::after(delay).await + Timer::after(delay).await; + Ok(()) } - async fn read_motion(&mut self) -> Result { - self.verify_chunk_len(7); - self.read_delay_field().await; + async fn read_annotation(&mut self) -> Result> { + self.verify_chunk_len(3)?; + self.read_delay_field().await?; + let mut buf = [0; 256]; + let msg = rmp::decode::read_str(&mut self.reader, &mut buf)?; + warn!("ANNOATION: {msg}"); + Ok(Measurement::Annotation) + } + + async fn read_motion(&mut self) -> Result> { + self.verify_chunk_len(7)?; + self.read_delay_field().await?; let accel = Vector3::new( - rmp::decode::read_f64(&mut self.reader).unwrap() as f32, - rmp::decode::read_f64(&mut self.reader).unwrap() as f32, - rmp::decode::read_f64(&mut self.reader).unwrap() as f32, + rmp::decode::read_f64(&mut self.reader)? as f32, + rmp::decode::read_f64(&mut self.reader)? as f32, + rmp::decode::read_f64(&mut self.reader)? as f32, ); let gyro = Vector3::new( - rmp::decode::read_f64(&mut self.reader).unwrap() as f32, - rmp::decode::read_f64(&mut self.reader).unwrap() as f32, - rmp::decode::read_f64(&mut self.reader).unwrap() as f32, + rmp::decode::read_f64(&mut self.reader)? as f32, + rmp::decode::read_f64(&mut self.reader)? as f32, + rmp::decode::read_f64(&mut self.reader)? as f32, ); Ok(Measurement::IMU { accel, gyro }) } - async fn read_gps(&mut self) -> Result { - self.verify_chunk_len(3); - self.read_delay_field().await; + async fn read_gps(&mut self) -> Result> { + self.verify_chunk_len(3)?; + self.read_delay_field().await?; let coords = Vector2::new( - rmp::decode::read_f64(&mut self.reader).unwrap(), - rmp::decode::read_f64(&mut self.reader).unwrap() + rmp::decode::read_f64(&mut self.reader)?, + rmp::decode::read_f64(&mut self.reader)? ); Ok(Measurement::GPS(Some(coords))) @@ -186,10 +293,13 @@ impl SimDataReader { } #[derive(Debug)] -pub struct RmpErr{} -impl RmpReadErr for RmpErr {} +pub enum RangeReadError { + OutOfData, + Storage(E) +} +impl RmpReadErr for RangeReadError {} -impl core::fmt::Display for RmpErr { +impl core::fmt::Display for RangeReadError { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { f.write_str("RmpErr") } @@ -203,8 +313,18 @@ pub async fn simulation_task(mut reader: SimDataReader events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await; // TODO: SimulationProgress updates - while let Ok(next_evt) = reader.read_next().await { - events.send(next_evt).await; + loop { + match reader.read_next().await { + Ok(Some(next_evt)) => events.send(next_evt).await, + Ok(None) => { + warn!("End of simulation data stream"); + break + } + Err(err) => { + warn!("Error during sensor stream: {err:?}"); + break + } + } } events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await;