simulation: rewrite the storage and simulation stack
This commit is contained in:
153
src/bin/main.rs
153
src/bin/main.rs
@@ -51,6 +51,10 @@ fn gpio_interrupt_handler() {
|
|||||||
INTERRUPTS.try_get().unwrap().process_interrupts();
|
INTERRUPTS.try_get().unwrap().process_interrupts();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static MOTION_BUS: ConstStaticCell<Channel<CriticalSectionRawMutex,Measurement,5> > = ConstStaticCell::new(Channel::new());
|
||||||
|
static RECORDING_BUS: ConstStaticCell<PubSubChannel<CriticalSectionRawMutex,Measurement,10, 4, 1> > = ConstStaticCell::new(PubSubChannel::new());
|
||||||
|
static PREDICTIONS: ConstStaticCell<PubSubChannel<NoopRawMutex, Prediction, 30, 7, 1>> = ConstStaticCell::new(PubSubChannel::new());
|
||||||
|
|
||||||
#[esp_rtos::main]
|
#[esp_rtos::main]
|
||||||
async fn main(spawner: Spawner) {
|
async fn main(spawner: Spawner) {
|
||||||
// If we aren't using the second CPU, we can use the bootloader space for the heap instead
|
// If we aren't using the second CPU, we can use the bootloader space for the heap instead
|
||||||
@@ -72,11 +76,8 @@ async fn main(spawner: Spawner) {
|
|||||||
esp_rtos::start(sys_timer.alarm0);
|
esp_rtos::start(sys_timer.alarm0);
|
||||||
info!("Embassy initialized!");
|
info!("Embassy initialized!");
|
||||||
|
|
||||||
static MOTION_BUS: StaticCell<Channel<CriticalSectionRawMutex,Measurement,5> > = StaticCell::new();
|
let motion_bus = MOTION_BUS.take();
|
||||||
let motion_bus = MOTION_BUS.init_with(|| { Channel::new() });
|
let recording_bus = RECORDING_BUS.take();
|
||||||
|
|
||||||
static RECORDING_BUS: StaticCell<PubSubChannel<CriticalSectionRawMutex,Measurement,1, 2, 1> > = StaticCell::new();
|
|
||||||
let recording_bus = RECORDING_BUS.init_with(|| { PubSubChannel::new() });
|
|
||||||
|
|
||||||
info!("Setting up rendering pipeline");
|
info!("Setting up rendering pipeline");
|
||||||
let mut surfaces = UiSurfacePool::default();
|
let mut surfaces = UiSurfacePool::default();
|
||||||
@@ -160,32 +161,6 @@ async fn main(spawner: Spawner) {
|
|||||||
spawner.must_spawn(renderbug_bike::tasks::oled_render::oled_render(output, oled_surfaces, oled_uniforms));
|
spawner.must_spawn(renderbug_bike::tasks::oled_render::oled_render(output, oled_surfaces, oled_uniforms));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut storage = renderbug_bike::storage::SharedFlash::new(esp_storage::FlashStorage::new());
|
|
||||||
let mut partition_buf = [8; 1024];
|
|
||||||
let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap();
|
|
||||||
|
|
||||||
#[cfg(feature="simulation")]
|
|
||||||
{
|
|
||||||
use renderbug_bike::tasks::simulation::SimDataTable;
|
|
||||||
for sim_data in SimDataTable::open(storage, partitions).expect("Could not find sim data!") {
|
|
||||||
let srcid = sim_data.srcid();
|
|
||||||
info!("Found simulation data for {srcid:?}");
|
|
||||||
if spawner.spawn(renderbug_bike::tasks::simulation::simulation_task(sim_data, motion_bus.dyn_sender())).is_err() {
|
|
||||||
error!("Unable to spawn simulation task for {srcid:?}! Increase the task pool size.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(feature="simulation"))]
|
|
||||||
{
|
|
||||||
use renderbug_bike::storage::SimDataRecorder;
|
|
||||||
|
|
||||||
let recorder = SimDataRecorder::open(storage, partitions).expect("Unable to open sim data partition for writing");
|
|
||||||
//spawner.spawn(record_telemetry(recording_bus.dyn_receiver(), recorder)).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
spawner.spawn(print_sensor_readings(recording_bus.dyn_subscriber().unwrap())).unwrap();
|
|
||||||
|
|
||||||
#[cfg(feature="radio")]
|
#[cfg(feature="radio")]
|
||||||
let (wifi, network_device, ble) = {
|
let (wifi, network_device, ble) = {
|
||||||
info!("Configuring wifi");
|
info!("Configuring wifi");
|
||||||
@@ -211,9 +186,7 @@ async fn main(spawner: Spawner) {
|
|||||||
|
|
||||||
let core2_main = |spawner: Spawner| {
|
let core2_main = |spawner: Spawner| {
|
||||||
info!("Starting application tasks");
|
info!("Starting application tasks");
|
||||||
|
let predictions = PREDICTIONS.take();
|
||||||
static PREDICTIONS: StaticCell<PubSubChannel<NoopRawMutex, Prediction, 15, 6, 1>> = StaticCell::new();
|
|
||||||
let predictions = PREDICTIONS.init(PubSubChannel::new());
|
|
||||||
|
|
||||||
#[cfg(not(feature="demo"))]
|
#[cfg(not(feature="demo"))]
|
||||||
{
|
{
|
||||||
@@ -251,24 +224,72 @@ async fn main(spawner: Spawner) {
|
|||||||
info!("Starting connectivity task");
|
info!("Starting connectivity task");
|
||||||
spawner.must_spawn(renderbug_bike::tasks::wifi::wifi_connect_task(wifi, motion_bus.dyn_sender()));
|
spawner.must_spawn(renderbug_bike::tasks::wifi::wifi_connect_task(wifi, motion_bus.dyn_sender()));
|
||||||
|
|
||||||
|
info!("Starting location sampler");
|
||||||
|
static SAMPLER: ConstStaticCell<Watch<NoopRawMutex, Prediction, 1>> = ConstStaticCell::new(Watch::new());
|
||||||
|
let sampler = SAMPLER.take();
|
||||||
|
spawner.must_spawn(renderbug_bike::tasks::wifi::location_sampler(predictions.dyn_subscriber().unwrap(), sampler.dyn_sender()));
|
||||||
|
|
||||||
info!("Launching HTTP telemetry");
|
info!("Launching HTTP telemetry");
|
||||||
spawner.must_spawn(renderbug_bike::tasks::wifi::http_telemetry_task(predictions.dyn_subscriber().unwrap(), stack, motion_bus.dyn_sender()));
|
spawner.must_spawn(renderbug_bike::tasks::wifi::http_telemetry_task(sampler.dyn_receiver().unwrap(), stack, motion_bus.dyn_sender()));
|
||||||
|
|
||||||
info!("Starting BLE services");
|
info!("Starting BLE services");
|
||||||
spawner.must_spawn(renderbug_bike::tasks::ble::ble_task(ble, predictions.dyn_subscriber().unwrap(), spawner));
|
spawner.must_spawn(renderbug_bike::tasks::ble::ble_task(ble, predictions.dyn_subscriber().unwrap(), spawner));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature="dual-core")]
|
spawner.must_spawn(print_sensor_status(predictions.dyn_subscriber().unwrap()));
|
||||||
|
|
||||||
|
let mut storage = SharedFlash::new(esp_storage::FlashStorage::new(peripherals.FLASH).multicore_auto_park());
|
||||||
|
let mut partition_buf = [8; 1024];
|
||||||
|
let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap();
|
||||||
|
|
||||||
|
#[cfg(any(feature="flash-recording", feature="simulation"))]
|
||||||
{
|
{
|
||||||
info!("Launching core 2 watchdog");
|
use renderbug_bike::tasks::simulation::{SimDataStream, SimDataTable};
|
||||||
let timer1 = TimerGroup::new(peripherals.TIMG1);
|
let sim_partition = SimDataTable::find_partition(storage, partitions).expect("Could not find sim data partition!");
|
||||||
let mut ui_wdt = timer1.wdt;
|
info!("Got partition: {sim_partition:?}");
|
||||||
ui_wdt.set_timeout(esp_hal::timer::timg::MwdtStage::Stage0, esp_hal::time::Duration::from_secs(60));
|
let mut sim_table = match SimDataTable::open(sim_partition.clone()) {
|
||||||
ui_wdt.enable();
|
Ok(table) => table,
|
||||||
spawner.must_spawn(wdt_task(ui_wdt));
|
Err(SimDataError::StreamIndexMissing) => {
|
||||||
|
info!("Sim data partition not formatted, creating new stream table with {sim_partition:?}");
|
||||||
|
SimDataTable::create(sim_partition).expect("Unable to create sim data stream table in partition!")
|
||||||
|
},
|
||||||
|
Err(e) => panic!("Error opening sim data stream table: {e:?}")
|
||||||
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match sim_table.next() {
|
||||||
|
Some((header, reader)) => {
|
||||||
|
let srcid = header.id;
|
||||||
|
info!("Found simulation data for {srcid:?} at {:#x}", reader.abs_start());
|
||||||
|
|
||||||
|
let stream = SimDataStream::open(reader, srcid);
|
||||||
|
|
||||||
|
if cfg!(feature="simulation") {
|
||||||
|
if spawner.spawn(renderbug_bike::tasks::simulation::simulation_task(stream, motion_bus.dyn_sender())).is_err() {
|
||||||
|
error!("Unable to spawn simulation task for {srcid:?}! Increase the task pool size.");
|
||||||
|
}
|
||||||
|
} else if cfg!(feature="flash-recording") && srcid == StreamType::Bundle {
|
||||||
|
info!("Continuing recording stream");
|
||||||
|
spawner.spawn(record_telemetry(recording_bus.dyn_subscriber().unwrap(), stream, motion_bus.dyn_sender())).unwrap();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None if cfg!(feature="flash-recording") => {
|
||||||
|
// If we already found a recording stream, we break; out of the loop above
|
||||||
|
let reader = sim_table.append_new_stream(StreamType::Bundle).expect("Unable to create a new recording stream in the sim data partition! Is there enough free space?");
|
||||||
|
warn!("Starting new recording stream at {:#x}", reader.abs_start());
|
||||||
|
let recorder = SimDataStream::create(reader, StreamType::Bundle);
|
||||||
|
spawner.spawn(record_telemetry(recording_bus.dyn_subscriber().unwrap(), recorder, motion_bus.dyn_sender())).unwrap();
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
_ => ()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
spawner.must_spawn(print_sensor_status(predictions.dyn_subscriber().unwrap()));
|
spawner.must_spawn(wdt_task(rtc, predictions.dyn_subscriber().unwrap(), oled_controls, display_controls));
|
||||||
|
|
||||||
|
//info!("Final memory stats: {}", esp_alloc::HEAP.stats());
|
||||||
|
|
||||||
info!("Ready to rock and roll in {}ms", Instant::now().as_millis());
|
info!("Ready to rock and roll in {}ms", Instant::now().as_millis());
|
||||||
};
|
};
|
||||||
@@ -316,9 +337,17 @@ async fn wdt_task(mut wdt: Wdt<esp_hal::peripherals::TIMG1<'static>>) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[embassy_executor::task]
|
#[embassy_executor::task]
|
||||||
async fn record_telemetry(firehose: DynamicReceiver<'static, Measurement>, mut storage: SimDataRecorder<StorageRange<SharedFlash<FlashStorage>>>) {
|
async fn record_telemetry(mut firehose: DynSubscriber<'static, Measurement>, mut storage: SimDataStream<StorageRange<SharedFlash<FlashStorage<'static>>>>, motion: DynamicSender<'static, Measurement>) {
|
||||||
|
let mut skipped_events = 0;
|
||||||
|
while let Ok(Some((_, evt))) = storage.read_next() {
|
||||||
|
trace!("Skipping event {evt:?}");
|
||||||
|
skipped_events += 1;
|
||||||
|
}
|
||||||
|
info!("Skipped {} events to catch up to the end of the recording stream", skipped_events);
|
||||||
|
motion.send(Measurement::SensorHardwareStatus(SensorSource::FlashRecording, SensorState::Online)).await;
|
||||||
|
storage.write_next(AnnotationReading { buf: *b"Telemetry recording started " }).unwrap();
|
||||||
loop {
|
loop {
|
||||||
match firehose.receive().await {
|
match firehose.next_message_pure().await {
|
||||||
Measurement::IMU { accel, gyro } => {
|
Measurement::IMU { accel, gyro } => {
|
||||||
let reading = IMUReading {
|
let reading = IMUReading {
|
||||||
accel_x: accel.x as f64,
|
accel_x: accel.x as f64,
|
||||||
@@ -328,24 +357,28 @@ async fn record_telemetry(firehose: DynamicReceiver<'static, Measurement>, mut s
|
|||||||
gyro_y: gyro.y as f64,
|
gyro_y: gyro.y as f64,
|
||||||
gyro_z: gyro.z as f64
|
gyro_z: gyro.z as f64
|
||||||
};
|
};
|
||||||
storage.write_next(reading).unwrap();
|
//storage.write_next(reading).unwrap();
|
||||||
info!("Wrote IMU to flash");
|
trace!("Wrote IMU to flash");
|
||||||
},
|
},
|
||||||
_ => ()
|
Measurement::GPS(Some(pos)) => {
|
||||||
|
storage.write_next(GPSReading {
|
||||||
|
lat: pos.x,
|
||||||
|
lon: pos.y,
|
||||||
|
}).unwrap();
|
||||||
|
trace!("Wrote GPS to flash");
|
||||||
|
},
|
||||||
|
Measurement::SensorHardwareStatus(sensor, status) => {
|
||||||
|
let annotation = alloc::format!("{:?}={:?}", sensor, status);
|
||||||
|
let mut buf = [0; 32];
|
||||||
|
let bytes = annotation.as_bytes();
|
||||||
|
let copy_len = bytes.len().min(buf.len());
|
||||||
|
buf[..copy_len].copy_from_slice(&bytes[..copy_len]);
|
||||||
|
if copy_len < buf.len() {
|
||||||
|
buf[copy_len..].fill(b' ');
|
||||||
}
|
}
|
||||||
|
storage.write_next(AnnotationReading { buf }).unwrap();
|
||||||
|
trace!("Wrote sensor status update to flash");
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#[embassy_executor::task]
|
|
||||||
async fn print_sensor_readings(mut events: DynSubscriber<'static, Measurement>) {
|
|
||||||
loop {
|
|
||||||
match events.next_message_pure().await {
|
|
||||||
Measurement::IMU { accel, gyro } => {
|
|
||||||
esp_println::println!("accel=({},{},{}) gyro=({},{},{})", accel.x, accel.y, accel.z, gyro.x, gyro.y, gyro.z);
|
|
||||||
},
|
|
||||||
Measurement::GPS(gps) => {
|
|
||||||
esp_println::println!("gps={gps:?}");
|
|
||||||
},
|
|
||||||
_ => ()
|
_ => ()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
59
src/bin/playback.rs
Normal file
59
src/bin/playback.rs
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
#![no_std]
|
||||||
|
#![no_main]
|
||||||
|
|
||||||
|
use embassy_executor::Spawner;
|
||||||
|
use esp_hal::clock::CpuClock;
|
||||||
|
use esp_hal::timer::systimer::SystemTimer;
|
||||||
|
use log::*;
|
||||||
|
use esp_backtrace as _;
|
||||||
|
|
||||||
|
use renderbug_bike::logging::RenderbugLogger;
|
||||||
|
use renderbug_bike::tasks::simulation::{SimDataStream, SimDataTable};
|
||||||
|
use renderbug_bike::simdata::*;
|
||||||
|
|
||||||
|
esp_bootloader_esp_idf::esp_app_desc!();
|
||||||
|
|
||||||
|
#[esp_rtos::main]
|
||||||
|
async fn main(spawner: Spawner) {
|
||||||
|
|
||||||
|
esp_alloc::heap_allocator!(size: 100000);
|
||||||
|
|
||||||
|
RenderbugLogger::init_logger();
|
||||||
|
|
||||||
|
let config = esp_hal::Config::default().with_cpu_clock(CpuClock::max());
|
||||||
|
let peripherals = esp_hal::init(config);
|
||||||
|
let sys_timer = SystemTimer::new(peripherals.SYSTIMER);
|
||||||
|
esp_rtos::start(sys_timer.alarm0);
|
||||||
|
|
||||||
|
let mut storage = renderbug_bike::storage::SharedFlash::new(esp_storage::FlashStorage::new(peripherals.FLASH).multicore_auto_park());
|
||||||
|
let mut partition_buf = [8; 1024];
|
||||||
|
let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap();
|
||||||
|
|
||||||
|
let sim_partition = SimDataTable::find_partition(storage, partitions).expect("Could not find sim data partition!");
|
||||||
|
info!("Got partition: {sim_partition:?}");
|
||||||
|
let sim_table = SimDataTable::open(sim_partition).expect("Sim data partition not found");
|
||||||
|
|
||||||
|
for (header, reader) in sim_table {
|
||||||
|
info!("Found stream {header:?}");
|
||||||
|
if header.id == StreamType::Bundle {
|
||||||
|
let mut stream = SimDataStream::open(reader, header.id);
|
||||||
|
let mut timestamp = 0;
|
||||||
|
loop {
|
||||||
|
match stream.read_next() {
|
||||||
|
Ok(Some((timecode, next_evt))) => {
|
||||||
|
timestamp += timecode.as_millis();
|
||||||
|
esp_println::println!("{timestamp} {next_evt:?}");
|
||||||
|
},
|
||||||
|
Ok(None) => {
|
||||||
|
warn!("End of simulation data stream");
|
||||||
|
break
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
warn!("Error during sensor stream: {err:?}");
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
36
src/bin/reset.rs
Normal file
36
src/bin/reset.rs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
#![no_std]
|
||||||
|
#![no_main]
|
||||||
|
|
||||||
|
use embassy_executor::Spawner;
|
||||||
|
use esp_hal::clock::CpuClock;
|
||||||
|
use esp_hal::timer::systimer::SystemTimer;
|
||||||
|
use log::*;
|
||||||
|
use esp_backtrace as _;
|
||||||
|
|
||||||
|
use renderbug_bike::logging::RenderbugLogger;
|
||||||
|
use renderbug_bike::tasks::simulation::{SimDataStream, SimDataTable};
|
||||||
|
use renderbug_bike::simdata::*;
|
||||||
|
|
||||||
|
esp_bootloader_esp_idf::esp_app_desc!();
|
||||||
|
|
||||||
|
#[esp_rtos::main]
|
||||||
|
async fn main(spawner: Spawner) {
|
||||||
|
|
||||||
|
esp_alloc::heap_allocator!(size: 100000);
|
||||||
|
|
||||||
|
RenderbugLogger::init_logger();
|
||||||
|
|
||||||
|
let config = esp_hal::Config::default().with_cpu_clock(CpuClock::max());
|
||||||
|
let peripherals = esp_hal::init(config);
|
||||||
|
let sys_timer = SystemTimer::new(peripherals.SYSTIMER);
|
||||||
|
esp_rtos::start(sys_timer.alarm0);
|
||||||
|
|
||||||
|
let mut storage = renderbug_bike::storage::SharedFlash::new(esp_storage::FlashStorage::new(peripherals.FLASH));
|
||||||
|
let mut partition_buf = [8; 1024];
|
||||||
|
let partitions = esp_bootloader_esp_idf::partitions::read_partition_table(&mut storage, &mut partition_buf).unwrap();
|
||||||
|
|
||||||
|
let sim_partition = SimDataTable::find_partition(storage, partitions).expect("Could not find sim data partition!");
|
||||||
|
info!("Got partition: {sim_partition:?}");
|
||||||
|
SimDataTable::create(sim_partition).expect("Could not write empty sim partition");
|
||||||
|
error!("Overwrote sim data partition with a blank stream table");
|
||||||
|
}
|
||||||
@@ -47,7 +47,7 @@ pub enum Measurement {
|
|||||||
|
|
||||||
// Simulation metadata updates
|
// Simulation metadata updates
|
||||||
SimulationProgress(SensorSource, Duration, Fract8),
|
SimulationProgress(SensorSource, Duration, Fract8),
|
||||||
Annotation
|
Annotation([u8; 32])
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
@@ -81,6 +81,9 @@ pub enum SensorSource {
|
|||||||
// Connectivity related
|
// Connectivity related
|
||||||
Wifi,
|
Wifi,
|
||||||
|
|
||||||
|
// Data processing/logging
|
||||||
|
FlashRecording,
|
||||||
|
|
||||||
// Fusion outputs
|
// Fusion outputs
|
||||||
MotionFrame,
|
MotionFrame,
|
||||||
Location,
|
Location,
|
||||||
|
|||||||
155
src/storage.rs
155
src/storage.rs
@@ -1,13 +1,14 @@
|
|||||||
use core::{cell::RefCell, fmt::Formatter};
|
use core::{cell::RefCell, fmt::Formatter};
|
||||||
|
|
||||||
use alloc::rc::Rc;
|
use alloc::rc::Rc;
|
||||||
|
use embedded_io::{ErrorKind, ErrorType, Read, Write};
|
||||||
use embedded_storage::{ReadStorage, Storage};
|
use embedded_storage::{ReadStorage, Storage};
|
||||||
use esp_bootloader_esp_idf::partitions::PartitionTable;
|
use esp_bootloader_esp_idf::partitions::PartitionTable;
|
||||||
use esp_hal::time::Instant;
|
use esp_hal::time::Instant;
|
||||||
use log::*;
|
use log::*;
|
||||||
use rmp::{decode::{RmpRead, RmpReadErr}, encode::{RmpWrite, RmpWriteErr, ValueWriteError}};
|
use rmp::{decode::{RmpRead, RmpReadErr}, encode::{RmpWrite, RmpWriteErr, ValueWriteError}};
|
||||||
|
|
||||||
use crate::simdata::{BundleEventHeader, EventRecord, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType};
|
use crate::{simdata::{BundleEventHeader, EventRecord, EventStreamHeader, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, tasks::simulation::Checkpoint};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SharedFlash<S> {
|
pub struct SharedFlash<S> {
|
||||||
@@ -62,12 +63,98 @@ impl<E> core::fmt::Display for StorageRangeError<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct StorageRange<S> {
|
pub struct StorageRange<S> {
|
||||||
storage: S,
|
storage: S,
|
||||||
start: usize,
|
start: usize,
|
||||||
end: usize,
|
end: usize,
|
||||||
offset: usize
|
offset: usize,
|
||||||
|
reset_pos: usize
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Checkpoint for StorageRange<S> {
|
||||||
|
fn checkpoint(&mut self) {
|
||||||
|
self.reset_pos = self.offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn rollback(&mut self) {
|
||||||
|
self.offset = self.reset_pos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> StorageRange<S> {
|
||||||
|
pub fn inner(&self) -> &S {
|
||||||
|
&self.storage
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn inner_mut(&mut self) -> &mut S {
|
||||||
|
&mut self.storage
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn abs_start(&self) -> usize {
|
||||||
|
self.start
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn abs_end(&self) -> usize {
|
||||||
|
self.end
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: core::fmt::Debug> embedded_io::Error for StorageRangeError<E> {
|
||||||
|
fn kind(&self) -> embedded_io::ErrorKind {
|
||||||
|
ErrorKind::Other
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: ReadStorage<Error = E>, E: core::fmt::Debug> ErrorType for StorageRange<S> {
|
||||||
|
type Error = StorageRangeError<E>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Storage<Error = E> + ReadStorage<Error = E>, E: core::fmt::Debug> Write for StorageRange<S> {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
|
||||||
|
let pos = self.start + self.offset;
|
||||||
|
trace!("write {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, pos, pos + buf.len());
|
||||||
|
if pos > self.end {
|
||||||
|
Err(StorageRangeError::OutOfData)
|
||||||
|
} else {
|
||||||
|
assert!(pos + buf.len() <= self.end);
|
||||||
|
match self.storage.write(pos as u32, buf) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.offset += buf.len();
|
||||||
|
Ok(buf.len())
|
||||||
|
},
|
||||||
|
Err(err) => Err(StorageRangeError::Storage(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> Result<(), Self::Error> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: ReadStorage<Error = E>, E: core::fmt::Debug> Read for StorageRange<S> {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
|
||||||
|
let pos = self.start + self.offset;
|
||||||
|
trace!("read_exact_buf {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, pos, pos + buf.len());
|
||||||
|
if pos > self.end {
|
||||||
|
Err(StorageRangeError::OutOfData)
|
||||||
|
} else {
|
||||||
|
let remaining = self.end - pos;
|
||||||
|
let max_read = buf.len().min(remaining);
|
||||||
|
if max_read == 0 {
|
||||||
|
return Err(StorageRangeError::OutOfData)
|
||||||
|
}
|
||||||
|
assert!(pos + buf.len() <= self.end);
|
||||||
|
match self.storage.read(pos as u32, &mut buf[..max_read]) {
|
||||||
|
Ok(_) => {
|
||||||
|
self.offset += max_read;
|
||||||
|
Ok(max_read)
|
||||||
|
},
|
||||||
|
Err(err) => Err(StorageRangeError::Storage(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: ReadStorage> StorageRange<S> {
|
impl<S: ReadStorage> StorageRange<S> {
|
||||||
@@ -77,7 +164,8 @@ impl<S: ReadStorage> StorageRange<S> {
|
|||||||
storage,
|
storage,
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
offset: 0
|
offset: 0,
|
||||||
|
reset_pos: 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -98,6 +186,15 @@ impl<S: ReadStorage> StorageRange<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn seek_abs(&mut self, pos: usize) -> Result<(), StorageRangeError<S::Error>> {
|
||||||
|
self.offset = pos;
|
||||||
|
if self.offset > self.end {
|
||||||
|
Err(StorageRangeError::OutOfData)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn subset(&self, size: usize) -> Result<Self, StorageRangeError<S::Error>> where S: Clone + core::fmt::Debug {
|
pub fn subset(&self, size: usize) -> Result<Self, StorageRangeError<S::Error>> where S: Clone + core::fmt::Debug {
|
||||||
trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size);
|
trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size);
|
||||||
if self.start + self.offset + size > self.end {
|
if self.start + self.offset + size > self.end {
|
||||||
@@ -107,7 +204,8 @@ impl<S: ReadStorage> StorageRange<S> {
|
|||||||
storage: self.storage.clone(),
|
storage: self.storage.clone(),
|
||||||
start: self.offset + self.start,
|
start: self.offset + self.start,
|
||||||
end: self.start + self.offset + size,
|
end: self.start + self.offset + size,
|
||||||
offset: 0
|
offset: 0,
|
||||||
|
reset_pos: 0
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -153,50 +251,3 @@ impl<S: Storage> RmpWrite for StorageRange<S> where S::Error: core::fmt::Debug +
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SimDataRecorder<S> {
|
|
||||||
storage: S,
|
|
||||||
last_stamp: Instant
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> SimDataRecorder<S> {
|
|
||||||
pub fn new(storage: S) -> Self {
|
|
||||||
Self {
|
|
||||||
storage,
|
|
||||||
last_stamp: Instant::now()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result<SimDataRecorder<StorageRange<S>>, SimDataError<ValueWriteError<StorageRangeError<<S as ReadStorage>::Error>>>> where S: Storage, <S as ReadStorage>::Error: core::fmt::Debug + 'static {
|
|
||||||
let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data(
|
|
||||||
esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined,
|
|
||||||
);
|
|
||||||
info!("Searching for sim data partition");
|
|
||||||
let data_partition = partitions.iter().find(|partition| {
|
|
||||||
partition.partition_type() == partition_type && partition.label_as_str() == "sim"
|
|
||||||
}).ok_or(SimDataError::PartitionNotFound)?;
|
|
||||||
|
|
||||||
let start = data_partition.offset() as usize;
|
|
||||||
let end = data_partition.len() as usize + start;
|
|
||||||
let mut writer = StorageRange::new(storage, start, end);
|
|
||||||
warn!("Writing new simulation data at {start:#02x}:{end:#02x}");
|
|
||||||
StreamIndex { count: 1 }.write_rmp(&mut writer)?;
|
|
||||||
StreamHeader { id: StreamType::Bundle, size: 0 }.write_rmp(&mut writer)?;
|
|
||||||
Ok(SimDataRecorder {
|
|
||||||
storage: writer,
|
|
||||||
last_stamp: Instant::now()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn write_next<T: EventRecord>(&mut self, event: T) -> Result<(), SimDataError<ValueWriteError<S::Error>>> where S: RmpWrite {
|
|
||||||
BundleEventHeader { id: T::stream_id() }.write_rmp(&mut self.storage)?;
|
|
||||||
let now = Instant::now();
|
|
||||||
let event = StreamEvent {
|
|
||||||
data: event,
|
|
||||||
timecode: (now - self.last_stamp).as_millis() as f64 / 1000.0
|
|
||||||
};
|
|
||||||
event.write_rmp(&mut self.storage)?;
|
|
||||||
self.last_stamp = now;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -5,7 +5,6 @@ pub mod gps;
|
|||||||
pub mod wifi;
|
pub mod wifi;
|
||||||
#[cfg(feature="radio")]
|
#[cfg(feature="radio")]
|
||||||
pub mod ble;
|
pub mod ble;
|
||||||
#[cfg(feature="simulation")]
|
|
||||||
pub mod simulation;
|
pub mod simulation;
|
||||||
#[cfg(feature="demo")]
|
#[cfg(feature="demo")]
|
||||||
pub mod demo;
|
pub mod demo;
|
||||||
|
|||||||
@@ -28,14 +28,16 @@ pub async fn motion_task(src: DynamicReceiver<'static, Measurement>, prediction_
|
|||||||
},
|
},
|
||||||
// FIXME: This needs harmonized with the automatic data timeout from above, somehow?
|
// FIXME: This needs harmonized with the automatic data timeout from above, somehow?
|
||||||
Measurement::SensorHardwareStatus(source, state) => {
|
Measurement::SensorHardwareStatus(source, state) => {
|
||||||
warn!("Sensor {source:?} reports {state:?}!");
|
debug!("Sensor {source:?} reports {state:?}!");
|
||||||
prediction_sink.publish(Prediction::SensorStatus(source, state)).with_timeout(TIMEOUT).await.expect("Could not update sensor status in time. Is the prediction bus stalled?");
|
prediction_sink.publish(Prediction::SensorStatus(source, state)).with_timeout(TIMEOUT).await.expect("Could not update sensor status in time. Is the prediction bus stalled?");
|
||||||
},
|
},
|
||||||
Measurement::ExternalPower(mw) => {
|
Measurement::ExternalPower(mw) => {
|
||||||
info!("Got external power change to {mw:?}");
|
info!("Got external power change to {mw:?}");
|
||||||
},
|
},
|
||||||
Measurement::SimulationProgress(source, duration, pct) => debug!("{source:?} simulation time: {} {} / 255", duration.as_secs(), pct),
|
Measurement::SimulationProgress(source, duration, pct) => debug!("{source:?} simulation time: {} {} / 255", duration.as_secs(), pct),
|
||||||
Measurement::Annotation => ()
|
Measurement::Annotation(msg) => {
|
||||||
|
info!("Annotation: {}", str::from_utf8(&msg).unwrap_or("<non-utf8-data>"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if recording_sink.try_publish(next_measurement).is_err() {
|
if recording_sink.try_publish(next_measurement).is_err() {
|
||||||
warn!("Could not publish measurement to recording bus. Is the recording bus stalled?");
|
warn!("Could not publish measurement to recording bus. Is the recording bus stalled?");
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ pub async fn sdcard_task(
|
|||||||
let stream = rootfs.open_file_in_dir("renderbug-sensors.rmp",Mode::ReadWriteCreateOrTruncate).unwrap();
|
let stream = rootfs.open_file_in_dir("renderbug-sensors.rmp",Mode::ReadWriteCreateOrTruncate).unwrap();
|
||||||
let rmp_stream = EmbeddedRmp(stream);
|
let rmp_stream = EmbeddedRmp(stream);
|
||||||
|
|
||||||
let mut recorder = SimDataRecorder::new(EmbeddedRmp(rmp_stream));
|
/*let mut recorder = SimDataStream::open(rmp_stream, StreamType::Bundle);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// FIXME: THis chunk could really go into an impl From<Measurement> for EventRecord
|
// FIXME: THis chunk could really go into an impl From<Measurement> for EventRecord
|
||||||
@@ -86,17 +86,18 @@ pub async fn sdcard_task(
|
|||||||
gyro_y: gyro.y as f64,
|
gyro_y: gyro.y as f64,
|
||||||
gyro_z: gyro.z as f64
|
gyro_z: gyro.z as f64
|
||||||
};
|
};
|
||||||
if recorder.write_next(reading).is_err() {
|
/*if recorder.write_next(reading).is_err() {
|
||||||
error!("Failed to write IMU reading to SD card");
|
error!("Failed to write IMU reading to SD card");
|
||||||
break;
|
break;
|
||||||
}
|
}*/
|
||||||
info!("Wrote IMU to SD card");
|
info!("Wrote IMU to SD card");
|
||||||
},
|
},
|
||||||
_ => ()
|
_ => ()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(recorder);
|
drop(recorder);*/
|
||||||
|
drop(rmp_stream);
|
||||||
drop(rootfs);
|
drop(rootfs);
|
||||||
drop(vol);
|
drop(vol);
|
||||||
|
|
||||||
@@ -104,13 +105,15 @@ pub async fn sdcard_task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct EmbeddedRmp<W: embedded_io::Write>(W);
|
struct EmbeddedRmp<W>(W);
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct EmbeddedRmpError<T>(T);
|
struct EmbeddedRmpError<T>(T);
|
||||||
|
|
||||||
impl<T: core::fmt::Debug + 'static> RmpWriteErr for EmbeddedRmpError<T> {
|
impl<T: core::fmt::Debug + 'static> RmpWriteErr for EmbeddedRmpError<T> {
|
||||||
}
|
}
|
||||||
|
impl<T: core::fmt::Debug + 'static> RmpReadErr for EmbeddedRmpError<T> {
|
||||||
|
}
|
||||||
|
|
||||||
impl<T> core::fmt::Display for EmbeddedRmpError<T> where T: core::fmt::Debug {
|
impl<T> core::fmt::Display for EmbeddedRmpError<T> where T: core::fmt::Debug {
|
||||||
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
|
||||||
@@ -127,6 +130,15 @@ impl<W: embedded_io::Write> RmpWrite for EmbeddedRmp<W> where W::Error: core::fm
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<W: embedded_io::Read> RmpRead for EmbeddedRmp<W> where W::Error: core::fmt::Debug + 'static {
|
||||||
|
type Error = EmbeddedRmpError<ReadExactError<W::Error>>;
|
||||||
|
|
||||||
|
fn read_exact_buf(&mut self, buf: &mut [u8]) -> Result<(), Self::Error> {
|
||||||
|
self.0.read_exact(buf).map_err(|e| { EmbeddedRmpError(e) })?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<W: embedded_io::Write> embedded_io::ErrorType for EmbeddedRmp<W> {
|
impl<W: embedded_io::Write> embedded_io::ErrorType for EmbeddedRmp<W> {
|
||||||
type Error = W::Error;
|
type Error = W::Error;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,18 +9,46 @@ use esp_storage::FlashStorage;
|
|||||||
use figments::liber8tion::interpolate::Fract8;
|
use figments::liber8tion::interpolate::Fract8;
|
||||||
use nalgebra::{Vector2, Vector3};
|
use nalgebra::{Vector2, Vector3};
|
||||||
use log::*;
|
use log::*;
|
||||||
use rmp::{decode::ValueReadError, encode::ValueWriteError};
|
use rmp::{decode::{RmpRead, RmpReadErr, ValueReadError}, encode::{RmpWrite, RmpWriteErr, ValueWriteError}};
|
||||||
|
|
||||||
use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, BundleEventHeader, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{SharedFlash, StorageRange, StorageRangeError}};
|
use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, BundleEventHeader, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}, storage::{SharedFlash, StorageRange, StorageRangeError}};
|
||||||
|
|
||||||
pub struct SimDataTable<S> {
|
pub struct SimDataTable<S> {
|
||||||
reader: StorageRange<S>,
|
reader: S,
|
||||||
count: usize,
|
count: usize,
|
||||||
index: usize
|
index: usize
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: ReadStorage> SimDataTable<S> where S::Error: core::fmt::Debug + 'static {
|
impl<S: RmpRead<Error = E>, E: RmpReadErr + RmpWriteErr> SimDataTable<S> {
|
||||||
pub fn open(storage: S, partitions: PartitionTable<'_>) -> Result<Self, SimDataError<S>> {
|
pub fn open(mut reader: S) -> Result<Self, SimDataError<E>> {
|
||||||
|
if let Ok(index) = StreamIndex::from_rmp(&mut reader) {
|
||||||
|
info!("Found stream index: {index:?}");
|
||||||
|
Ok(Self {
|
||||||
|
reader,
|
||||||
|
count: index.count,
|
||||||
|
index: 0
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Err(SimDataError::StreamIndexMissing)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: RmpWrite<Error = E>, E: RmpReadErr + RmpWriteErr> SimDataTable<S> {
|
||||||
|
pub fn create(mut writer: S) -> Result<Self, SimDataError<E>> {
|
||||||
|
// FIXME: The stream header should use a fixed size integer for the count, instead of the dynamic array length type
|
||||||
|
info!("Writing new stream index");
|
||||||
|
StreamIndex { count: 0 }.write_rmp(&mut writer)?;
|
||||||
|
Ok(Self {
|
||||||
|
reader: writer,
|
||||||
|
count: 0,
|
||||||
|
index: 0
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: ReadStorage<Error = E>, E: core::fmt::Debug + 'static> SimDataTable<StorageRange<S>> {
|
||||||
|
pub fn find_partition(storage: S, partitions: PartitionTable<'_>) -> Result<StorageRange<S>, SimDataError<StorageRangeError<E>>> {
|
||||||
let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data(
|
let partition_type = esp_bootloader_esp_idf::partitions::PartitionType::Data(
|
||||||
esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined,
|
esp_bootloader_esp_idf::partitions::DataPartitionSubType::Undefined,
|
||||||
);
|
);
|
||||||
@@ -32,29 +60,41 @@ impl<S: ReadStorage> SimDataTable<S> where S::Error: core::fmt::Debug + 'static
|
|||||||
let start = data_partition.offset() as usize;
|
let start = data_partition.offset() as usize;
|
||||||
let end = data_partition.len() as usize + start;
|
let end = data_partition.len() as usize + start;
|
||||||
info!("Opening simulation data at {start:#02x}:{end:#02x}");
|
info!("Opening simulation data at {start:#02x}:{end:#02x}");
|
||||||
let mut reader = StorageRange::new(storage, start, end);
|
Ok(StorageRange::new(storage, start, end))
|
||||||
if let Ok(index) = StreamIndex::from_rmp(&mut reader) {
|
|
||||||
info!("Found stream index: {index:?}");
|
|
||||||
Ok(Self {
|
|
||||||
reader,
|
|
||||||
count: index.count,
|
|
||||||
index: 0
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
error!("Stream index is missing! Have you flashed the partition yet?");
|
|
||||||
Err(SimDataError::StreamIndexMissing)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> where S::Error: core::fmt::Debug + 'static {
|
impl<S: Storage<Error = E> + Clone + core::fmt::Debug, E: core::fmt::Debug + 'static> SimDataTable<StorageRange<S>> {
|
||||||
type Item = SimDataReader<S>;
|
pub fn append_new_stream(&mut self, streamid: StreamType) -> Result<StorageRange<S>, SimDataError<StorageRangeError<E>>> {
|
||||||
|
let stream_size = self.reader.capacity() - self.reader.pos() - 6;
|
||||||
|
if stream_size < 1024 {
|
||||||
|
error!("Not enough space left in the simulation data partition to create a new stream!");
|
||||||
|
return Err(SimDataError::NotEnoughSpace);
|
||||||
|
}
|
||||||
|
|
||||||
|
StreamHeader { id: streamid, size: stream_size }.write_rmp(&mut self.reader)?;
|
||||||
|
self.count += 1;
|
||||||
|
self.index += 1;
|
||||||
|
let reset_pos = self.reader.pos();
|
||||||
|
self.reader.seek_abs(0).unwrap();
|
||||||
|
StreamIndex { count: self.count }.write_rmp(&mut self.reader)?;
|
||||||
|
self.reader.seek_abs(reset_pos).unwrap();
|
||||||
|
|
||||||
|
info!("Creating new stream id={streamid:?} capacity={stream_size:#02x} range={:#02x}:{:#02x}", self.reader.abs_start() + self.reader.pos(), self.reader.abs_start() + stream_size);
|
||||||
|
|
||||||
|
Ok(self.reader.subset(stream_size).unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: ReadStorage<Error = E> + Clone + core::fmt::Debug, E: core::fmt::Debug + 'static> Iterator for SimDataTable<StorageRange<S>> {
|
||||||
|
type Item = (StreamHeader, StorageRange<S>);
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
if self.index >= self.count {
|
if self.index >= self.count {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
loop {
|
loop {
|
||||||
|
let reset_pos = self.reader.pos();
|
||||||
match StreamHeader::from_rmp(&mut self.reader) {
|
match StreamHeader::from_rmp(&mut self.reader) {
|
||||||
Ok(header) => {
|
Ok(header) => {
|
||||||
info!("Found stream header: {header:?}");
|
info!("Found stream header: {header:?}");
|
||||||
@@ -66,7 +106,7 @@ impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> whe
|
|||||||
});
|
});
|
||||||
self.index += 1;
|
self.index += 1;
|
||||||
debug!("Found header={header:?}");
|
debug!("Found header={header:?}");
|
||||||
return Some(SimDataReader::open(sensor_reader, header.id));
|
return Some((header, sensor_reader));
|
||||||
},
|
},
|
||||||
err => {
|
err => {
|
||||||
error!("Could not open the next simulation data chunk: {err:?}");
|
error!("Could not open the next simulation data chunk: {err:?}");
|
||||||
@@ -83,6 +123,8 @@ impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> whe
|
|||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Read error while reading next chunk in simulation stream table {err:?}");
|
error!("Read error while reading next chunk in simulation stream table {err:?}");
|
||||||
|
// Reset back to right before we tried reading the stream header, so that append_new_stream() places it correctly.
|
||||||
|
self.reader.seek_abs(reset_pos).unwrap();
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -91,10 +133,11 @@ impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> whe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SimDataReader<S> {
|
pub struct SimDataStream<S> {
|
||||||
reader: StorageRange<S>,
|
reader: S,
|
||||||
srcid: StreamType,
|
srcid: StreamType,
|
||||||
runtime: Duration,
|
last_stamp: Instant,
|
||||||
|
//runtime: Duration,
|
||||||
event_count: usize,
|
event_count: usize,
|
||||||
index: usize
|
index: usize
|
||||||
}
|
}
|
||||||
@@ -116,60 +159,117 @@ impl From<GPSReading> for Measurement {
|
|||||||
|
|
||||||
impl From<AnnotationReading> for Measurement {
|
impl From<AnnotationReading> for Measurement {
|
||||||
fn from(value: AnnotationReading) -> Self {
|
fn from(value: AnnotationReading) -> Self {
|
||||||
warn!("ANNOTATION: {}", core::str::from_utf8(&value.buf).unwrap());
|
Measurement::Annotation(value.buf)
|
||||||
Measurement::Annotation
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: ReadStorage> SimDataReader<S> where S::Error: core::fmt::Debug + 'static {
|
impl<S> SimDataStream<S> {
|
||||||
pub fn open(mut reader: StorageRange<S>, stream_type: StreamType) -> Self {
|
pub fn srcid(&self) -> StreamType {
|
||||||
debug!("Opening {stream_type:?} sim data chunk");
|
self.srcid
|
||||||
let event_count = if stream_type != StreamType::Bundle { EventStreamHeader::from_rmp(&mut reader).unwrap().count } else { usize::MAX };
|
}
|
||||||
debug!("Found {event_count} events!");
|
}
|
||||||
|
|
||||||
|
pub trait Checkpoint {
|
||||||
|
fn checkpoint(&mut self);
|
||||||
|
fn rollback(&mut self);
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Checkpoint + RmpRead<Error = E>, E: RmpWriteErr + RmpReadErr> SimDataStream<S> {
|
||||||
|
pub fn open(mut reader: S, stream_type: StreamType) -> Self {
|
||||||
|
info!("Opening {stream_type:?} sim data chunk");
|
||||||
|
let event_count = EventStreamHeader::from_rmp(&mut reader).unwrap().count;
|
||||||
|
info!("Found {event_count} events!");
|
||||||
Self {
|
Self {
|
||||||
reader,
|
reader,
|
||||||
srcid: stream_type,
|
srcid: stream_type,
|
||||||
runtime: Default::default(),
|
//runtime: Default::default(),
|
||||||
|
last_stamp: Instant::now(),
|
||||||
event_count,
|
event_count,
|
||||||
index: 0
|
index: 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn srcid(&self) -> StreamType {
|
fn read_next_event<T: EventRecord + Into<Measurement>>(&mut self) -> Result<(Duration, Measurement), SimDataError<E>> {
|
||||||
self.srcid
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_next_event<T: EventRecord + Into<Measurement>>(&mut self) -> Result<Measurement, SimDataError<ValueReadError<StorageRangeError<S::Error>>>> {
|
|
||||||
let event = StreamEvent::<T>::from_rmp(&mut self.reader)?;
|
let event = StreamEvent::<T>::from_rmp(&mut self.reader)?;
|
||||||
let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64);
|
let delay = embassy_time::Duration::from_millis((event.timecode * 1000.0) as u64);
|
||||||
self.runtime += delay;
|
Ok((delay, event.data.into()))
|
||||||
info!("waiting {delay}");
|
|
||||||
Timer::after(delay).await;
|
|
||||||
Ok(event.data.into())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read_next(&mut self) -> Result<Option<Measurement>, SimDataError<ValueReadError<StorageRangeError<S::Error>>>> {
|
pub fn read_next(&mut self) -> Result<Option<(Duration, Measurement)>, SimDataError<E>> {
|
||||||
if self.index < self.event_count {
|
if self.index < self.event_count {
|
||||||
self.index += 1;
|
self.index += 1;
|
||||||
|
self.reader.checkpoint();
|
||||||
let next_id = match self.srcid {
|
let next_id = match self.srcid {
|
||||||
StreamType::Bundle => BundleEventHeader::from_rmp(&mut self.reader)?.id,
|
StreamType::Bundle => BundleEventHeader::from_rmp(&mut self.reader).inspect_err(|_| { self.reader.rollback() })?.id,
|
||||||
_ => self.srcid
|
_ => self.srcid
|
||||||
};
|
};
|
||||||
// The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some()
|
// The read_* functions can only ever return a valid result, or a data/reading error, so we map them into a Some()
|
||||||
Ok(Some(match next_id {
|
let (delay, evt) = match next_id {
|
||||||
StreamType::IMU => self.read_next_event::<IMUReading>().await?,
|
StreamType::IMU => self.read_next_event::<IMUReading>().inspect_err(|_| { self.reader.rollback() })?,
|
||||||
StreamType::GPS => self.read_next_event::<GPSReading>().await?,
|
StreamType::GPS => self.read_next_event::<GPSReading>().inspect_err(|_| { self.reader.rollback() })?,
|
||||||
StreamType::Annotations => self.read_next_event::<AnnotationReading>().await?,
|
StreamType::Annotations => self.read_next_event::<AnnotationReading>().inspect_err(|_| { self.reader.rollback() })?,
|
||||||
srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!")
|
srcid => unimplemented!("{srcid:?} is not a simulatable sensor yet!")
|
||||||
}))
|
};
|
||||||
|
Ok(Some((delay, evt)))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read_next_and_wait(&mut self) -> Result<Option<Measurement>, SimDataError<E>> {
|
||||||
|
if let Some((delay, evt)) = self.read_next()? {
|
||||||
|
trace!("waiting {delay}");
|
||||||
|
Timer::after(delay).await;
|
||||||
|
self.last_stamp = Instant::now();
|
||||||
|
Ok(Some(evt))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<S: Storage<Error = SE>, SE: core::fmt::Debug + 'static> SimDataStream<StorageRange<S>> {
|
||||||
|
pub fn write_next<T: EventRecord>(&mut self, event: T) -> Result<(), SimDataError<StorageRangeError<SE>>> {
|
||||||
|
BundleEventHeader { id: T::stream_id() }.write_rmp(&mut self.reader)?;
|
||||||
|
let now = Instant::now();
|
||||||
|
let event = StreamEvent {
|
||||||
|
data: event,
|
||||||
|
timecode: (now - self.last_stamp).as_millis() as f64 / 1000.0
|
||||||
|
};
|
||||||
|
event.write_rmp(&mut self.reader)?;
|
||||||
|
self.last_stamp = now;
|
||||||
|
self.event_count = self.event_count.checked_add(1).ok_or(SimDataError::NotEnoughSpace)?;
|
||||||
|
|
||||||
|
self.write_header()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_header(&mut self) -> Result<(), SimDataError<StorageRangeError<SE>>> {
|
||||||
|
let reset_pos = self.reader.pos();
|
||||||
|
self.reader.seek_abs(0).unwrap();
|
||||||
|
trace!("Writing event stream header with count={} to {:#x}", self.event_count, self.reader.abs_start());
|
||||||
|
EventStreamHeader { count: self.event_count }.write_rmp(&mut self.reader)?;
|
||||||
|
self.reader.seek_abs(reset_pos).unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create(mut storage: StorageRange<S>, stream_type: StreamType) -> Self {
|
||||||
|
storage.seek_abs(0).unwrap();
|
||||||
|
EventStreamHeader { count: 0 }.write_rmp(&mut storage).unwrap();
|
||||||
|
Self {
|
||||||
|
reader: storage,
|
||||||
|
srcid: stream_type,
|
||||||
|
//runtime: Default::default(),
|
||||||
|
last_stamp: Instant::now(),
|
||||||
|
event_count: 0,
|
||||||
|
index: 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[embassy_executor::task(pool_size = 3)]
|
#[embassy_executor::task(pool_size = 3)]
|
||||||
pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>>, events: DynamicSender<'static, Measurement>) {
|
pub async fn simulation_task(mut reader: SimDataStream<StorageRange<SharedFlash<FlashStorage<'static>>>>, events: DynamicSender<'static, Measurement>) {
|
||||||
warn!("Starting simulation for {:?}", reader.srcid());
|
warn!("Starting simulation for {:?}", reader.srcid());
|
||||||
|
|
||||||
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await;
|
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await;
|
||||||
@@ -180,7 +280,7 @@ pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>
|
|||||||
|
|
||||||
// TODO: SimulationProgress updates
|
// TODO: SimulationProgress updates
|
||||||
loop {
|
loop {
|
||||||
match reader.read_next().await {
|
match reader.read_next_and_wait().await {
|
||||||
Ok(Some(next_evt)) => {
|
Ok(Some(next_evt)) => {
|
||||||
events.send(next_evt).await;
|
events.send(next_evt).await;
|
||||||
let pct = (idx as f32) / (reader.event_count as f32);
|
let pct = (idx as f32) / (reader.event_count as f32);
|
||||||
|
|||||||
Reference in New Issue
Block a user