tasks: simulation: give even more accurate error reporting
This commit is contained in:
@@ -11,7 +11,7 @@ use nalgebra::{Vector2, Vector3};
|
|||||||
use log::*;
|
use log::*;
|
||||||
use rmp::decode::{RmpRead, RmpReadErr, ValueReadError};
|
use rmp::decode::{RmpRead, RmpReadErr, ValueReadError};
|
||||||
|
|
||||||
use crate::{events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}};
|
use crate::{Breaker, events::{Measurement, SensorSource, SensorState}, simdata::{AnnotationReading, EventRecord, EventStreamHeader, GPSReading, IMUReading, RmpData, SimDataError, StreamEvent, StreamHeader, StreamIndex, StreamType}};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SharedFlash<S> {
|
pub struct SharedFlash<S> {
|
||||||
@@ -99,24 +99,28 @@ impl<S: ReadStorage + Clone + core::fmt::Debug> Iterator for SimDataTable<S> whe
|
|||||||
Ok(header) => {
|
Ok(header) => {
|
||||||
match self.reader.subset(header.size) {
|
match self.reader.subset(header.size) {
|
||||||
Ok(sensor_reader) => {
|
Ok(sensor_reader) => {
|
||||||
self.reader.seek(header.size);
|
self.reader.seek(header.size).unwrap_or_else(|err| {
|
||||||
|
error!("Simulation data appears bigger than the storage capacity: {err:?}")
|
||||||
|
});
|
||||||
self.index += 1;
|
self.index += 1;
|
||||||
debug!("Found header={header:?}");
|
debug!("Found header={header:?}");
|
||||||
return Some(SimDataReader::open(sensor_reader, header.id));
|
return Some(SimDataReader::open(sensor_reader, header.id));
|
||||||
},
|
},
|
||||||
err => {
|
err => {
|
||||||
error!("Read error while opening simulation stream {err:?}");
|
error!("Could not open the next simulation data chunk: {err:?}");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(SimDataError::UnsupportedStreamType(meta)) => {
|
Err(SimDataError::UnsupportedStreamType(meta)) => {
|
||||||
error!("Found unknown simulation data chunk {meta:?}");
|
error!("Found unknown simulation data chunk {meta:?}");
|
||||||
self.reader.seek(meta.size as usize);
|
self.reader.seek(meta.size as usize).unwrap_or_else(|err| {
|
||||||
|
error!("Simulation data appears bigger than the storage capacity: {err:?}")
|
||||||
|
});
|
||||||
self.index += 1;
|
self.index += 1;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Read error while decoding simulation stream table {err:?}");
|
error!("Read error while reading next chunk in simulation stream table {err:?}");
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -145,13 +149,18 @@ impl<S: ReadStorage> RangeReader<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn seek(&mut self, offset: usize) {
|
pub fn seek(&mut self, offset: usize) -> Result<(), RangeReadError<S::Error>> {
|
||||||
self.offset += offset;
|
self.offset += offset;
|
||||||
|
if self.offset > self.end {
|
||||||
|
Err(RangeReadError::OutOfData)
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn subset(&self, size: usize) -> Result<Self, SimDataError<S::Error>> where S: Clone + core::fmt::Debug {
|
pub fn subset(&self, size: usize) -> Result<Self, SimDataError<S::Error>> where S: Clone + core::fmt::Debug {
|
||||||
trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size);
|
trace!("subset {:#02x}:{:#02x} -> {:#02x}:{:#02x}", self.start, self.end, self.start + self.offset, self.start + self.offset + size);
|
||||||
if self.start + self.offset + size < self.end {
|
if self.start + self.offset + size > self.end {
|
||||||
Err(SimDataError::EndOfStream)
|
Err(SimDataError::EndOfStream)
|
||||||
} else {
|
} else {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
@@ -276,10 +285,21 @@ pub async fn simulation_task(mut reader: SimDataReader<SharedFlash<FlashStorage>
|
|||||||
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await;
|
events.send(Measurement::SensorHardwareStatus(SensorSource::Simulation, SensorState::AcquiringFix)).await;
|
||||||
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await;
|
events.send(Measurement::SensorHardwareStatus(reader.srcid(), SensorState::Online)).await;
|
||||||
|
|
||||||
|
let mut idx = 0;
|
||||||
|
let mut idx_breaker = Breaker::default();
|
||||||
|
|
||||||
// TODO: SimulationProgress updates
|
// TODO: SimulationProgress updates
|
||||||
loop {
|
loop {
|
||||||
match reader.read_next().await {
|
match reader.read_next().await {
|
||||||
Ok(Some(next_evt)) => events.send(next_evt).await,
|
Ok(Some(next_evt)) => {
|
||||||
|
events.send(next_evt).await;
|
||||||
|
let pct = (idx as f32) / (reader.event_count as f32);
|
||||||
|
idx_breaker.set((pct * 255.0) as u8);
|
||||||
|
if let Some(pct) = idx_breaker.read_tripped() {
|
||||||
|
events.send(Measurement::SimulationProgress(reader.srcid(), reader.runtime, pct)).await;
|
||||||
|
}
|
||||||
|
idx += 1;
|
||||||
|
},
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
warn!("End of simulation data stream");
|
warn!("End of simulation data stream");
|
||||||
break
|
break
|
||||||
|
|||||||
Reference in New Issue
Block a user