tasks: simulation: restructure sim data storage to better support the next step of resampling data to fit the configured partition

This commit is contained in:
2025-12-21 18:41:35 +01:00
parent 83ecdb61b2
commit ea851f4b5a
5 changed files with 222 additions and 90 deletions

View File

@@ -1,5 +1,5 @@
use std::fs; use std::fs;
use std::io::{Read, Seek, Write}; use std::io::{Read, Write};
use std::path::Path; use std::path::Path;
use std::fs::File; use std::fs::File;
use image::GenericImageView; use image::GenericImageView;
@@ -78,7 +78,10 @@ fn write_sim_data() {
println!("cargo::rerun-if-changed={}", gps_input.to_str().unwrap()); println!("cargo::rerun-if-changed={}", gps_input.to_str().unwrap());
if !gps_output.exists() || gps_output.metadata().unwrap().modified().unwrap() < gps_input.metadata().unwrap().modified().unwrap() { if !gps_output.exists() || gps_output.metadata().unwrap().modified().unwrap() < gps_input.metadata().unwrap().modified().unwrap() {
let mut gps_data = Reader::from_reader(File::open(gps_input.clone()).unwrap());
let record_count = gps_data.records().count() as u32;
let mut gps_data = Reader::from_reader(File::open(gps_input).unwrap()); let mut gps_data = Reader::from_reader(File::open(gps_input).unwrap());
let headers = gps_data.headers().unwrap(); let headers = gps_data.headers().unwrap();
let (timestamp_idx, lat_idx, lon_idx) = ( let (timestamp_idx, lat_idx, lon_idx) = (
headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(), headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(),
@@ -86,6 +89,7 @@ fn write_sim_data() {
headers.iter().position(|x| { x == "latitude" }).unwrap(), headers.iter().position(|x| { x == "latitude" }).unwrap(),
); );
let mut gps_output = File::create(gps_output.clone()).unwrap(); let mut gps_output = File::create(gps_output.clone()).unwrap();
rmp::encode::write_array_len(&mut gps_output, record_count).unwrap();
let mut last_stamp = 0.0; let mut last_stamp = 0.0;
for record in gps_data.records().flatten() { for record in gps_data.records().flatten() {
let (timestamp, lat, lon) = ( let (timestamp, lat, lon) = (
@@ -113,8 +117,12 @@ fn write_sim_data() {
} }
}; };
if rebuild_motion { if rebuild_motion {
let mut accel_data = Reader::from_reader(File::open(accel_input).unwrap()); let mut accel_data = Reader::from_reader(File::open(accel_input.clone()).unwrap());
let mut gyro_data = Reader::from_reader(File::open(gyro_input).unwrap()); let mut gyro_data = Reader::from_reader(File::open(gyro_input).unwrap());
let record_count = accel_data.records().count() as u32;
let mut accel_data = Reader::from_reader(File::open(accel_input).unwrap());
let headers = accel_data.headers().unwrap(); let headers = accel_data.headers().unwrap();
let (timestamp_idx, accel_x_idx, accel_y_idx, accel_z_idx) = ( let (timestamp_idx, accel_x_idx, accel_y_idx, accel_z_idx) = (
headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(), headers.iter().position(|x| { x == "seconds_elapsed" }).unwrap(),
@@ -131,6 +139,7 @@ fn write_sim_data() {
); );
let mut motion_output = File::create(motion_output.clone()).unwrap(); let mut motion_output = File::create(motion_output.clone()).unwrap();
rmp::encode::write_array_len(&mut motion_output, record_count).unwrap();
let mut last_stamp = 0.0; let mut last_stamp = 0.0;
for (accel_record, gyro_record) in accel_data.records().flatten().zip(gyro_data.records().flatten()) { for (accel_record, gyro_record) in accel_data.records().flatten().zip(gyro_data.records().flatten()) {
let (timestamp, accel_x, accel_y, accel_z) = ( let (timestamp, accel_x, accel_y, accel_z) = (
@@ -162,6 +171,10 @@ fn write_sim_data() {
// GPS data = 2, motion data = 1 // GPS data = 2, motion data = 1
let mut unified_fd = File::create(unified_output.clone()).unwrap(); let mut unified_fd = File::create(unified_output.clone()).unwrap();
// Write out the stream index, which will be 2 (motion + gps)
rmp::encode::write_array_len(&mut unified_fd, 2).unwrap();
let mut motion_output = File::open(motion_output).unwrap(); let mut motion_output = File::open(motion_output).unwrap();
let mut gps_output = File::open(gps_output).unwrap(); let mut gps_output = File::open(gps_output).unwrap();
rmp::encode::write_ext_meta(&mut unified_fd, motion_output.metadata().unwrap().len() as u32, 1).unwrap(); rmp::encode::write_ext_meta(&mut unified_fd, motion_output.metadata().unwrap().len() as u32, 1).unwrap();

View File

@@ -130,15 +130,9 @@ async fn main(spawner: Spawner) {
let mut storage = SharedFlash::new(FlashStorage::new()); let mut storage = SharedFlash::new(FlashStorage::new());
let mut buf = [8; 1024]; let mut buf = [8; 1024];
let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut buf).unwrap(); let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut buf).unwrap();
let data_partition = partitions.find_partition( for sim_data in SimDataTable::open(storage, partitions).expect("Could not find partition for sim data!") {
esp_bootloader_esp_idf::partitions::PartitionType::Data(
esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined
)).expect("Unable to read partition table").expect("Could not find data partition!");
let data_offset = data_partition.offset() as usize;
info!("Loading simulation data starting at {data_offset:#02x}");
for sim_data in SimDataTable::open(storage.clone(), data_offset) {
info!("Found simulation data for {:?}", sim_data.srcid()); info!("Found simulation data for {:?}", sim_data.srcid());
if spawner.spawn(renderbug_embassy::tasks::simulation::simulation_task(sim_data, garage.motion.dyn_sender())).is_err() { if spawner.spawn(renderbug_embassy::tasks::simulation::simulation_task(sim_data, measurements.dyn_sender())).is_err() {
error!("Unable to spawn simulation task! Increase the task pool size."); error!("Unable to spawn simulation task! Increase the task pool size.");
} }
} }

View File

@@ -43,7 +43,8 @@ pub enum Measurement {
SensorHardwareStatus(SensorSource, SensorState), SensorHardwareStatus(SensorSource, SensorState),
// Simulation metadata updates // Simulation metadata updates
SimulationProgress(SensorSource, Duration, f32) SimulationProgress(SensorSource, Duration, f32),
Annotation
} }
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
@@ -83,27 +84,30 @@ pub enum Telemetry {
// GPS data = 2, motion data = 1 // GPS data = 2, motion data = 1
#[derive(Debug, EnumSetType, Enum)] #[derive(Debug, EnumSetType, Enum)]
pub enum SensorSource { pub enum SensorSource {
Unknown = 0,
// Real hardware // Real hardware
IMU = 1, IMU = 1,
GPS = 2, GPS = 2,
// Fusion outputs // Fusion outputs
GravityReference, GravityReference = 100,
ForwardsReference, ForwardsReference,
Location, Location,
// Simulated sensors // Simulated sensors
Demo, Demo,
Simulation Simulation,
Annotations = 3
} }
impl From<i8> for SensorSource { impl TryFrom<i8> for SensorSource {
fn from(value: i8) -> Self { type Error = ();
fn try_from(value: i8) -> Result<Self, Self::Error> {
match value { match value {
1 => SensorSource::IMU, 1 => Ok(SensorSource::IMU),
2 => SensorSource::GPS, 2 => Ok(SensorSource::GPS),
_ => SensorSource::Unknown 3 => Ok(SensorSource::Annotations),
_ => Err(())
} }
} }
} }

View File

@@ -29,6 +29,7 @@ pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, ui_sink: Dy
ui_sink.publish(Notification::SensorStatus(source, state)).await; ui_sink.publish(Notification::SensorStatus(source, state)).await;
}, },
Measurement::SimulationProgress(source, duration, _pct) => debug!("{source:?} simulation time: {}", duration.as_secs()), Measurement::SimulationProgress(source, duration, _pct) => debug!("{source:?} simulation time: {}", duration.as_secs()),
Measurement::Annotation => ()
} }
} }
} }

View File

@@ -1,17 +1,19 @@
use core::cell::RefCell; use core::cell::RefCell;
use core::fmt::Formatter; use core::fmt::Formatter;
use alloc::rc::Rc; use alloc::{collections::btree_map::Range, rc::Rc};
use embassy_sync::channel::DynamicSender; use embassy_sync::channel::DynamicSender;
use embassy_time::{Duration, Timer}; use embassy_time::{Duration, Timer};
use embedded_storage::{ReadStorage, Storage}; use embedded_storage::{ReadStorage, Storage};
use esp_bootloader_esp_idf::partitions::{PartitionEntry, PartitionTable};
use esp_storage::FlashStorage; use esp_storage::FlashStorage;
use nalgebra::{Vector2, Vector3}; use nalgebra::{Vector2, Vector3};
use log::*; use log::*;
use rmp::decode::{RmpRead, RmpReadErr}; use rmp::decode::{DecodeStringError, RmpRead, RmpReadErr, ValueReadError};
use crate::events::{Measurement, SensorSource, SensorState}; use crate::events::{Measurement, SensorSource, SensorState};
#[derive(Debug)]
pub struct SharedFlash<S> { pub struct SharedFlash<S> {
storage: Rc<RefCell<S>> storage: Rc<RefCell<S>>
} }
@@ -51,85 +53,168 @@ impl<S: ReadStorage> ReadStorage for SharedFlash<S> {
} }
pub struct SimDataTable<S> { pub struct SimDataTable<S> {
storage: S, reader: RangeReader<S>,
reader: OffsetReader<S>, count: usize,
index: usize
} }
impl<S: ReadStorage + Clone> SimDataTable<S> { impl<S: ReadStorage + Clone> SimDataTable<S> where S::Error: core::fmt::Debug {
pub fn open(storage: S, offset: usize) -> Self {
debug!("Opening simulation data at {offset:#02x}"); pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result<Self, SimDataError<S>> {
Self { let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data(
reader: OffsetReader::new(storage.clone(), offset), esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined,
storage );
info!("Searching for sim data partition");
let data_partition = partitions.iter().find(|partition| {
partition.partition_type() == partition_type && partition.label_as_str() == "sim"
}).expect("Unable to locate 'sim' data partition!");
let start = data_partition.offset() as usize;
let end = data_partition.len() as usize + start;
info!("Opening simulation data at {start:#02x}:{end:#02x}");
let mut reader = RangeReader::new(storage.clone(), start, end);
if let Ok(count) = rmp::decode::read_array_len(&mut reader) {
info!("Found {count} streams of simulation data");
Ok(Self {
reader,
count: count as usize,
index: 0
})
} else {
error!("Stream index is missing! Have you flashed the partition yet?");
Err(SimDataError::StreamIndexMissing)
} }
} }
} }
impl<S: ReadStorage + Clone> Iterator for SimDataTable<S> { impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> where S::Error: core::fmt::Debug + 'static {
type Item = SimDataReader<S>; type Item = SimDataReader<S>;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
if let Ok(this_type) = rmp::decode::read_ext_meta(&mut self.reader) { if self.index >= self.count {
debug!("Found type={this_type:?} offset={:#02x}", self.reader.pos());
let start_pos = self.reader.pos();
self.reader.offset += this_type.size as usize;
match this_type.typeid.into() {
SensorSource::Unknown => panic!("Unsupported sensor type"),
srcid => Some(SimDataReader::open(self.storage.clone(), start_pos, srcid))
}
} else {
None None
} else {
loop {
match rmp::decode::read_ext_meta(&mut self.reader) {
Ok(this_type) => {
let sensor_reader = self.reader.subset(this_type.size as usize);
self.reader.seek(this_type.size as usize);
self.index += 1;
debug!("Found type={this_type:?}");
match this_type.typeid.try_into() {
Err(()) => error!("Found unknown simulation data chunk {this_type:?}"),
Ok(srcid) => return Some(SimDataReader::open(sensor_reader, srcid))
}
},
Err(err) => {
error!("Read error while decoding simulation data {err:?}");
return None;
}
}
}
} }
} }
} }
struct OffsetReader<S> { #[derive(Debug)]
pub struct RangeReader<S> {
storage: S, storage: S,
start: usize,
end: usize,
offset: usize offset: usize
} }
impl<S: ReadStorage> OffsetReader<S> { impl<S: ReadStorage> RangeReader<S> {
pub const fn new(storage: S, offset: usize) -> Self { pub const fn new(storage: S, start: usize, end: usize) -> Self {
assert!(start <= end);
// TODO: Should add bounds checking since we will know the size of the chunk already // TODO: Should add bounds checking since we will know the size of the chunk already
Self { Self {
storage, storage,
offset start,
end,
offset: 0
} }
} }
pub const fn pos(&self) -> usize { pub fn seek(&mut self, offset: usize) {
self.offset self.offset += offset;
}
pub fn subset(&self, size: usize) -> Self where S: Clone + core::fmt::Debug {
trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size);
assert!(self.start + self.offset + size < self.end);
Self {
storage: self.storage.clone(),
start: self.offset + self.start,
end: self.start + self.offset + size,
offset: 0
}
} }
} }
impl<S: ReadStorage> RmpRead for OffsetReader<S> { impl<S: ReadStorage> RmpRead for RangeReader<S> where S::Error: core::fmt::Debug + 'static {
type Error = RmpErr; type Error = RangeReadError<S::Error>;
fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> { fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
match self.storage.read(self.offset as u32, buf) { let pos = self.start + self.offset;
if pos > self.end {
Err(RangeReadError::OutOfData)
} else {
assert!(pos + buf.len() <= self.end);
match self.storage.read(pos as u32, buf) {
Ok(_) => { Ok(_) => {
self.offset += buf.len(); self.offset += buf.len();
Ok(()) Ok(())
}, },
_ => Err(RmpErr{}) Err(err) => Err(RangeReadError::Storage(err))
}
} }
} }
} }
pub struct SimDataReader<S> { pub struct SimDataReader<S> {
reader: OffsetReader<S>, reader: RangeReader<S>,
srcid: SensorSource, srcid: SensorSource,
runtime: Duration runtime: Duration,
event_count: usize,
index: usize
} }
impl<S: ReadStorage> SimDataReader<S> { #[derive(Debug)]
pub fn open(storage: S, offset: usize, srcid: SensorSource) -> Self { pub enum SimDataError<S: ReadStorage> where S::Error: core::fmt::Debug + 'static {
debug!("Opening {srcid:?} sim data chunk at {offset:#02x}"); StreamIndexMissing,
InvalidChunkSize { expected: usize, found: usize },
MissingTimecode,
BadString,
DecodeError(ValueReadError<RangeReadError<S::Error>>),
EndOfStream
}
impl<S: ReadStorage> From<ValueReadError<RangeReadError<S::Error>>> for SimDataError<S> where S::Error: core::fmt::Debug {
fn from(value: ValueReadError<RangeReadError<S::Error>>) -> Self {
SimDataError::DecodeError(value)
}
}
impl<S: ReadStorage> From<DecodeStringError<'_, RangeReadError<S::Error>>> for SimDataError<S> where S::Error: core::fmt::Debug {
fn from(value: DecodeStringError<'_, RangeReadError<S::Error>>) -> Self {
SimDataError::BadString
}
}
impl<S: ReadStorage> SimDataReader<S> where S::Error: core::fmt::Debug + 'static {
pub fn open(mut reader: RangeReader<S>, srcid: SensorSource) -> Self {
debug!("Opening {srcid:?} sim data chunk");
let event_count = rmp::decode::read_array_len(&mut reader).unwrap() as usize;
debug!("Found {event_count} events!");
Self { Self {
reader: OffsetReader::new(storage, offset), reader,
srcid, srcid,
runtime: Default::default() runtime: Default::default(),
event_count,
index: 0
} }
} }
@@ -137,48 +222,70 @@ impl<S: ReadStorage> SimDataReader<S> {
self.srcid self.srcid
} }
pub async fn read_next(&mut self) -> Result<Measurement, ()> { pub async fn read_next(&mut self) -> Result<Option<Measurement>, SimDataError<S>> {
if self.index < self.event_count {
self.index += 1;
// The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some()
match self.srcid { match self.srcid {
SensorSource::IMU => self.read_motion().await, SensorSource::IMU => self.read_motion().await,
SensorSource::GPS => self.read_gps().await, SensorSource::GPS => self.read_gps().await,
SensorSource::Annotations => self.read_annotation().await,
srcid => unimplemented!("{srcid:?} is not a simulatable sensor!") srcid => unimplemented!("{srcid:?} is not a simulatable sensor!")
}.map(|x| { Some(x) })
} else {
Ok(None)
} }
} }
fn verify_chunk_len(&mut self, length: u32) { fn verify_chunk_len(&mut self, length: u32) -> Result<(), SimDataError<S>> {
let chunk_len = rmp::decode::read_array_len(&mut self.reader).expect("Could not find the chunk length!"); let chunk_len = rmp::decode::read_array_len(&mut self.reader)?;
assert_eq!(chunk_len, length, "Expected {length} fields but instead found {chunk_len}"); if chunk_len != length {
Err(SimDataError::InvalidChunkSize { expected: length as usize, found: chunk_len as usize })
} else {
Ok(())
}
} }
async fn read_delay_field(&mut self) { async fn read_delay_field(&mut self) -> Result<(), SimDataError<S>> {
let delay = embassy_time::Duration::from_millis((rmp::decode::read_f64(&mut self.reader).expect("Expected to find timestamp") * 1000.0) as u64); let timecode = rmp::decode::read_f64(&mut self.reader)?;
let delay = embassy_time::Duration::from_millis((timecode * 1000.0) as u64);
self.runtime += delay; self.runtime += delay;
Timer::after(delay).await Timer::after(delay).await;
Ok(())
} }
async fn read_motion(&mut self) -> Result<Measurement, ()> { async fn read_annotation(&mut self) -> Result<Measurement, SimDataError<S>> {
self.verify_chunk_len(7); self.verify_chunk_len(3)?;
self.read_delay_field().await; self.read_delay_field().await?;
let mut buf = [0; 256];
let msg = rmp::decode::read_str(&mut self.reader, &mut buf)?;
warn!("ANNOATION: {msg}");
Ok(Measurement::Annotation)
}
async fn read_motion(&mut self) -> Result<Measurement, SimDataError<S>> {
self.verify_chunk_len(7)?;
self.read_delay_field().await?;
let accel = Vector3::new( let accel = Vector3::new(
rmp::decode::read_f64(&mut self.reader).unwrap() as f32, rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader).unwrap() as f32, rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader).unwrap() as f32, rmp::decode::read_f64(&mut self.reader)? as f32,
); );
let gyro = Vector3::new( let gyro = Vector3::new(
rmp::decode::read_f64(&mut self.reader).unwrap() as f32, rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader).unwrap() as f32, rmp::decode::read_f64(&mut self.reader)? as f32,
rmp::decode::read_f64(&mut self.reader).unwrap() as f32, rmp::decode::read_f64(&mut self.reader)? as f32,
); );
Ok(Measurement::IMU { accel, gyro }) Ok(Measurement::IMU { accel, gyro })
} }
async fn read_gps(&mut self) -> Result<Measurement, ()> { async fn read_gps(&mut self) -> Result<Measurement, SimDataError<S>> {
self.verify_chunk_len(3); self.verify_chunk_len(3)?;
self.read_delay_field().await; self.read_delay_field().await?;
let coords = Vector2::new( let coords = Vector2::new(
rmp::decode::read_f64(&mut self.reader).unwrap(), rmp::decode::read_f64(&mut self.reader)?,
rmp::decode::read_f64(&mut self.reader).unwrap() rmp::decode::read_f64(&mut self.reader)?
); );
Ok(Measurement::GPS(Some(coords))) Ok(Measurement::GPS(Some(coords)))
@@ -186,10 +293,13 @@ impl<S: ReadStorage> SimDataReader<S> {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct RmpErr{} pub enum RangeReadError<E> {
impl RmpReadErr for RmpErr {} OutOfData,
Storage(E)
}
impl<E: core::fmt::Debug + 'static> RmpReadErr for RangeReadError<E> {}
impl core::fmt::Display for RmpErr { impl<E> core::fmt::Display for RangeReadError<E> {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
f.write_str("RmpErr") f.write_str("RmpErr")
} }
@@ -203,8 +313,18 @@ pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await; events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await;
// TODO: SimulationProgress updates // TODO: SimulationProgress updates
while let Ok(next_evt) = reader.read_next().await { loop {
events.send(next_evt).await; match reader.read_next().await {
Ok(Some(next_evt)) => events.send(next_evt).await,
Ok(None) => {
warn!("End of simulation data stream");
break
}
Err(err) => {
warn!("Error during sensor stream: {err:?}");
break
}
}
} }
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await; events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await;