simulation: rewrite the simulation data processing to share a lot of the same code on both the build script and the chip, allowing for more general data streaming possibilities
This commit is contained in:
212
src/simdata.rs
212
src/simdata.rs
@@ -1,3 +1,32 @@
|
||||
use rmp::{Marker, decode::{ExtMeta, RmpRead, ValueReadError}, encode::{RmpWrite, ValueWriteError}};
|
||||
|
||||
pub trait RmpData: Sized {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>>;
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>>;
|
||||
}
|
||||
|
||||
pub trait EventRecord: RmpData {
|
||||
fn field_count() -> usize;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SimDataError<E> {
|
||||
StreamIndexMissing,
|
||||
InvalidChunkSize { expected: usize, found: usize },
|
||||
MissingTimecode,
|
||||
BadString,
|
||||
DecodeError(E),
|
||||
EndOfStream,
|
||||
UnsupportedStreamType(ExtMeta),
|
||||
EventHeaderMissing
|
||||
}
|
||||
|
||||
impl<E> From<E> for SimDataError<E> {
|
||||
fn from(value: E) -> Self {
|
||||
SimDataError::DecodeError(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum StreamType {
|
||||
IMU,
|
||||
@@ -26,4 +55,187 @@ impl From<StreamType> for i8 {
|
||||
StreamType::Annotations => 3
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct StreamIndex {
|
||||
pub count: usize
|
||||
}
|
||||
|
||||
impl RmpData for StreamIndex {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>> {
|
||||
rmp::decode::read_array_len(reader).map(|count| {
|
||||
Self {
|
||||
count: count as usize
|
||||
}
|
||||
}).map_err(|_| { SimDataError::StreamIndexMissing })
|
||||
}
|
||||
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>> {
|
||||
rmp::encode::write_array_len(writer, self.count as u32)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StreamHeader {
|
||||
pub id: StreamType,
|
||||
pub size: usize,
|
||||
}
|
||||
|
||||
impl RmpData for StreamHeader {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>> {
|
||||
let meta = rmp::decode::read_ext_meta(reader)?;
|
||||
if let Ok(id) = meta.typeid.try_into() {
|
||||
Ok(Self {
|
||||
id,
|
||||
size: meta.size as usize
|
||||
})
|
||||
} else {
|
||||
Err(SimDataError::UnsupportedStreamType(meta))
|
||||
}
|
||||
}
|
||||
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EventStreamHeader {
|
||||
pub count: usize
|
||||
}
|
||||
|
||||
impl RmpData for EventStreamHeader {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>> {
|
||||
Ok(Self {
|
||||
count: rmp::decode::read_array_len(reader)? as usize
|
||||
})
|
||||
}
|
||||
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>> {
|
||||
rmp::encode::write_array_len(writer, self.count as u32)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct StreamEvent<Event: EventRecord> {
|
||||
pub timecode: f64,
|
||||
pub data: Event
|
||||
}
|
||||
|
||||
impl<Event: EventRecord> RmpData for StreamEvent<Event> {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>> {
|
||||
let chunk_len = rmp::decode::read_array_len(reader).map_err(|_| { SimDataError::EventHeaderMissing })? as usize;
|
||||
// Add 1 to the field count for the timestamp
|
||||
if chunk_len != Event::field_count() + 1 {
|
||||
Err(SimDataError::InvalidChunkSize { expected: Event::field_count(), found: chunk_len })
|
||||
} else {
|
||||
let timecode = rmp::decode::read_f64(reader).map_err(|_| { SimDataError::MissingTimecode })?;
|
||||
Ok(Self {
|
||||
timecode,
|
||||
data: Event::from_rmp(reader)?
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>> {
|
||||
rmp::encode::write_array_len(writer, Event::field_count() as u32 + 1)?;
|
||||
rmp::encode::write_f64(writer, self.timecode)?;
|
||||
self.data.write_rmp(writer)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GPSReading {
|
||||
pub lat: f64,
|
||||
pub lon: f64
|
||||
}
|
||||
|
||||
impl EventRecord for GPSReading {
|
||||
fn field_count() -> usize {
|
||||
2
|
||||
}
|
||||
}
|
||||
|
||||
impl EventRecord for IMUReading {
|
||||
fn field_count() -> usize {
|
||||
6
|
||||
}
|
||||
}
|
||||
|
||||
impl RmpData for GPSReading {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>> {
|
||||
Ok(Self {
|
||||
lat: rmp::decode::read_f64(reader)?,
|
||||
lon: rmp::decode::read_f64(reader)?
|
||||
})
|
||||
}
|
||||
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>> {
|
||||
rmp::encode::write_f64(writer, self.lat)?;
|
||||
rmp::encode::write_f64(writer, self.lon)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IMUReading {
|
||||
pub accel_x: f64,
|
||||
pub accel_y: f64,
|
||||
pub accel_z: f64,
|
||||
pub gyro_x: f64,
|
||||
pub gyro_y: f64,
|
||||
pub gyro_z: f64
|
||||
}
|
||||
|
||||
impl RmpData for IMUReading {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>> {
|
||||
Ok(Self {
|
||||
accel_x: rmp::decode::read_f64(reader)?,
|
||||
accel_y: rmp::decode::read_f64(reader)?,
|
||||
accel_z: rmp::decode::read_f64(reader)?,
|
||||
|
||||
gyro_x: rmp::decode::read_f64(reader)?,
|
||||
gyro_y: rmp::decode::read_f64(reader)?,
|
||||
gyro_z: rmp::decode::read_f64(reader)?,
|
||||
})
|
||||
}
|
||||
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>> {
|
||||
rmp::encode::write_f64(writer, self.accel_x)?;
|
||||
rmp::encode::write_f64(writer, self.accel_y)?;
|
||||
rmp::encode::write_f64(writer, self.accel_z)?;
|
||||
|
||||
rmp::encode::write_f64(writer, self.gyro_x)?;
|
||||
rmp::encode::write_f64(writer, self.gyro_y)?;
|
||||
rmp::encode::write_f64(writer, self.gyro_z)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct AnnotationReading {
|
||||
pub buf: [u8;32]
|
||||
}
|
||||
|
||||
impl RmpData for AnnotationReading {
|
||||
fn from_rmp<Reader: RmpRead>(reader: &mut Reader) -> Result<Self, SimDataError<ValueReadError<Reader::Error>>> {
|
||||
let mut buf = [0; 32];
|
||||
rmp::decode::read_str(reader, &mut buf).map_err(|_| { SimDataError::BadString })?;
|
||||
Ok(Self {
|
||||
buf
|
||||
})
|
||||
}
|
||||
|
||||
fn write_rmp<Writer: RmpWrite>(&self, writer: &mut Writer) -> Result<(), ValueWriteError<Writer::Error>> {
|
||||
rmp::encode::write_str(writer, core::str::from_utf8(&self.buf).unwrap())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl EventRecord for AnnotationReading {
|
||||
fn field_count() -> usize {
|
||||
1
|
||||
}
|
||||
}
|
||||
@@ -9,9 +9,9 @@ use esp_bootloader_esp_idf::partitions::PartitionTable;
|
||||
use esp_storage::FlashStorage;
|
||||
use nalgebra::{Vector2, Vector3};
|
||||
use log::*;
|
||||
use rmp::decode::{DecodeStringError, RmpRead, RmpReadErr, ValueReadError};
|
||||
use rmp::decode::{RmpRead, RmpReadErr, ValueReadError};
|
||||
|
||||
use crate::{events::{Measurement, SensorSource, SensorState}, simdata::StreamType};
|
||||
use crate::{events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SharedFlash<S> {
|
||||
@@ -58,7 +58,7 @@ pub struct SimDataTable<S> {
|
||||
index: usize
|
||||
}
|
||||
|
||||
impl<S: ReadStorage + Clone> SimDataTable<S> where S::Error: core::fmt::Debug {
|
||||
impl<S: ReadStorage + Clone> SimDataTable<S> where S::Error: core::fmt::Debug + 'static {
|
||||
|
||||
pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result<Self, SimDataError<S>> {
|
||||
let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data(
|
||||
@@ -73,11 +73,11 @@ impl<S: ReadStorage + Clone> SimDataTable<S> where S::Error: core::fmt::Debug {
|
||||
let end = data_partition.len() as usize + start;
|
||||
info!("Opening simulation data at {start:#02x}:{end:#02x}");
|
||||
let mut reader = RangeReader::new(storage.clone(), start, end);
|
||||
if let Ok(count) = rmp::decode::read_array_len(&mut reader) {
|
||||
info!("Found {count} streams of simulation data");
|
||||
if let Ok(index) = StreamIndex::from_rmp(&mut reader) {
|
||||
info!("Found stream index: {index:?}");
|
||||
Ok(Self {
|
||||
reader,
|
||||
count: count as usize,
|
||||
count: index.count,
|
||||
index: 0
|
||||
})
|
||||
} else {
|
||||
@@ -95,17 +95,19 @@ impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> whe
|
||||
None
|
||||
} else {
|
||||
loop {
|
||||
match rmp::decode::read_ext_meta(&mut self.reader) {
|
||||
Ok(this_type) => {
|
||||
let sensor_reader = self.reader.subset(this_type.size as usize);
|
||||
self.reader.seek(this_type.size as usize);
|
||||
match StreamHeader::from_rmp(&mut self.reader) {
|
||||
Ok(header) => {
|
||||
let sensor_reader = self.reader.subset(header.size as usize);
|
||||
self.reader.seek(header.size as usize);
|
||||
self.index += 1;
|
||||
debug!("Found type={this_type:?}");
|
||||
match this_type.typeid.try_into() {
|
||||
Err(_) => error!("Found unknown simulation data chunk {this_type:?}"),
|
||||
Ok(stream_type) => return Some(SimDataReader::open(sensor_reader, stream_type))
|
||||
}
|
||||
debug!("Found header={header:?}");
|
||||
return Some(SimDataReader::open(sensor_reader, header.id));
|
||||
},
|
||||
Err(SimDataError::UnsupportedStreamType(meta)) => {
|
||||
error!("Found unknown simulation data chunk {meta:?}");
|
||||
self.reader.seek(meta.size as usize);
|
||||
self.index += 1;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Read error while decoding simulation data {err:?}");
|
||||
return None;
|
||||
@@ -172,7 +174,6 @@ impl<S: ReadStorage> RmpRead for RangeReader<S> where S::Error: core::fmt::Debug
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub struct SimDataReader<S> {
|
||||
reader: RangeReader<S>,
|
||||
srcid: SensorSource,
|
||||
@@ -181,33 +182,32 @@ pub struct SimDataReader<S> {
|
||||
index: usize
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum SimDataError<S: ReadStorage> where S::Error: core::fmt::Debug + 'static {
|
||||
StreamIndexMissing,
|
||||
InvalidChunkSize { expected: usize, found: usize },
|
||||
MissingTimecode,
|
||||
BadString,
|
||||
DecodeError(ValueReadError<RangeReadError<S::Error>>),
|
||||
EndOfStream
|
||||
}
|
||||
|
||||
impl<S: ReadStorage> From<ValueReadError<RangeReadError<S::Error>>> for SimDataError<S> where S::Error: core::fmt::Debug {
|
||||
fn from(value: ValueReadError<RangeReadError<S::Error>>) -> Self {
|
||||
SimDataError::DecodeError(value)
|
||||
impl From<IMUReading> for Measurement {
|
||||
fn from(value: IMUReading) -> Self {
|
||||
Measurement::IMU {
|
||||
accel: Vector3::new(value.accel_x as f32, value.accel_y as f32, value.accel_z as f32),
|
||||
gyro: Vector3::new(value.gyro_x as f32, value.gyro_y as f32, value.gyro_z as f32)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GPSReading> for Measurement {
|
||||
fn from(value: GPSReading) -> Self {
|
||||
Measurement::GPS(Some(Vector2::new(value.lat, value.lon)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: ReadStorage> From<DecodeStringError<'_, RangeReadError<S::Error>>> for SimDataError<S> where S::Error: core::fmt::Debug {
|
||||
fn from(value: DecodeStringError<'_, RangeReadError<S::Error>>) -> Self {
|
||||
SimDataError::BadString
|
||||
impl From<AnnotationReading> for Measurement {
|
||||
fn from(value: AnnotationReading) -> Self {
|
||||
warn!("ANNOTATION: {}", core::str::from_utf8(&value.buf).unwrap());
|
||||
Measurement::Annotation
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: ReadStorage> SimDataReader<S> where S::Error: core::fmt::Debug + 'static {
|
||||
pub fn open(mut reader: RangeReader<S>, stream_type: StreamType) -> Self {
|
||||
debug!("Opening {stream_type:?} sim data chunk");
|
||||
let event_count = rmp::decode::read_array_len(&mut reader).unwrap() as usize;
|
||||
let event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count;
|
||||
debug!("Found {event_count} events!");
|
||||
Self {
|
||||
reader,
|
||||
@@ -222,74 +222,28 @@ impl<S: ReadStorage> SimDataReader<S> where S::Error: core::fmt::Debug + 'static
|
||||
self.srcid
|
||||
}
|
||||
|
||||
pub async fn read_next(&mut self) -> Result<Option<Measurement>, SimDataError<S>> {
|
||||
async fn read_next_event<T: EventRecord + Into<Measurement>>(&mut self) -> Result<Measurement, SimDataError<ValueReadError<RangeReadError<S::Error>>>> {
|
||||
let event = StreamEvent::<T>::from_rmp(&mut self.reader)?;
|
||||
let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64);
|
||||
self.runtime += delay;
|
||||
Timer::after(delay).await;
|
||||
Ok(event.data.into())
|
||||
}
|
||||
|
||||
pub async fn read_next(&mut self) -> Result<Option<Measurement>, SimDataError<ValueReadError<RangeReadError<S::Error>>>> {
|
||||
if self.index < self.event_count {
|
||||
self.index += 1;
|
||||
// The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some()
|
||||
match self.srcid {
|
||||
SensorSource::IMU => self.read_motion().await,
|
||||
SensorSource::GPS => self.read_gps().await,
|
||||
SensorSource::Annotations => self.read_annotation().await,
|
||||
srcid => unimplemented!("{srcid:?} is not a simulatable sensor!")
|
||||
}.map(|x| { Some(x) })
|
||||
Ok(Some(match self.srcid {
|
||||
SensorSource::IMU => self.read_next_event::<IMUReading>().await?,
|
||||
SensorSource::GPS => self.read_next_event::<GPSReading>().await?,
|
||||
SensorSource::Annotations => self.read_next_event::<AnnotationReading>().await?,
|
||||
srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!")
|
||||
}))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn verify_chunk_len(&mut self, length: u32) -> Result<(), SimDataError<S>> {
|
||||
let chunk_len = rmp::decode::read_array_len(&mut self.reader)?;
|
||||
if chunk_len != length {
|
||||
Err(SimDataError::InvalidChunkSize { expected: length as usize, found: chunk_len as usize })
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_delay_field(&mut self) -> Result<(), SimDataError<S>> {
|
||||
let timecode = rmp::decode::read_f64(&mut self.reader)?;
|
||||
let delay = embassy_time::Duration::from_millis((timecode * 1000.0) as u64);
|
||||
self.runtime += delay;
|
||||
Timer::after(delay).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_annotation(&mut self) -> Result<Measurement, SimDataError<S>> {
|
||||
self.verify_chunk_len(3)?;
|
||||
self.read_delay_field().await?;
|
||||
let mut buf = [0; 256];
|
||||
let msg = rmp::decode::read_str(&mut self.reader, &mut buf)?;
|
||||
warn!("ANNOATION: {msg}");
|
||||
Ok(Measurement::Annotation)
|
||||
}
|
||||
|
||||
async fn read_motion(&mut self) -> Result<Measurement, SimDataError<S>> {
|
||||
self.verify_chunk_len(7)?;
|
||||
self.read_delay_field().await?;
|
||||
let accel = Vector3::new(
|
||||
rmp::decode::read_f64(&mut self.reader)? as f32,
|
||||
rmp::decode::read_f64(&mut self.reader)? as f32,
|
||||
rmp::decode::read_f64(&mut self.reader)? as f32,
|
||||
);
|
||||
let gyro = Vector3::new(
|
||||
rmp::decode::read_f64(&mut self.reader)? as f32,
|
||||
rmp::decode::read_f64(&mut self.reader)? as f32,
|
||||
rmp::decode::read_f64(&mut self.reader)? as f32,
|
||||
);
|
||||
|
||||
Ok(Measurement::IMU { accel, gyro })
|
||||
}
|
||||
|
||||
async fn read_gps(&mut self) -> Result<Measurement, SimDataError<S>> {
|
||||
self.verify_chunk_len(3)?;
|
||||
self.read_delay_field().await?;
|
||||
let coords = Vector2::new(
|
||||
rmp::decode::read_f64(&mut self.reader)?,
|
||||
rmp::decode::read_f64(&mut self.reader)?
|
||||
);
|
||||
|
||||
Ok(Measurement::GPS(Some(coords)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
Reference in New Issue
Block a user