task: rewrite event handling as a step towards event-based subscriptions
This commit is contained in:
@@ -1,9 +1,5 @@
|
||||
use core::borrow::BorrowMut;
|
||||
use std::sync::Arc;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Mutex;
|
||||
use std::thread::JoinHandle;
|
||||
use std::thread::ScopedJoinHandle;
|
||||
|
||||
use chrono::DateTime;
|
||||
use chrono::Timelike;
|
||||
@@ -16,11 +12,11 @@ use esp_idf_svc::hal::prelude::Peripherals;
|
||||
use esp_idf_svc::hal::rmt::RMT;
|
||||
use esp_idf_svc::hal::task::thread::ThreadSpawnConfiguration;
|
||||
use esp_idf_svc::mqtt::client::EspMqttClient;
|
||||
use esp_idf_svc::mqtt::client::EspMqttConnection;
|
||||
use esp_idf_svc::mqtt::client::MqttClientConfiguration;
|
||||
use esp_idf_svc::netif::IpEvent;
|
||||
use esp_idf_svc::nvs::{EspDefaultNvsPartition, EspNvsPartition, NvsDefault};
|
||||
use esp_idf_svc::sntp::EspSntp;
|
||||
use esp_idf_svc::sntp::SyncStatus;
|
||||
use esp_idf_svc::sys::esp_efuse_mac_get_default;
|
||||
use esp_idf_svc::wifi::{AuthMethod, ClientConfiguration, Configuration, EspWifi, WifiEvent};
|
||||
use rgb::Rgb;
|
||||
@@ -33,6 +29,7 @@ use crate::buffers::BufferedSurfacePool;
|
||||
use crate::buffers::Pixbuf;
|
||||
use crate::events::Event;
|
||||
use crate::events::EventBus;
|
||||
use crate::events::Variant;
|
||||
use crate::lib8::interpolate::lerp8by8;
|
||||
use crate::mappings::StrideMapping;
|
||||
use crate::task::FixedSizeScheduler;
|
||||
@@ -59,7 +56,7 @@ impl Board for Esp32Board {
|
||||
}
|
||||
return u64::from_be_bytes(chip_id);
|
||||
}
|
||||
|
||||
|
||||
fn take() -> Self {
|
||||
// It is necessary to call this function once. Otherwise some patches to the runtime
|
||||
// implemented by esp-idf-sys might not link properly. See https://github.com/esp-rs/esp-idf-template/issues/71
|
||||
@@ -73,7 +70,7 @@ impl Board for Esp32Board {
|
||||
|
||||
Esp32Board {
|
||||
modem: Some(peripherals.modem),
|
||||
sys_loop: sys_loop.clone(),
|
||||
sys_loop: sys_loop,
|
||||
surfaces: BufferedSurfacePool::new(),
|
||||
pins: Some(peripherals.pins),
|
||||
rmt: Some(peripherals.rmt)
|
||||
@@ -178,7 +175,7 @@ impl Board for Esp32Board {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[derive(Debug, Clone)]
|
||||
struct ScheduleEntry {
|
||||
hour: u8,
|
||||
brightness: u8
|
||||
@@ -192,7 +189,7 @@ struct CircadianRhythm {
|
||||
impl CircadianRhythm {
|
||||
fn new() -> Self {
|
||||
CircadianRhythm {
|
||||
time_check: Periodically::new_every_n_seconds(5),
|
||||
time_check: Periodically::new_every_n_seconds(60),
|
||||
schedule: [
|
||||
ScheduleEntry { hour: 0, brightness: 0 },
|
||||
ScheduleEntry { hour: 5, brightness: 0 },
|
||||
@@ -208,11 +205,17 @@ impl CircadianRhythm {
|
||||
}
|
||||
}
|
||||
|
||||
fn update_brightness(&self, bus: &mut EventBus) {
|
||||
let now: DateTime<Utc> = std::time::SystemTime::now().into();
|
||||
let next_brightness = self.brightness_for_time(now.hour() as u8, now.minute() as u8);
|
||||
bus.push(Event::new_property_change("output.brightness", next_brightness));
|
||||
}
|
||||
|
||||
fn brightness_for_time(&self, hour: u8, minute: u8) -> u8 {
|
||||
let mut start = self.schedule.last().unwrap();
|
||||
let mut end = self.schedule.first().unwrap();
|
||||
for cur in self.schedule.iter() {
|
||||
if (cur.hour <= hour ) {
|
||||
if cur.hour <= hour {
|
||||
start = cur;
|
||||
} else {
|
||||
end = cur;
|
||||
@@ -252,11 +255,20 @@ fn map_range(x: u16, in_min: u16, in_max: u16, out_min: u16, out_max: u16) -> u1
|
||||
|
||||
|
||||
impl Task for CircadianRhythm {
|
||||
fn tick(&mut self, event: &Event, bus: &mut EventBus) {
|
||||
if self.time_check.tick() || event.eq(&Event::ReadyToRock) {
|
||||
let now: DateTime<Utc> = std::time::SystemTime::now().into();
|
||||
let next_brightness = self.brightness_for_time(now.hour() as u8, now.minute() as u8);
|
||||
bus.push(Event::new_property_change("output.brightness", next_brightness));
|
||||
fn on_ready(&mut self, bus: &mut EventBus) {
|
||||
self.update_brightness(bus);
|
||||
}
|
||||
|
||||
fn on_property_change(&mut self, key: &'static str, value: &Variant, bus: &mut EventBus) {
|
||||
match (key, value) {
|
||||
("system.time.synchronized", Variant::Boolean(true)) => self.update_brightness(bus),
|
||||
_ => ()
|
||||
}
|
||||
}
|
||||
|
||||
fn on_tick(&mut self, bus: &mut EventBus) {
|
||||
if self.time_check.tick() {
|
||||
self.update_brightness(bus);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -270,7 +282,7 @@ impl MqttTask {
|
||||
fn new() -> Self {
|
||||
MqttTask {
|
||||
conn_thread: None,
|
||||
client: None
|
||||
client: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,39 +304,50 @@ impl MqttTask {
|
||||
}).unwrap());
|
||||
self.client = Some(client);
|
||||
}
|
||||
|
||||
fn topic_prefix(bus: &EventBus) -> String {
|
||||
let chip_id: u64 = bus.property("system.board.chip_id").unwrap().into();
|
||||
|
||||
format!("homeassistant-test/renderbug/{:X}", chip_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl Task for MqttTask {
|
||||
fn tick(&mut self, event: &Event, bus: &mut EventBus) {
|
||||
match event {
|
||||
Event::Input(crate::events::InputEvent::NetworkOnline) => {
|
||||
fn start(&mut self, bus: &mut EventBus) {
|
||||
bus.push(Event::new_property_change("mqtt.online", false));
|
||||
}
|
||||
|
||||
fn on_property_change(&mut self, key: &'static str, value: &Variant, bus: &mut EventBus) {
|
||||
match (key, value) {
|
||||
("system.network.online", Variant::Boolean(true)) => {
|
||||
log::info!("Registering with MQTT");
|
||||
|
||||
self.start_mqtt();
|
||||
|
||||
if let Some(ref mut client) = self.client {
|
||||
client.enqueue(
|
||||
"homeassistant-test/renderbug/rust",
|
||||
Self::topic_prefix(bus).as_str(),
|
||||
esp_idf_svc::mqtt::client::QoS::AtLeastOnce,
|
||||
false,
|
||||
"hello, world".as_bytes()
|
||||
).unwrap();
|
||||
log::info!("MQTT should be online!");
|
||||
|
||||
bus.push(Event::new_property_change("mqtt.online", true));
|
||||
}
|
||||
},
|
||||
Event::PropertyChange(name, value) => {
|
||||
(name, value) => {
|
||||
if let Some(ref mut client) = self.client {
|
||||
let payload = format!("name={} value={:?}", name, value);
|
||||
let prefix = Self::topic_prefix(bus);
|
||||
|
||||
client.enqueue(
|
||||
"homeassistant-test/renderbug/rust/property-change",
|
||||
format!("{}/properties/{}", prefix, name).as_str(),
|
||||
esp_idf_svc::mqtt::client::QoS::AtLeastOnce,
|
||||
false,
|
||||
payload.as_bytes()
|
||||
format!("{}", value).as_bytes()
|
||||
).unwrap();
|
||||
log::info!("property change bump: {}", payload);
|
||||
}
|
||||
}
|
||||
_ => ()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -335,33 +358,36 @@ impl Debug for WifiTask {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Copy)]
|
||||
enum WifiState {
|
||||
Stopped,
|
||||
Disconnected,
|
||||
Connecting,
|
||||
Connected
|
||||
}
|
||||
|
||||
struct WifiTask {
|
||||
wifi: EspWifi<'static>,
|
||||
ntp: EspSntp<'static>,
|
||||
connection_check: Periodically,
|
||||
state: Arc<Mutex<WifiState>>,
|
||||
last_state: WifiState,
|
||||
sys_loop: EspSystemEventLoop,
|
||||
wifi_sub: Option<EspSubscription<'static, System>>,
|
||||
ip_sub: Option<EspSubscription<'static, System>>
|
||||
ip_sub: Option<EspSubscription<'static, System>>,
|
||||
}
|
||||
|
||||
impl WifiTask {
|
||||
fn new(modem: Modem, sys_loop: EspSystemEventLoop, nvs: &EspNvsPartition<NvsDefault>) -> Self {
|
||||
log::info!("Installing wifi driver");
|
||||
let mut wifi = EspWifi::new(
|
||||
let wifi = EspWifi::new(
|
||||
modem,
|
||||
sys_loop.clone(),
|
||||
Some(nvs.clone())
|
||||
).unwrap();
|
||||
|
||||
WifiTask {
|
||||
wifi,
|
||||
ntp: EspSntp::new_default().unwrap(),
|
||||
connection_check: Periodically::new_every_n_seconds(1),
|
||||
sys_loop,
|
||||
wifi_sub: None,
|
||||
ip_sub: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn connect(&mut self) {
|
||||
log::info!("Connecting wifi");
|
||||
let wifi_config = Configuration::Client(ClientConfiguration {
|
||||
ssid: "The Frequency".try_into().unwrap(),
|
||||
bssid: None,
|
||||
@@ -370,22 +396,7 @@ impl WifiTask {
|
||||
channel: None,
|
||||
..Default::default()
|
||||
});
|
||||
wifi.set_configuration(&wifi_config).unwrap();
|
||||
|
||||
WifiTask {
|
||||
wifi,
|
||||
ntp: EspSntp::new_default().unwrap(),
|
||||
connection_check: Periodically::new_every_n_seconds(1),
|
||||
state: Arc::new(Mutex::new(WifiState::Stopped)),
|
||||
last_state: WifiState::Stopped,
|
||||
sys_loop,
|
||||
wifi_sub: None,
|
||||
ip_sub: None
|
||||
}
|
||||
}
|
||||
|
||||
fn connect(&mut self) {
|
||||
log::info!("Connecting wifi");
|
||||
self.wifi.set_configuration(&wifi_config).unwrap();
|
||||
self.wifi.start().unwrap();
|
||||
self.wifi.connect().unwrap();
|
||||
}
|
||||
@@ -398,71 +409,44 @@ impl WifiTask {
|
||||
}
|
||||
|
||||
impl Task for WifiTask {
|
||||
fn start(&mut self) {
|
||||
fn start(&mut self, bus: &mut EventBus) {
|
||||
log::info!("Starting wifi!");
|
||||
|
||||
let wifi_state = self.state.clone();
|
||||
let mut wifi_bus = bus.clone();
|
||||
self.wifi_sub = Some(self.sys_loop.subscribe::<WifiEvent, _>( move |evt| {
|
||||
log::warn!("wifi event {:?}", evt);
|
||||
let next_state = match evt {
|
||||
WifiEvent::StaStopped => Some(WifiState::Disconnected),
|
||||
WifiEvent::StaDisconnected => Some(WifiState::Disconnected),
|
||||
WifiEvent::StaStarted => Some(WifiState::Connecting),
|
||||
WifiEvent::StaConnected => Some(WifiState::Connecting),
|
||||
_ => None
|
||||
};
|
||||
|
||||
match next_state {
|
||||
Some(s) => {
|
||||
let mut state = wifi_state.lock().unwrap();
|
||||
*state = s
|
||||
},
|
||||
None => ()
|
||||
}
|
||||
log::debug!("wifi event {:?}", evt);
|
||||
wifi_bus.push(Event::new_property_change("system.network.online", false));
|
||||
}).unwrap());
|
||||
let ip_state = self.state.clone();
|
||||
let mut ip_bus = bus.clone();
|
||||
self.ip_sub = Some(self.sys_loop.subscribe::<IpEvent, _>(move |evt| {
|
||||
log::warn!("ip event {:?}", evt);
|
||||
log::debug!("ip event {:?}", evt);
|
||||
match evt {
|
||||
IpEvent::DhcpIpAssigned(_) => {
|
||||
let mut state = ip_state.lock().unwrap();
|
||||
*state = WifiState::Connected;
|
||||
IpEvent::DhcpIpAssigned(addr) => {
|
||||
ip_bus.push(Event::new_property_change("system.network.ip", addr.ip().to_string()));
|
||||
ip_bus.push(Event::new_property_change("system.network.gateway", addr.gateway().to_string()));
|
||||
ip_bus.push(Event::new_property_change("system.network.online", true));
|
||||
},
|
||||
_ => ()
|
||||
}
|
||||
}).unwrap());
|
||||
|
||||
self.connect();
|
||||
}
|
||||
|
||||
fn tick(&mut self, event: &Event, bus: &mut EventBus) {
|
||||
fn on_tick(&mut self, bus: &mut EventBus) {
|
||||
if self.connection_check.tick() {
|
||||
let cur_state = *self.state.lock().unwrap();
|
||||
|
||||
if self.last_state != cur_state {
|
||||
match cur_state {
|
||||
WifiState::Connected => log::info!("Wifi connected!"),
|
||||
WifiState::Connecting => log::info!("Connecting!"),
|
||||
WifiState::Stopped => log::info!("Stopped!"),
|
||||
WifiState::Disconnected => log::info!("Disconnected!")
|
||||
}
|
||||
|
||||
log::info!("online: {:?}", cur_state);
|
||||
|
||||
self.last_state = cur_state;
|
||||
|
||||
match cur_state {
|
||||
WifiState::Connected => bus.push(Event::new_input_event(crate::events::InputEvent::NetworkOnline)),
|
||||
_ => bus.push(Event::new_input_event(crate::events::InputEvent::NetworkOffline))
|
||||
if bus.property("system.network.online").unwrap() == Variant::Boolean(true) {
|
||||
match self.ntp.get_sync_status() {
|
||||
SyncStatus::Completed => bus.push(Event::new_property_change("system.time.synchronized", true)),
|
||||
_ => bus.push(Event::new_property_change("system.time.synchronized", false))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn stop(&mut self) {
|
||||
fn stop(&mut self, bus: &mut EventBus) {
|
||||
log::info!("Stopping wifi");
|
||||
self.wifi_sub.take().unwrap();
|
||||
self.ip_sub.take().unwrap();
|
||||
self.disconnect();
|
||||
bus.push(Event::new_property_change("system.network.online", false));
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user