storage: implement storage writing

This commit is contained in:
2026-03-09 10:20:00 +01:00
parent caefdfe131
commit 96e128ac67
3 changed files with 139 additions and 47 deletions

View File

@@ -1,23 +1,25 @@
use core::usize;
use embassy_sync::channel::DynamicSender;
use embassy_time::{Duration, Timer};
use embedded_storage::ReadStorage;
use embedded_storage::{ReadStorage, Storage};
use esp_bootloader_esp_idf::partitions::PartitionTable;
use esp_hal::time::Instant;
use esp_storage::FlashStorage;
use figments::liber8tion::interpolate::Fract8;
use nalgebra::{Vector2, Vector3};
use log::*;
use rmp::decode::ValueReadError;
use rmp::{decode::ValueReadError, encode::ValueWriteError};
use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{RangeReadError, RangeReader, SharedFlash}};
use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, BundleEventHeader, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{SharedFlash, StorageRange, StorageRangeError}};
pub struct SimDataTable<S> {
reader: RangeReader<S>,
reader: StorageRange<S>,
count: usize,
index: usize
}
impl<S: ReadStorage + Clone> SimDataTable<S> where S::Error: core::fmt::Debug + 'static {
impl<S: ReadStorage> SimDataTable<S> where S::Error: core::fmt::Debug + 'static {
pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result<Self, SimDataError<S>> {
let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data(
esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined,
@@ -30,7 +32,7 @@ impl<S: ReadStorage + Clone> SimDataTable<S> where S::Error: core::fmt::Debug +
let start = data_partition.offset() as usize;
let end = data_partition.len() as usize + start;
info!("Opening simulation data at {start:#02x}:{end:#02x}");
let mut reader = RangeReader::new(storage.clone(), start, end);
let mut reader = StorageRange::new(storage, start, end);
if let Ok(index) = StreamIndex::from_rmp(&mut reader) {
info!("Found stream index: {index:?}");
Ok(Self {
@@ -55,9 +57,11 @@ impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> whe
loop {
match StreamHeader::from_rmp(&mut self.reader) {
Ok(header) => {
match self.reader.subset(header.size) {
info!("Found stream header: {header:?}");
let stream_size = if header.size == 0 { self.reader.capacity() - self.reader.pos() } else { header.size };
match self.reader.subset(stream_size) {
Ok(sensor_reader) => {
self.reader.seek(header.size).unwrap_or_else(|err| {
self.reader.seek(stream_size).unwrap_or_else(|err| {
error!("Simulation data appears bigger than the storage capacity: {err:?}")
});
self.index += 1;
@@ -88,8 +92,8 @@ impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> whe
}
pub struct SimDataReader<S> {
reader: RangeReader<S>,
srcid: SensorSource,
reader: StorageRange<S>,
srcid: StreamType,
runtime: Duration,
event_count: usize,
index: usize
@@ -118,39 +122,44 @@ impl From<AnnotationReading> for Measurement {
}
impl<S: ReadStorage> SimDataReader<S> where S::Error: core::fmt::Debug + 'static {
pub fn open(mut reader: RangeReader<S>, stream_type: StreamType) -> Self {
pub fn open(mut reader: StorageRange<S>, stream_type: StreamType) -> Self {
debug!("Opening {stream_type:?} sim data chunk");
let event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count;
let event_count = if stream_type != StreamType::Bundle { EventStreamHeader::from_rmp(&mut reader).unwrap().count } else { usize::MAX };
debug!("Found {event_count} events!");
Self {
reader,
srcid: stream_type.into(),
srcid: stream_type,
runtime: Default::default(),
event_count,
index: 0
}
}
pub fn srcid(&self) -> SensorSource {
pub fn srcid(&self) -> StreamType {
self.srcid
}
async fn read_next_event<T: EventRecord + Into<Measurement>>(&mut self) -> Result<Measurement, SimDataError<ValueReadError<RangeReadError<S::Error>>>> {
async fn read_next_event<T: EventRecord + Into<Measurement>>(&mut self) -> Result<Measurement, SimDataError<ValueReadError<StorageRangeError<S::Error>>>> {
let event = StreamEvent::<T>::from_rmp(&mut self.reader)?;
let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64);
self.runtime += delay;
info!("waiting {delay}");
Timer::after(delay).await;
Ok(event.data.into())
}
pub async fn read_next(&mut self) -> Result<Option<Measurement>, SimDataError<ValueReadError<RangeReadError<S::Error>>>> {
pub async fn read_next(&mut self) -> Result<Option<Measurement>, SimDataError<ValueReadError<StorageRangeError<S::Error>>>> {
if self.index < self.event_count {
self.index += 1;
let next_id = match self.srcid {
StreamType::Bundle => BundleEventHeader::from_rmp(&mut self.reader)?.id,
_ => self.srcid
};
// The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some()
Ok(Some(match self.srcid {
SensorSource::IMU => self.read_next_event::<IMUReading>().await?,
SensorSource::GPS => self.read_next_event::<GPSReading>().await?,
SensorSource::Annotations => self.read_next_event::<AnnotationReading>().await?,
Ok(Some(match next_id {
StreamType::IMU => self.read_next_event::<IMUReading>().await?,
StreamType::GPS => self.read_next_event::<GPSReading>().await?,
StreamType::Annotations => self.read_next_event::<AnnotationReading>().await?,
srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!")
}))
} else {
@@ -164,7 +173,7 @@ pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>
warn!("Starting simulation for {:?}", reader.srcid());
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await;
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await;
//events.send(Measurement::SensorHardwareStatus(reader.srcid().into(), SensorState::Online)).await;
let mut idx = 0;
let mut idx_breaker = Breaker::default();
@@ -177,14 +186,14 @@ pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>
let pct = (idx as f32) / (reader.event_count as f32);
idx_breaker.set((pct * 255.0) as u8);
if let Some(pct) = idx_breaker.read_tripped() {
events.send(Measurement::SimulationProgress(reader.srcid(), reader.runtime, Fract8::from_raw(pct))).await;
//events.send(Measurement::SimulationProgress(reader.srcid().into(), reader.runtime, Fract8::from_raw(pct))).await;
}
idx += 1;
},
Ok(None) => {
warn!("End of simulation data stream");
break
}
},
Err(err) => {
warn!("Error during sensor stream: {err:?}");
break
@@ -192,7 +201,7 @@ pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>
}
}
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Offline)).await;
//events.send(Measurement::SensorHardwareStatus(reader.srcid().into(), SensorState::Offline)).await;
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::Degraded)).await;
warn!("End of simulation for {:?}", reader.srcid());