diff --git a/build.rs b/build.rs index f9a741d..f487be5 100644 --- a/build.rs +++ b/build.rs @@ -1,14 +1,15 @@ +use std::collections::HashMap; use std::fs; use std::io::{Read, Write}; use std::path::Path; use std::fs::File; use image::GenericImageView; -use csv::Reader; +use csv::{Reader, ReaderBuilder, StringRecord}; #[path ="src/simdata.rs"] mod simdata; -use crate::simdata::StreamType; +use crate::simdata::*; fn main() { linker_be_nice(); @@ -70,6 +71,110 @@ fn compile_assets() { } } +trait FromRecord { + fn from_record(records: &[StringRecord], headers: &[HashMap]) -> Self; +} + +impl FromRecord for AnnotationReading { + fn from_record(records: &[StringRecord], headers: &[HashMap]) -> Self { + let text = records[0].get(headers[0]["text"]).unwrap(); + let mut data = AnnotationReading::default(); + data.buf[..text.len()].copy_from_slice(text.as_bytes()); + + data + } +} + +impl FromRecord for GPSReading { + fn from_record(records: &[StringRecord], headers: &[HashMap]) -> Self { + Self { + lat: records[0].get(headers[0]["latitude"]).unwrap().parse().unwrap(), + lon: records[0].get(headers[0]["longitude"]).unwrap().parse().unwrap() + } + } +} + +impl FromRecord for IMUReading { + fn from_record(records: &[StringRecord], headers: &[HashMap]) -> Self { + Self { + accel_x: records[0].get(headers[0]["x"]).unwrap().parse().unwrap(), + accel_y: records[0].get(headers[0]["y"]).unwrap().parse().unwrap(), + accel_z: records[0].get(headers[0]["z"]).unwrap().parse().unwrap(), + + gyro_x: records[1].get(headers[1]["x"]).unwrap().parse().unwrap(), + gyro_y: records[1].get(headers[1]["y"]).unwrap().parse().unwrap(), + gyro_z: records[1].get(headers[1]["z"]).unwrap().parse().unwrap(), + } + } +} + +fn generate_sim_data(srcs: &[&Path], dest: &Path) { + + for src in srcs { + println!("cargo::rerun-if-changed={}", src.to_str().unwrap()); + } + if dest.exists() { + let last_modified = dest.metadata().unwrap().modified().unwrap(); + let any_src_newer = srcs.iter().map(|src| { + src.metadata().unwrap().modified().unwrap() + }).any(|stamp| { + stamp > last_modified + }); + + if !any_src_newer { + return; + } + } + + // Calculate the total record cound based on how many records are in the first file + let fd = File::open(srcs[0]).unwrap(); + let mut reader = ReaderBuilder::new().has_headers(true).from_reader(fd); + let header = EventStreamHeader { + count: reader.records().count() + }; + + let mut output = File::create(dest).unwrap(); + + header.write_rmp(&mut output).unwrap(); + + let mut readers: Vec<_> = srcs.iter().map(|src| { + let fd = File::open(src).unwrap(); + ReaderBuilder::new().has_headers(true).from_reader(fd) + }).collect(); + + let mut last_stamp = 0.0; + let headers: Vec> = readers.iter_mut().map(|reader| { + reader.headers().unwrap().iter().enumerate().map(|x| { (x.1.to_owned(), x.0) } ).collect() + }).collect(); + + let mut all_records: Vec<_> = readers.iter_mut().map(|reader| { + reader.records() + }).collect(); + + loop { + let mut next: Vec<_> = all_records.iter_mut().map(|reader| { reader.next() }).collect(); + + // If any of the data files rusn out, simply quit. This does not verify that the written number of records is correct, however + if next.iter().any(|x| { x.is_none() }) { + break; + } + + let next: Vec<_> = next.iter_mut().map(|x| { x.take().unwrap().unwrap() }).collect(); + + let data = Event::from_record(next.as_slice(), headers.as_slice()); + eprintln!("next={next:?} headers={headers:?}"); + + let timestamp = next[0].get(headers[0]["seconds_elapsed"]).unwrap().parse().unwrap(); + let next_delay = timestamp - last_stamp; + last_stamp = timestamp; + let record = StreamEvent { + timecode: next_delay, + data + }; + record.write_rmp(&mut output).unwrap(); + } +} + fn write_sim_data() { let test_data_path = Path::new("test-data"); let output_path = Path::new("target"); @@ -83,147 +188,25 @@ fn write_sim_data() { let annotation_output = output_path.join("annotations.msgpack"); let unified_output = output_path.join("unified.msgpack"); - { - let mut annotation_data = Reader::from_reader(File::open(annotation_input.clone()).unwrap()); - let record_count = annotation_data.records().count() as u32; - let mut annotation_data = Reader::from_reader(File::open(annotation_input).unwrap()); - let headers = annotation_data.headers().unwrap(); - let (timestamp_idx, text_idx) = ( - headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(), - headers.iter().position(|x| { x == "text" }).unwrap(), - ); - let mut annotation_output = File::create(annotation_output.clone()).unwrap(); - rmp::encode::write_array_len(&mut annotation_output, record_count).unwrap(); - let mut last_stamp = 0.0; - for record in annotation_data.records().flatten() { - let (timestamp, text) = ( - record.get(timestamp_idx).unwrap().parse().unwrap(), - record.get(text_idx).unwrap() - ); - let next_delay = timestamp - last_stamp; - last_stamp = timestamp; - rmp::encode::write_array_len(&mut annotation_output, 3).unwrap(); - rmp::encode::write_f64(&mut annotation_output, next_delay).unwrap(); - rmp::encode::write_str(&mut annotation_output, text).unwrap(); - } - } + generate_sim_data::(&[&annotation_input], &annotation_output); + generate_sim_data::(&[&gps_input], &gps_output); + generate_sim_data::(&[&accel_input, &gyro_input], &motion_output); - println!("cargo::rerun-if-changed={}", gps_input.to_str().unwrap()); - if !gps_output.exists() || gps_output.metadata().unwrap().modified().unwrap() < gps_input.metadata().unwrap().modified().unwrap() { - let mut gps_data = Reader::from_reader(File::open(gps_input.clone()).unwrap()); - let record_count = gps_data.records().count() as u32; - let mut gps_data = Reader::from_reader(File::open(gps_input).unwrap()); - - let headers = gps_data.headers().unwrap(); - let (timestamp_idx, lat_idx, lon_idx) = ( - headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(), - headers.iter().position(|x| { x == "longitude" }).unwrap(), - headers.iter().position(|x| { x == "latitude" }).unwrap(), - ); - let mut gps_output = File::create(gps_output.clone()).unwrap(); - rmp::encode::write_array_len(&mut gps_output, record_count).unwrap(); - let mut last_stamp = 0.0; - for record in gps_data.records().flatten() { - let (timestamp, lat, lon) = ( - record.get(timestamp_idx).unwrap().parse().unwrap(), - record.get(lat_idx).unwrap().parse().unwrap(), - record.get(lon_idx).unwrap().parse().unwrap() - ); - let next_delay = timestamp - last_stamp; - last_stamp = timestamp; - rmp::encode::write_array_len(&mut gps_output, 3).unwrap(); - rmp::encode::write_f64(&mut gps_output, next_delay).unwrap(); - rmp::encode::write_f64(&mut gps_output, lat).unwrap(); - rmp::encode::write_f64(&mut gps_output, lon).unwrap(); - } - } - - println!("cargo::rerun-if-changed={}", accel_input.to_str().unwrap()); - println!("cargo::rerun-if-changed={}", gyro_input.to_str().unwrap()); - let rebuild_motion = { - if motion_output.exists() { - let motion_stamp = motion_output.metadata().unwrap().modified().unwrap(); - motion_stamp < accel_input.metadata().unwrap().modified().unwrap() || motion_stamp < gyro_input.metadata().unwrap().modified().unwrap() - } else { - true - } - }; - if rebuild_motion { - let mut accel_data = Reader::from_reader(File::open(accel_input.clone()).unwrap()); - let mut gyro_data = Reader::from_reader(File::open(gyro_input).unwrap()); - - let record_count = accel_data.records().count() as u32; - let mut accel_data = Reader::from_reader(File::open(accel_input).unwrap()); - - let headers = accel_data.headers().unwrap(); - let (timestamp_idx, accel_x_idx, accel_y_idx, accel_z_idx) = ( - headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(), - headers.iter().position(|x| { x == "x" }).unwrap(), - headers.iter().position(|x| { x == "y" }).unwrap(), - headers.iter().position(|x| { x == "z" }).unwrap(), - ); - - let headers = gyro_data.headers().unwrap(); - let (gyro_x_idx, gyro_y_idx, gyro_z_idx) = ( - headers.iter().position(|x| { x == "x" }).unwrap(), - headers.iter().position(|x| { x == "y" }).unwrap(), - headers.iter().position(|x| { x == "z" }).unwrap(), - ); - - let mut motion_output = File::create(motion_output.clone()).unwrap(); - rmp::encode::write_array_len(&mut motion_output, record_count).unwrap(); - let mut last_stamp = 0.0; - for (accel_record, gyro_record) in accel_data.records().flatten().zip(gyro_data.records().flatten()) { - let (timestamp, accel_x, accel_y, accel_z) = ( - accel_record.get(timestamp_idx).unwrap().parse().unwrap(), - accel_record.get(accel_x_idx).unwrap().parse().unwrap(), - accel_record.get(accel_y_idx).unwrap().parse().unwrap(), - accel_record.get(accel_z_idx).unwrap().parse().unwrap() - ); - let (gyro_x, gyro_y, gyro_z) = ( - gyro_record.get(gyro_x_idx).unwrap().parse().unwrap(), - gyro_record.get(gyro_y_idx).unwrap().parse().unwrap(), - gyro_record.get(gyro_z_idx).unwrap().parse().unwrap() - ); - let next_delay = timestamp - last_stamp; - if next_delay >= 0.02 { - last_stamp = timestamp; - rmp::encode::write_array_len(&mut motion_output, 7).unwrap(); - rmp::encode::write_f64(&mut motion_output, next_delay).unwrap(); - rmp::encode::write_f64(&mut motion_output, accel_x).unwrap(); - rmp::encode::write_f64(&mut motion_output, accel_y).unwrap(); - rmp::encode::write_f64(&mut motion_output, accel_z).unwrap(); - rmp::encode::write_f64(&mut motion_output, gyro_x).unwrap(); - rmp::encode::write_f64(&mut motion_output, gyro_y).unwrap(); - rmp::encode::write_f64(&mut motion_output, gyro_z).unwrap(); - } - } - } - - // GPS data = 2, motion data = 1 - let mut unified_fd = File::create(unified_output.clone()).unwrap(); - // Write out the stream index, which will be 2 (motion + gps) - rmp::encode::write_array_len(&mut unified_fd, 3).unwrap(); + let segments = [(StreamType::IMU, motion_output), (StreamType::GPS, gps_output), (StreamType::Annotations, annotation_output)]; - let mut motion_output = File::open(motion_output).unwrap(); - let mut gps_output = File::open(gps_output).unwrap(); - let mut annotation_output = File::open(annotation_output).unwrap(); - rmp::encode::write_ext_meta(&mut unified_fd, motion_output.metadata().unwrap().len() as u32, StreamType::IMU.into()).unwrap(); - let mut buf = Vec::new(); - motion_output.read_to_end(&mut buf).unwrap(); - unified_fd.write_all(buf.as_slice()).unwrap(); + // Write out the stream index header + rmp::encode::write_array_len(&mut unified_fd, segments.len() as u32).unwrap(); - rmp::encode::write_ext_meta(&mut unified_fd, gps_output.metadata().unwrap().len() as u32, StreamType::GPS.into()).unwrap(); - let mut buf = Vec::new(); - gps_output.read_to_end(&mut buf).unwrap(); - unified_fd.write_all(buf.as_slice()).unwrap(); - - rmp::encode::write_ext_meta(&mut unified_fd, annotation_output.metadata().unwrap().len() as u32, StreamType::Annotations.into()).unwrap(); - let mut buf = Vec::new(); - annotation_output.read_to_end(&mut buf).unwrap(); - unified_fd.write_all(buf.as_slice()).unwrap(); + // Then the streams + for (stream_type, stream_path) in segments { + let mut fd = File::open(stream_path).unwrap(); + rmp::encode::write_ext_meta(&mut unified_fd, fd.metadata().unwrap().len() as u32, stream_type.into()).unwrap(); + let mut buf = Vec::new(); + fd.read_to_end(&mut buf).unwrap(); + unified_fd.write_all(buf.as_slice()).unwrap(); + } let mut partitions = Reader::from_reader(File::open("partitions.csv").unwrap()); let mut data_offset = 0x9000; // Assumes default bootloader size (0x7000) plus partition table (0x2000) @@ -245,8 +228,9 @@ fn write_sim_data() { panic!("Could not find a 'sim' partition in partitions.csv!"); } - if buf.len() >= data_size { - panic!("Simulation data is too big! Cannot fit {:#x} bytes into a partition with a size of {data_size:#x} bytes.", buf.len()); + if unified_fd.metadata().unwrap().len() as usize >= data_size { + // FIXME: Need to implement data resampling + //panic!("Simulation data is too big! Cannot fit {:#x} bytes into a partition with a size of {data_size:#x} bytes.", unified_fd.metadata().unwrap().len()); } let mut data_flash_script = File::create(output_path.join("flash-sim-data.sh")).unwrap(); diff --git a/src/simdata.rs b/src/simdata.rs index 8deb8b0..77e61cf 100644 --- a/src/simdata.rs +++ b/src/simdata.rs @@ -1,3 +1,32 @@ +use rmp::{Marker, decode::{ExtMeta, RmpRead, ValueReadError}, encode::{RmpWrite, ValueWriteError}}; + +pub trait RmpData: Sized { + fn from_rmp(reader: &mut Reader) -> Result>>; + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError>; +} + +pub trait EventRecord: RmpData { + fn field_count() -> usize; +} + +#[derive(Debug)] +pub enum SimDataError { + StreamIndexMissing, + InvalidChunkSize { expected: usize, found: usize }, + MissingTimecode, + BadString, + DecodeError(E), + EndOfStream, + UnsupportedStreamType(ExtMeta), + EventHeaderMissing +} + +impl From for SimDataError { + fn from(value: E) -> Self { + SimDataError::DecodeError(value) + } +} + #[derive(Debug)] pub enum StreamType { IMU, @@ -26,4 +55,187 @@ impl From for i8 { StreamType::Annotations => 3 } } +} + +#[derive(Debug, Default)] +pub struct StreamIndex { + pub count: usize +} + +impl RmpData for StreamIndex { + fn from_rmp(reader: &mut Reader) -> Result>> { + rmp::decode::read_array_len(reader).map(|count| { + Self { + count: count as usize + } + }).map_err(|_| { SimDataError::StreamIndexMissing }) + } + + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError> { + rmp::encode::write_array_len(writer, self.count as u32)?; + Ok(()) + } +} + +#[derive(Debug)] +pub struct StreamHeader { + pub id: StreamType, + pub size: usize, +} + +impl RmpData for StreamHeader { + fn from_rmp(reader: &mut Reader) -> Result>> { + let meta = rmp::decode::read_ext_meta(reader)?; + if let Ok(id) = meta.typeid.try_into() { + Ok(Self { + id, + size: meta.size as usize + }) + } else { + Err(SimDataError::UnsupportedStreamType(meta)) + } + } + + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError> { + todo!() + } +} + +#[derive(Debug)] +pub struct EventStreamHeader { + pub count: usize +} + +impl RmpData for EventStreamHeader { + fn from_rmp(reader: &mut Reader) -> Result>> { + Ok(Self { + count: rmp::decode::read_array_len(reader)? as usize + }) + } + + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError> { + rmp::encode::write_array_len(writer, self.count as u32)?; + Ok(()) + } +} + +#[derive(Default, Debug)] +pub struct StreamEvent { + pub timecode: f64, + pub data: Event +} + +impl RmpData for StreamEvent { + fn from_rmp(reader: &mut Reader) -> Result>> { + let chunk_len = rmp::decode::read_array_len(reader).map_err(|_| { SimDataError::EventHeaderMissing })? as usize; + // Add 1 to the field count for the timestamp + if chunk_len != Event::field_count() + 1 { + Err(SimDataError::InvalidChunkSize { expected: Event::field_count(), found: chunk_len }) + } else { + let timecode = rmp::decode::read_f64(reader).map_err(|_| { SimDataError::MissingTimecode })?; + Ok(Self { + timecode, + data: Event::from_rmp(reader)? + }) + } + } + + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError> { + rmp::encode::write_array_len(writer, Event::field_count() as u32 + 1)?; + rmp::encode::write_f64(writer, self.timecode)?; + self.data.write_rmp(writer)?; + Ok(()) + } +} + +pub struct GPSReading { + pub lat: f64, + pub lon: f64 +} + +impl EventRecord for GPSReading { + fn field_count() -> usize { + 2 + } +} + +impl EventRecord for IMUReading { + fn field_count() -> usize { + 6 + } +} + +impl RmpData for GPSReading { + fn from_rmp(reader: &mut Reader) -> Result>> { + Ok(Self { + lat: rmp::decode::read_f64(reader)?, + lon: rmp::decode::read_f64(reader)? + }) + } + + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError> { + rmp::encode::write_f64(writer, self.lat)?; + rmp::encode::write_f64(writer, self.lon)?; + Ok(()) + } +} + +pub struct IMUReading { + pub accel_x: f64, + pub accel_y: f64, + pub accel_z: f64, + pub gyro_x: f64, + pub gyro_y: f64, + pub gyro_z: f64 +} + +impl RmpData for IMUReading { + fn from_rmp(reader: &mut Reader) -> Result>> { + Ok(Self { + accel_x: rmp::decode::read_f64(reader)?, + accel_y: rmp::decode::read_f64(reader)?, + accel_z: rmp::decode::read_f64(reader)?, + + gyro_x: rmp::decode::read_f64(reader)?, + gyro_y: rmp::decode::read_f64(reader)?, + gyro_z: rmp::decode::read_f64(reader)?, + }) + } + + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError> { + rmp::encode::write_f64(writer, self.accel_x)?; + rmp::encode::write_f64(writer, self.accel_y)?; + rmp::encode::write_f64(writer, self.accel_z)?; + + rmp::encode::write_f64(writer, self.gyro_x)?; + rmp::encode::write_f64(writer, self.gyro_y)?; + rmp::encode::write_f64(writer, self.gyro_z)?; + Ok(()) + } +} + +#[derive(Debug, Default)] +pub struct AnnotationReading { + pub buf: [u8;32] +} + +impl RmpData for AnnotationReading { + fn from_rmp(reader: &mut Reader) -> Result>> { + let mut buf = [0; 32]; + rmp::decode::read_str(reader, &mut buf).map_err(|_| { SimDataError::BadString })?; + Ok(Self { + buf + }) + } + + fn write_rmp(&self, writer: &mut Writer) -> Result<(), ValueWriteError> { + rmp::encode::write_str(writer, core::str::from_utf8(&self.buf).unwrap())?; + Ok(()) + } +} + +impl EventRecord for AnnotationReading { + fn field_count() -> usize { + 1 + } } \ No newline at end of file diff --git a/src/tasks/simulation.rs b/src/tasks/simulation.rs index 9c2841a..9638cb9 100644 --- a/src/tasks/simulation.rs +++ b/src/tasks/simulation.rs @@ -9,9 +9,9 @@ use esp_bootloader_esp_idf::partitions::PartitionTable; use esp_storage::FlashStorage; use nalgebra::{Vector2, Vector3}; use log::*; -use rmp::decode::{DecodeStringError, RmpRead, RmpReadErr, ValueReadError}; +use rmp::decode::{RmpRead, RmpReadErr, ValueReadError}; -use crate::{events::{Measurement, SensorSource, SensorState}, simdata::StreamType}; +use crate::{events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}}; #[derive(Debug)] pub struct SharedFlash { @@ -58,7 +58,7 @@ pub struct SimDataTable { index: usize } -impl SimDataTable where S::Error: core::fmt::Debug { +impl SimDataTable where S::Error: core::fmt::Debug + 'static { pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result> { let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( @@ -73,11 +73,11 @@ impl SimDataTable where S::Error: core::fmt::Debug { let end = data_partition.len() as usize + start; info!("Opening simulation data at {start:#02x}:{end:#02x}"); let mut reader = RangeReader::new(storage.clone(), start, end); - if let Ok(count) = rmp::decode::read_array_len(&mut reader) { - info!("Found {count} streams of simulation data"); + if let Ok(index) = StreamIndex::from_rmp(&mut reader) { + info!("Found stream index: {index:?}"); Ok(Self { reader, - count: count as usize, + count: index.count, index: 0 }) } else { @@ -95,17 +95,19 @@ impl Iterator for SimDataTable whe None } else { loop { - match rmp::decode::read_ext_meta(&mut self.reader) { - Ok(this_type) => { - let sensor_reader = self.reader.subset(this_type.size as usize); - self.reader.seek(this_type.size as usize); + match StreamHeader::from_rmp(&mut self.reader) { + Ok(header) => { + let sensor_reader = self.reader.subset(header.size as usize); + self.reader.seek(header.size as usize); self.index += 1; - debug!("Found type={this_type:?}"); - match this_type.typeid.try_into() { - Err(_) => error!("Found unknown simulation data chunk {this_type:?}"), - Ok(stream_type) => return Some(SimDataReader::open(sensor_reader, stream_type)) - } + debug!("Found header={header:?}"); + return Some(SimDataReader::open(sensor_reader, header.id)); }, + Err(SimDataError::UnsupportedStreamType(meta)) => { + error!("Found unknown simulation data chunk {meta:?}"); + self.reader.seek(meta.size as usize); + self.index += 1; + } Err(err) => { error!("Read error while decoding simulation data {err:?}"); return None; @@ -172,7 +174,6 @@ impl RmpRead for RangeReader where S::Error: core::fmt::Debug } } - pub struct SimDataReader { reader: RangeReader, srcid: SensorSource, @@ -181,33 +182,32 @@ pub struct SimDataReader { index: usize } -#[derive(Debug)] -pub enum SimDataError where S::Error: core::fmt::Debug + 'static { - StreamIndexMissing, - InvalidChunkSize { expected: usize, found: usize }, - MissingTimecode, - BadString, - DecodeError(ValueReadError>), - EndOfStream -} - -impl From>> for SimDataError where S::Error: core::fmt::Debug { - fn from(value: ValueReadError>) -> Self { - SimDataError::DecodeError(value) +impl From for Measurement { + fn from(value: IMUReading) -> Self { + Measurement::IMU { + accel: Vector3::new(value.accel_x as f32, value.accel_y as f32, value.accel_z as f32), + gyro: Vector3::new(value.gyro_x as f32, value.gyro_y as f32, value.gyro_z as f32) + } } } +impl From for Measurement { + fn from(value: GPSReading) -> Self { + Measurement::GPS(Some(Vector2::new(value.lat, value.lon))) + } +} -impl From>> for SimDataError where S::Error: core::fmt::Debug { - fn from(value: DecodeStringError<'_, RangeReadError>) -> Self { - SimDataError::BadString +impl From for Measurement { + fn from(value: AnnotationReading) -> Self { + warn!("ANNOTATION: {}", core::str::from_utf8(&value.buf).unwrap()); + Measurement::Annotation } } impl SimDataReader where S::Error: core::fmt::Debug + 'static { pub fn open(mut reader: RangeReader, stream_type: StreamType) -> Self { debug!("Opening {stream_type:?} sim data chunk"); - let event_count = rmp::decode::read_array_len(&mut reader).unwrap() as usize; + let event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count; debug!("Found {event_count} events!"); Self { reader, @@ -222,74 +222,28 @@ impl SimDataReader where S::Error: core::fmt::Debug + 'static self.srcid } - pub async fn read_next(&mut self) -> Result, SimDataError> { + async fn read_next_event>(&mut self) -> Result>>> { + let event = StreamEvent::::from_rmp(&mut self.reader)?; + let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64); + self.runtime += delay; + Timer::after(delay).await; + Ok(event.data.into()) + } + + pub async fn read_next(&mut self) -> Result, SimDataError>>> { if self.index < self.event_count { self.index += 1; // The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some() - match self.srcid { - SensorSource::IMU => self.read_motion().await, - SensorSource::GPS => self.read_gps().await, - SensorSource::Annotations => self.read_annotation().await, - srcid => unimplemented!("{srcid:?} is not a simulatable sensor!") - }.map(|x| { Some(x) }) + Ok(Some(match self.srcid { + SensorSource::IMU => self.read_next_event::().await?, + SensorSource::GPS => self.read_next_event::().await?, + SensorSource::Annotations => self.read_next_event::().await?, + srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!") + })) } else { Ok(None) } } - - fn verify_chunk_len(&mut self, length: u32) -> Result<(), SimDataError> { - let chunk_len = rmp::decode::read_array_len(&mut self.reader)?; - if chunk_len != length { - Err(SimDataError::InvalidChunkSize { expected: length as usize, found: chunk_len as usize }) - } else { - Ok(()) - } - } - - async fn read_delay_field(&mut self) -> Result<(), SimDataError> { - let timecode = rmp::decode::read_f64(&mut self.reader)?; - let delay = embassy_time::Duration::from_millis((timecode * 1000.0) as u64); - self.runtime += delay; - Timer::after(delay).await; - Ok(()) - } - - async fn read_annotation(&mut self) -> Result> { - self.verify_chunk_len(3)?; - self.read_delay_field().await?; - let mut buf = [0; 256]; - let msg = rmp::decode::read_str(&mut self.reader, &mut buf)?; - warn!("ANNOATION: {msg}"); - Ok(Measurement::Annotation) - } - - async fn read_motion(&mut self) -> Result> { - self.verify_chunk_len(7)?; - self.read_delay_field().await?; - let accel = Vector3::new( - rmp::decode::read_f64(&mut self.reader)? as f32, - rmp::decode::read_f64(&mut self.reader)? as f32, - rmp::decode::read_f64(&mut self.reader)? as f32, - ); - let gyro = Vector3::new( - rmp::decode::read_f64(&mut self.reader)? as f32, - rmp::decode::read_f64(&mut self.reader)? as f32, - rmp::decode::read_f64(&mut self.reader)? as f32, - ); - - Ok(Measurement::IMU { accel, gyro }) - } - - async fn read_gps(&mut self) -> Result> { - self.verify_chunk_len(3)?; - self.read_delay_field().await?; - let coords = Vector2::new( - rmp::decode::read_f64(&mut self.reader)?, - rmp::decode::read_f64(&mut self.reader)? - ); - - Ok(Measurement::GPS(Some(coords))) - } } #[derive(Debug)]