diff --git a/src/bin/main.rs b/src/bin/main.rs index 00cdf2b..ddf623d 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -51,6 +51,10 @@ fn gpio_interrupt_handler() { INTERRUPTS.try_get().unwrap().process_interrupts(); } +static MOTION_BUS: ConstStaticCell > = ConstStaticCell::new(Channel::new()); +static RECORDING_BUS: ConstStaticCell > = ConstStaticCell::new(PubSubChannel::new()); +static PREDICTIONS: ConstStaticCell> = ConstStaticCell::new(PubSubChannel::new()); + #[esp_rtos::main] async fn main(spawner: Spawner) { // If we aren't using the second CPU, we can use the bootloader space for the heap instead @@ -72,11 +76,8 @@ async fn main(spawner: Spawner) { esp_rtos::start(sys_timer.alarm0); info!("Embassy initialized!"); - static MOTION_BUS: StaticCell > = StaticCell::new(); - let motion_bus = MOTION_BUS.init_with(|| { Channel::new() }); - - static RECORDING_BUS: StaticCell > = StaticCell::new(); - let recording_bus = RECORDING_BUS.init_with(|| { PubSubChannel::new() }); + let motion_bus = MOTION_BUS.take(); + let recording_bus = RECORDING_BUS.take(); info!("Setting up rendering pipeline"); let mut surfaces = UiSurfacePool::default(); @@ -160,32 +161,6 @@ async fn main(spawner: Spawner) { spawner.must_spawn(renderbug_bike::tasks::oled_render::oled_render(output, oled_surfaces, oled_uniforms)); } - let mut storage = renderbug_bike::storage::SharedFlash::new(esp_storage::FlashStorage::new()); - let mut partition_buf = [8; 1024]; - let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap(); - - #[cfg(feature="simulation")] - { - use renderbug_bike::tasks::simulation::SimDataTable; - for sim_data in SimDataTable::open(storage, partitions).expect("Could not find sim data!") { - let srcid = sim_data.srcid(); - info!("Found simulation data for {srcid:?}"); - if spawner.spawn(renderbug_bike::tasks::simulation::simulation_task(sim_data, motion_bus.dyn_sender())).is_err() { - error!("Unable to spawn simulation task for {srcid:?}! Increase the task pool size."); - } - } - } - - #[cfg(not(feature="simulation"))] - { - use renderbug_bike::storage::SimDataRecorder; - - let recorder = SimDataRecorder::open(storage, partitions).expect("Unable to open sim data partition for writing"); - //spawner.spawn(record_telemetry(recording_bus.dyn_receiver(), recorder)).unwrap(); - } - - spawner.spawn(print_sensor_readings(recording_bus.dyn_subscriber().unwrap())).unwrap(); - #[cfg(feature="radio")] let (wifi, network_device, ble) = { info!("Configuring wifi"); @@ -211,9 +186,7 @@ async fn main(spawner: Spawner) { let core2_main = |spawner: Spawner| { info!("Starting application tasks"); - - static PREDICTIONS: StaticCell> = StaticCell::new(); - let predictions = PREDICTIONS.init(PubSubChannel::new()); + let predictions = PREDICTIONS.take(); #[cfg(not(feature="demo"))] { @@ -251,24 +224,72 @@ async fn main(spawner: Spawner) { info!("Starting connectivity task"); spawner.must_spawn(renderbug_bike::tasks::wifi::wifi_connect_task(wifi, motion_bus.dyn_sender())); + info!("Starting location sampler"); + static SAMPLER: ConstStaticCell> = ConstStaticCell::new(Watch::new()); + let sampler = SAMPLER.take(); + spawner.must_spawn(renderbug_bike::tasks::wifi::location_sampler(predictions.dyn_subscriber().unwrap(), sampler.dyn_sender())); + info!("Launching HTTP telemetry"); - spawner.must_spawn(renderbug_bike::tasks::wifi::http_telemetry_task(predictions.dyn_subscriber().unwrap(), stack, motion_bus.dyn_sender())); + spawner.must_spawn(renderbug_bike::tasks::wifi::http_telemetry_task(sampler.dyn_receiver().unwrap(), stack, motion_bus.dyn_sender())); info!("Starting BLE services"); spawner.must_spawn(renderbug_bike::tasks::ble::ble_task(ble, predictions.dyn_subscriber().unwrap(), spawner)); } - #[cfg(feature="dual-core")] + spawner.must_spawn(print_sensor_status(predictions.dyn_subscriber().unwrap())); + + let mut storage = SharedFlash::new(esp_storage::FlashStorage::new(peripherals.FLASH).multicore_auto_park()); + let mut partition_buf = [8; 1024]; + let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap(); + + #[cfg(any(feature="flash-recording", feature="simulation"))] { - info!("Launching core 2 watchdog"); - let timer1 = TimerGroup::new(peripherals.TIMG1); - let mut ui_wdt = timer1.wdt; - ui_wdt.set_timeout(esp_hal::timer::timg::MwdtStage::Stage0, esp_hal::time::Duration::from_secs(60)); - ui_wdt.enable(); - spawner.must_spawn(wdt_task(ui_wdt)); + use renderbug_bike::tasks::simulation::{SimDataStream, SimDataTable}; + let sim_partition = SimDataTable::find_partition(storage, partitions).expect("Could not find sim data partition!"); + info!("Got partition: {sim_partition:?}"); + let mut sim_table = match SimDataTable::open(sim_partition.clone()) { + Ok(table) => table, + Err(SimDataError::StreamIndexMissing) => { + info!("Sim data partition not formatted, creating new stream table with {sim_partition:?}"); + SimDataTable::create(sim_partition).expect("Unable to create sim data stream table in partition!") + }, + Err(e) => panic!("Error opening sim data stream table: {e:?}") + }; + + loop { + match sim_table.next() { + Some((header, reader)) => { + let srcid = header.id; + info!("Found simulation data for {srcid:?} at {:#x}", reader.abs_start()); + + let stream = SimDataStream::open(reader, srcid); + + if cfg!(feature="simulation") { + if spawner.spawn(renderbug_bike::tasks::simulation::simulation_task(stream, motion_bus.dyn_sender())).is_err() { + error!("Unable to spawn simulation task for {srcid:?}! Increase the task pool size."); + } + } else if cfg!(feature="flash-recording") && srcid == StreamType::Bundle { + info!("Continuing recording stream"); + spawner.spawn(record_telemetry(recording_bus.dyn_subscriber().unwrap(), stream, motion_bus.dyn_sender())).unwrap(); + break; + } + }, + None if cfg!(feature="flash-recording") => { + // If we already found a recording stream, we break; out of the loop above + let reader = sim_table.append_new_stream(StreamType::Bundle).expect("Unable to create a new recording stream in the sim data partition! Is there enough free space?"); + warn!("Starting new recording stream at {:#x}", reader.abs_start()); + let recorder = SimDataStream::create(reader, StreamType::Bundle); + spawner.spawn(record_telemetry(recording_bus.dyn_subscriber().unwrap(), recorder, motion_bus.dyn_sender())).unwrap(); + break; + }, + _ => () + } + } } - spawner.must_spawn(print_sensor_status(predictions.dyn_subscriber().unwrap())); + spawner.must_spawn(wdt_task(rtc, predictions.dyn_subscriber().unwrap(), oled_controls, display_controls)); + + //info!("Final memory stats: {}", esp_alloc::HEAP.stats()); info!("Ready to rock and roll in {}ms", Instant::now().as_millis()); }; @@ -316,9 +337,17 @@ async fn wdt_task(mut wdt: Wdt>) { } #[embassy_executor::task] -async fn record_telemetry(firehose: DynamicReceiver<'static, Measurement>, mut storage: SimDataRecorder>>) { +async fn record_telemetry(mut firehose: DynSubscriber<'static, Measurement>, mut storage: SimDataStream>>>, motion: DynamicSender<'static, Measurement>) { + let mut skipped_events = 0; + while let Ok(Some((_, evt))) = storage.read_next() { + trace!("Skipping event {evt:?}"); + skipped_events += 1; + } + info!("Skipped {} events to catch up to the end of the recording stream", skipped_events); + motion.send(Measurement::SensorHardwareStatus(SensorSource::FlashRecording, SensorState::Online)).await; + storage.write_next(AnnotationReading { buf: *b"Telemetry recording started " }).unwrap(); loop { - match firehose.receive().await { + match firehose.next_message_pure().await { Measurement::IMU { accel, gyro } => { let reading = IMUReading { accel_x: accel.x as f64, @@ -328,24 +357,28 @@ async fn record_telemetry(firehose: DynamicReceiver<'static, Measurement>, mut s gyro_y: gyro.y as f64, gyro_z: gyro.z as f64 }; - storage.write_next(reading).unwrap(); - info!("Wrote IMU to flash"); + //storage.write_next(reading).unwrap(); + trace!("Wrote IMU to flash"); }, - _ => () - } - } -} - -#[embassy_executor::task] -async fn print_sensor_readings(mut events: DynSubscriber<'static, Measurement>) { - loop { - match events.next_message_pure().await { - Measurement::IMU { accel, gyro } => { - esp_println::println!("accel=({},{},{}) gyro=({},{},{})", accel.x, accel.y, accel.z, gyro.x, gyro.y, gyro.z); - }, - Measurement::GPS(gps) => { - esp_println::println!("gps={gps:?}"); + Measurement::GPS(Some(pos)) => { + storage.write_next(GPSReading { + lat: pos.x, + lon: pos.y, + }).unwrap(); + trace!("Wrote GPS to flash"); }, + Measurement::SensorHardwareStatus(sensor, status) => { + let annotation = alloc::format!("{:?}={:?}", sensor, status); + let mut buf = [0; 32]; + let bytes = annotation.as_bytes(); + let copy_len = bytes.len().min(buf.len()); + buf[..copy_len].copy_from_slice(&bytes[..copy_len]); + if copy_len < buf.len() { + buf[copy_len..].fill(b' '); + } + storage.write_next(AnnotationReading { buf }).unwrap(); + trace!("Wrote sensor status update to flash"); + } _ => () } } diff --git a/src/bin/playback.rs b/src/bin/playback.rs new file mode 100644 index 0000000..687dd5f --- /dev/null +++ b/src/bin/playback.rs @@ -0,0 +1,59 @@ +#![no_std] +#![no_main] + +use embassy_executor::Spawner; +use esp_hal::clock::CpuClock; +use esp_hal::timer::systimer::SystemTimer; +use log::*; +use esp_backtrace as _; + +use renderbug_bike::logging::RenderbugLogger; +use renderbug_bike::tasks::simulation::{SimDataStream, SimDataTable}; +use renderbug_bike::simdata::*; + +esp_bootloader_esp_idf::esp_app_desc!(); + +#[esp_rtos::main] +async fn main(spawner: Spawner) { + + esp_alloc::heap_allocator!(size: 100000); + + RenderbugLogger::init_logger(); + + let config = esp_hal::Config::default().with_cpu_clock(CpuClock::max()); + let peripherals = esp_hal::init(config); + let sys_timer = SystemTimer::new(peripherals.SYSTIMER); + esp_rtos::start(sys_timer.alarm0); + + let mut storage = renderbug_bike::storage::SharedFlash::new(esp_storage::FlashStorage::new(peripherals.FLASH).multicore_auto_park()); + let mut partition_buf = [8; 1024]; + let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap(); + + let sim_partition = SimDataTable::find_partition(storage, partitions).expect("Could not find sim data partition!"); + info!("Got partition: {sim_partition:?}"); + let sim_table = SimDataTable::open(sim_partition).expect("Sim data partition not found"); + + for (header, reader) in sim_table { + info!("Found stream {header:?}"); + if header.id == StreamType::Bundle { + let mut stream = SimDataStream::open(reader, header.id); + let mut timestamp = 0; + loop { + match stream.read_next() { + Ok(Some((timecode, next_evt))) => { + timestamp += timecode.as_millis(); + esp_println::println!("{timestamp} {next_evt:?}"); + }, + Ok(None) => { + warn!("End of simulation data stream"); + break + }, + Err(err) => { + warn!("Error during sensor stream: {err:?}"); + break + } + } + } + } + } +} \ No newline at end of file diff --git a/src/bin/reset.rs b/src/bin/reset.rs new file mode 100644 index 0000000..e427e98 --- /dev/null +++ b/src/bin/reset.rs @@ -0,0 +1,36 @@ +#![no_std] +#![no_main] + +use embassy_executor::Spawner; +use esp_hal::clock::CpuClock; +use esp_hal::timer::systimer::SystemTimer; +use log::*; +use esp_backtrace as _; + +use renderbug_bike::logging::RenderbugLogger; +use renderbug_bike::tasks::simulation::{SimDataStream, SimDataTable}; +use renderbug_bike::simdata::*; + +esp_bootloader_esp_idf::esp_app_desc!(); + +#[esp_rtos::main] +async fn main(spawner: Spawner) { + + esp_alloc::heap_allocator!(size: 100000); + + RenderbugLogger::init_logger(); + + let config = esp_hal::Config::default().with_cpu_clock(CpuClock::max()); + let peripherals = esp_hal::init(config); + let sys_timer = SystemTimer::new(peripherals.SYSTIMER); + esp_rtos::start(sys_timer.alarm0); + + let mut storage = renderbug_bike::storage::SharedFlash::new(esp_storage::FlashStorage::new(peripherals.FLASH)); + let mut partition_buf = [8; 1024]; + let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap(); + + let sim_partition = SimDataTable::find_partition(storage, partitions).expect("Could not find sim data partition!"); + info!("Got partition: {sim_partition:?}"); + SimDataTable::create(sim_partition).expect("Could not write empty sim partition"); + error!("Overwrote sim data partition with a blank stream table"); +} \ No newline at end of file diff --git a/src/events.rs b/src/events.rs index 8cc5e93..569d6a3 100644 --- a/src/events.rs +++ b/src/events.rs @@ -47,7 +47,7 @@ pub enum Measurement { // Simulation metadata updates SimulationProgress(SensorSource, Duration, Fract8), - Annotation + Annotation([u8; 32]) } #[derive(Default, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -81,6 +81,9 @@ pub enum SensorSource { // Connectivity related Wifi, + // Data processing/logging + FlashRecording, + // Fusion outputs MotionFrame, Location, diff --git a/src/storage.rs b/src/storage.rs index 8827359..200b7bc 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,13 +1,14 @@ use core::{cell::RefCell, fmt::Formatter}; use alloc::rc::Rc; +use embedded_io::{ErrorKind, ErrorType, Read, Write}; use embedded_storage::{ReadStorage, Storage}; use esp_bootloader_esp_idf::partitions::PartitionTable; use esp_hal::time::Instant; use log::*; use rmp::{decode::{RmpRead, RmpReadErr}, encode::{RmpWrite, RmpWriteErr, ValueWriteError}}; -use crate::simdata::{BundleEventHeader, EventRecord, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}; +use crate::{simdata::{BundleEventHeader, EventRecord, EventStreamHeader, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, tasks::simulation::Checkpoint}; #[derive(Debug)] pub struct SharedFlash { @@ -62,12 +63,98 @@ impl core::fmt::Display for StorageRangeError { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StorageRange { storage: S, start: usize, end: usize, - offset: usize + offset: usize, + reset_pos: usize +} + +impl Checkpoint for StorageRange { + fn checkpoint(&mut self) { + self.reset_pos = self.offset; + } + + fn rollback(&mut self) { + self.offset = self.reset_pos; + } +} + +impl StorageRange { + pub fn inner(&self) -> &S { + &self.storage + } + + pub fn inner_mut(&mut self) -> &mut S { + &mut self.storage + } + + pub fn abs_start(&self) -> usize { + self.start + } + + pub fn abs_end(&self) -> usize { + self.end + } +} + +impl embedded_io::Error for StorageRangeError { + fn kind(&self) -> embedded_io::ErrorKind { + ErrorKind::Other + } +} + +impl, E: core::fmt::Debug> ErrorType for StorageRange { + type Error = StorageRangeError; +} + +impl + ReadStorage, E: core::fmt::Debug> Write for StorageRange { + fn write(&mut self, buf: &[u8]) -> Result { + let pos = self.start + self.offset; + trace!("write {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, pos, pos + buf.len()); + if pos > self.end { + Err(StorageRangeError::OutOfData) + } else { + assert!(pos + buf.len() <= self.end); + match self.storage.write(pos as u32, buf) { + Ok(_) => { + self.offset += buf.len(); + Ok(buf.len()) + }, + Err(err) => Err(StorageRangeError::Storage(err)) + } + } + } + + fn flush(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} + +impl, E: core::fmt::Debug> Read for StorageRange { + fn read(&mut self, buf: &mut [u8]) -> Result { + let pos = self.start + self.offset; + trace!("read_exact_buf {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, pos, pos + buf.len()); + if pos > self.end { + Err(StorageRangeError::OutOfData) + } else { + let remaining = self.end - pos; + let max_read = buf.len().min(remaining); + if max_read == 0 { + return Err(StorageRangeError::OutOfData) + } + assert!(pos + buf.len() <= self.end); + match self.storage.read(pos as u32, &mut buf[..max_read]) { + Ok(_) => { + self.offset += max_read; + Ok(max_read) + }, + Err(err) => Err(StorageRangeError::Storage(err)) + } + } + } } impl StorageRange { @@ -77,7 +164,8 @@ impl StorageRange { storage, start, end, - offset: 0 + offset: 0, + reset_pos: 0 } } @@ -98,6 +186,15 @@ impl StorageRange { } } + pub fn seek_abs(&mut self, pos: usize) -> Result<(), StorageRangeError> { + self.offset = pos; + if self.offset > self.end { + Err(StorageRangeError::OutOfData) + } else { + Ok(()) + } + } + pub fn subset(&self, size: usize) -> Result> where S: Clone + core::fmt::Debug { trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size); if self.start + self.offset + size > self.end { @@ -107,7 +204,8 @@ impl StorageRange { storage: self.storage.clone(), start: self.offset + self.start, end: self.start + self.offset + size, - offset: 0 + offset: 0, + reset_pos: 0 }) } } @@ -152,51 +250,4 @@ impl RmpWrite for StorageRange where S::Error: core::fmt::Debug + } } } -} - -pub struct SimDataRecorder { - storage: S, - last_stamp: Instant -} - -impl SimDataRecorder { - pub fn new(storage: S) -> Self { - Self { - storage, - last_stamp: Instant::now() - } - } - - pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result>, SimDataError::Error>>>> where S: Storage, ::Error: core::fmt::Debug + 'static { - let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( - esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined, - ); - info!("Searching for sim data partition"); - let data_partition = partitions.iter().find(|partition| { - partition.partition_type() == partition_type && partition.label_as_str() == "sim" - }).ok_or(SimDataError::PartitionNotFound)?; - - let start = data_partition.offset() as usize; - let end = data_partition.len() as usize + start; - let mut writer = StorageRange::new(storage, start, end); - warn!("Writing new simulation data at {start:#02x}:{end:#02x}"); - StreamIndex { count: 1 }.write_rmp(&mut writer)?; - StreamHeader { id: StreamType::Bundle, size: 0 }.write_rmp(&mut writer)?; - Ok(SimDataRecorder { - storage: writer, - last_stamp: Instant::now() - }) - } - - pub fn write_next(&mut self, event: T) -> Result<(), SimDataError>> where S: RmpWrite { - BundleEventHeader { id: T::stream_id() }.write_rmp(&mut self.storage)?; - let now = Instant::now(); - let event = StreamEvent { - data: event, - timecode: (now - self.last_stamp).as_millis() as f64 / 1000.0 - }; - event.write_rmp(&mut self.storage)?; - self.last_stamp = now; - Ok(()) - } } \ No newline at end of file diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 14924a8..882b910 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -5,7 +5,6 @@ pub mod gps; pub mod wifi; #[cfg(feature="radio")] pub mod ble; -#[cfg(feature="simulation")] pub mod simulation; #[cfg(feature="demo")] pub mod demo; diff --git a/src/tasks/motion.rs b/src/tasks/motion.rs index e100746..0648532 100644 --- a/src/tasks/motion.rs +++ b/src/tasks/motion.rs @@ -28,14 +28,16 @@ pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, prediction_ }, // FIXME: This needs harmonized with the automatic data timeout from above, somehow? Measurement::SensorHardwareStatus(source, state) => { - warn!("Sensor {source:?} reports {state:?}!"); + debug!("Sensor {source:?} reports {state:?}!"); prediction_sink.publish(Prediction::SensorStatus(source, state)).with_timeout(TIMEOUT).await.expect("Could not update sensor status in time. Is the prediction bus stalled?"); }, Measurement::ExternalPower(mw) => { info!("Got external power change to {mw:?}"); }, Measurement::SimulationProgress(source, duration, pct) => debug!("{source:?} simulation time: {} {} / 255", duration.as_secs(), pct), - Measurement::Annotation => () + Measurement::Annotation(msg) => { + info!("Annotation: {}", str::from_utf8(&msg).unwrap_or("")); + } } if recording_sink.try_publish(next_measurement).is_err() { warn!("Could not publish measurement to recording bus. Is the recording bus stalled?"); diff --git a/src/tasks/sd_card.rs b/src/tasks/sd_card.rs index 956ad79..5f6d2cb 100644 --- a/src/tasks/sd_card.rs +++ b/src/tasks/sd_card.rs @@ -72,7 +72,7 @@ pub async fn sdcard_task( let stream = rootfs.open_file_in_dir("renderbug-sensors.rmp",Mode::ReadWriteCreateOrTruncate).unwrap(); let rmp_stream = EmbeddedRmp(stream); - let mut recorder = SimDataRecorder::new(EmbeddedRmp(rmp_stream)); + /*let mut recorder = SimDataStream::open(rmp_stream, StreamType::Bundle); loop { // FIXME: THis chunk could really go into an impl From for EventRecord @@ -86,17 +86,18 @@ pub async fn sdcard_task( gyro_y: gyro.y as f64, gyro_z: gyro.z as f64 }; - if recorder.write_next(reading).is_err() { + /*if recorder.write_next(reading).is_err() { error!("Failed to write IMU reading to SD card"); break; - } + }*/ info!("Wrote IMU to SD card"); }, _ => () } } - drop(recorder); + drop(recorder);*/ + drop(rmp_stream); drop(rootfs); drop(vol); @@ -104,13 +105,15 @@ pub async fn sdcard_task( } } -struct EmbeddedRmp(W); +struct EmbeddedRmp(W); #[derive(Debug)] struct EmbeddedRmpError(T); impl RmpWriteErr for EmbeddedRmpError { } +impl RmpReadErr for EmbeddedRmpError { +} impl core::fmt::Display for EmbeddedRmpError where T: core::fmt::Debug { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { @@ -127,6 +130,15 @@ impl RmpWrite for EmbeddedRmp where W::Error: core::fm } } +impl RmpRead for EmbeddedRmp where W::Error: core::fmt::Debug + 'static { + type Error = EmbeddedRmpError>; + + fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> { + self.0.read_exact(buf).map_err(|e| { EmbeddedRmpError(e) })?; + Ok(()) + } +} + impl embedded_io::ErrorType for EmbeddedRmp { type Error = W::Error; } diff --git a/src/tasks/simulation.rs b/src/tasks/simulation.rs index d19dc71..077ed86 100644 --- a/src/tasks/simulation.rs +++ b/src/tasks/simulation.rs @@ -9,18 +9,46 @@ use esp_storage::FlashStorage; use figments::liber8tion::interpolate::Fract8; use nalgebra::{Vector2, Vector3}; use log::*; -use rmp::{decode::ValueReadError, encode::ValueWriteError}; +use rmp::{decode::{RmpRead, RmpReadErr, ValueReadError}, encode::{RmpWrite, RmpWriteErr, ValueWriteError}}; use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, BundleEventHeader, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{SharedFlash, StorageRange, StorageRangeError}}; pub struct SimDataTable { - reader: StorageRange, + reader: S, count: usize, index: usize } -impl SimDataTable where S::Error: core::fmt::Debug + 'static { - pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result> { +impl, E: RmpReadErr + RmpWriteErr> SimDataTable { + pub fn open(mut reader: S) -> Result> { + if let Ok(index) = StreamIndex::from_rmp(&mut reader) { + info!("Found stream index: {index:?}"); + Ok(Self { + reader, + count: index.count, + index: 0 + }) + } else { + Err(SimDataError::StreamIndexMissing) + } + } +} + +impl, E: RmpReadErr + RmpWriteErr> SimDataTable { + pub fn create(mut writer: S) -> Result> { + // FIXME: The stream header should use a fixed size integer for the count, instead of the dynamic array length type + info!("Writing new stream index"); + StreamIndex { count: 0 }.write_rmp(&mut writer)?; + Ok(Self { + reader: writer, + count: 0, + index: 0 + }) + } +} + +impl, E: core::fmt::Debug + 'static> SimDataTable> { + pub fn find_partition(storage: S, partitions: PartitionTable<'_>) -> Result, SimDataError>> { let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data( esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined, ); @@ -32,29 +60,41 @@ impl SimDataTable where S::Error: core::fmt::Debug + 'static let start = data_partition.offset() as usize; let end = data_partition.len() as usize + start; info!("Opening simulation data at {start:#02x}:{end:#02x}"); - let mut reader = StorageRange::new(storage, start, end); - if let Ok(index) = StreamIndex::from_rmp(&mut reader) { - info!("Found stream index: {index:?}"); - Ok(Self { - reader, - count: index.count, - index: 0 - }) - } else { - error!("Stream index is missing! Have you flashed the partition yet?"); - Err(SimDataError::StreamIndexMissing) - } + Ok(StorageRange::new(storage, start, end)) } } -impl Iterator for SimDataTable where S::Error: core::fmt::Debug + 'static { - type Item = SimDataReader; +impl + Clone + core::fmt::Debug, E: core::fmt::Debug + 'static> SimDataTable> { + pub fn append_new_stream(&mut self, streamid: StreamType) -> Result, SimDataError>> { + let stream_size = self.reader.capacity() - self.reader.pos() - 6; + if stream_size < 1024 { + error!("Not enough space left in the simulation data partition to create a new stream!"); + return Err(SimDataError::NotEnoughSpace); + } + + StreamHeader { id: streamid, size: stream_size }.write_rmp(&mut self.reader)?; + self.count += 1; + self.index += 1; + let reset_pos = self.reader.pos(); + self.reader.seek_abs(0).unwrap(); + StreamIndex { count: self.count }.write_rmp(&mut self.reader)?; + self.reader.seek_abs(reset_pos).unwrap(); + + info!("Creating new stream id={streamid:?} capacity={stream_size:#02x} range={:#02x}:{:#02x}", self.reader.abs_start() + self.reader.pos(), self.reader.abs_start() + stream_size); + + Ok(self.reader.subset(stream_size).unwrap()) + } +} + +impl + Clone + core::fmt::Debug, E: core::fmt::Debug + 'static> Iterator for SimDataTable> { + type Item = (StreamHeader, StorageRange); fn next(&mut self) -> Option { if self.index >= self.count { None } else { loop { + let reset_pos = self.reader.pos(); match StreamHeader::from_rmp(&mut self.reader) { Ok(header) => { info!("Found stream header: {header:?}"); @@ -66,7 +106,7 @@ impl Iterator for SimDataTable whe }); self.index += 1; debug!("Found header={header:?}"); - return Some(SimDataReader::open(sensor_reader, header.id)); + return Some((header, sensor_reader)); }, err => { error!("Could not open the next simulation data chunk: {err:?}"); @@ -83,6 +123,8 @@ impl Iterator for SimDataTable whe } Err(err) => { error!("Read error while reading next chunk in simulation stream table {err:?}"); + // Reset back to right before we tried reading the stream header, so that append_new_stream() places it correctly. + self.reader.seek_abs(reset_pos).unwrap(); return None; } } @@ -91,10 +133,11 @@ impl Iterator for SimDataTable whe } } -pub struct SimDataReader { - reader: StorageRange, +pub struct SimDataStream { + reader: S, srcid: StreamType, - runtime: Duration, + last_stamp: Instant, + //runtime: Duration, event_count: usize, index: usize } @@ -116,60 +159,117 @@ impl From for Measurement { impl From for Measurement { fn from(value: AnnotationReading) -> Self { - warn!("ANNOTATION: {}", core::str::from_utf8(&value.buf).unwrap()); - Measurement::Annotation + Measurement::Annotation(value.buf) } } -impl SimDataReader where S::Error: core::fmt::Debug + 'static { - pub fn open(mut reader: StorageRange, stream_type: StreamType) -> Self { - debug!("Opening {stream_type:?} sim data chunk"); - let event_count = if stream_type != StreamType::Bundle { EventStreamHeader::from_rmp(&mut reader).unwrap().count } else { usize::MAX }; - debug!("Found {event_count} events!"); +impl SimDataStream { + pub fn srcid(&self) -> StreamType { + self.srcid + } +} + +pub trait Checkpoint { + fn checkpoint(&mut self); + fn rollback(&mut self); +} + +impl, E: RmpWriteErr + RmpReadErr> SimDataStream { + pub fn open(mut reader: S, stream_type: StreamType) -> Self { + info!("Opening {stream_type:?} sim data chunk"); + let event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count; + info!("Found {event_count} events!"); Self { reader, srcid: stream_type, - runtime: Default::default(), + //runtime: Default::default(), + last_stamp: Instant::now(), event_count, index: 0 } } - pub fn srcid(&self) -> StreamType { - self.srcid - } - - async fn read_next_event>(&mut self) -> Result>>> { + fn read_next_event>(&mut self) -> Result<(Duration, Measurement), SimDataError> { let event = StreamEvent::::from_rmp(&mut self.reader)?; let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64); - self.runtime += delay; - info!("waiting {delay}"); - Timer::after(delay).await; - Ok(event.data.into()) + Ok((delay, event.data.into())) } - pub async fn read_next(&mut self) -> Result, SimDataError>>> { + pub fn read_next(&mut self) -> Result, SimDataError> { if self.index < self.event_count { self.index += 1; + self.reader.checkpoint(); let next_id = match self.srcid { - StreamType::Bundle => BundleEventHeader::from_rmp(&mut self.reader)?.id, + StreamType::Bundle => BundleEventHeader::from_rmp(&mut self.reader).inspect_err(|_| { self.reader.rollback() })?.id, _ => self.srcid }; // The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some() - Ok(Some(match next_id { - StreamType::IMU => self.read_next_event::().await?, - StreamType::GPS => self.read_next_event::().await?, - StreamType::Annotations => self.read_next_event::().await?, + let (delay, evt) = match next_id { + StreamType::IMU => self.read_next_event::().inspect_err(|_| { self.reader.rollback() })?, + StreamType::GPS => self.read_next_event::().inspect_err(|_| { self.reader.rollback() })?, + StreamType::Annotations => self.read_next_event::().inspect_err(|_| { self.reader.rollback() })?, srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!") - })) + }; + Ok(Some((delay, evt))) + } else { + Ok(None) + } + } + + pub async fn read_next_and_wait(&mut self) -> Result, SimDataError> { + if let Some((delay, evt)) = self.read_next()? { + trace!("waiting {delay}"); + Timer::after(delay).await; + self.last_stamp = Instant::now(); + Ok(Some(evt)) } else { Ok(None) } } } +impl, SE: core::fmt::Debug + 'static> SimDataStream> { + pub fn write_next(&mut self, event: T) -> Result<(), SimDataError>> { + BundleEventHeader { id: T::stream_id() }.write_rmp(&mut self.reader)?; + let now = Instant::now(); + let event = StreamEvent { + data: event, + timecode: (now - self.last_stamp).as_millis() as f64 / 1000.0 + }; + event.write_rmp(&mut self.reader)?; + self.last_stamp = now; + self.event_count = self.event_count.checked_add(1).ok_or(SimDataError::NotEnoughSpace)?; + + self.write_header()?; + Ok(()) + } + + fn write_header(&mut self) -> Result<(), SimDataError>> { + let reset_pos = self.reader.pos(); + self.reader.seek_abs(0).unwrap(); + trace!("Writing event stream header with count={} to {:#x}", self.event_count, self.reader.abs_start()); + EventStreamHeader { count: self.event_count }.write_rmp(&mut self.reader)?; + self.reader.seek_abs(reset_pos).unwrap(); + + Ok(()) + } + + pub fn create(mut storage: StorageRange, stream_type: StreamType) -> Self { + storage.seek_abs(0).unwrap(); + EventStreamHeader { count: 0 }.write_rmp(&mut storage).unwrap(); + Self { + reader: storage, + srcid: stream_type, + //runtime: Default::default(), + last_stamp: Instant::now(), + event_count: 0, + index: 0 + } + } +} + #[embassy_executor::task(pool_size = 3)] -pub async fn simulation_task(mut reader: SimDataReader>, events: DynamicSender<'static, Measurement>) { +pub async fn simulation_task(mut reader: SimDataStream>>>, events: DynamicSender<'static, Measurement>) { warn!("Starting simulation for {:?}", reader.srcid()); events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await; @@ -180,7 +280,7 @@ pub async fn simulation_task(mut reader: SimDataReader // TODO: SimulationProgress updates loop { - match reader.read_next().await { + match reader.read_next_and_wait().await { Ok(Some(next_evt)) => { events.send(next_evt).await; let pct = (idx as f32) / (reader.event_count as f32);