diff --git a/src/platform/esp32.rs b/src/platform/esp32.rs index b952ea8..f2d6e0a 100644 --- a/src/platform/esp32.rs +++ b/src/platform/esp32.rs @@ -2,6 +2,8 @@ use core::borrow::BorrowMut; use std::sync::Arc; use std::fmt::Debug; use std::sync::Mutex; +use std::thread::JoinHandle; +use std::thread::ScopedJoinHandle; use chrono::DateTime; use chrono::Timelike; @@ -11,6 +13,9 @@ use esp_idf_svc::eventloop::{EspSubscription, EspSystemEventLoop, System}; use esp_idf_svc::hal::modem::Modem; use esp_idf_svc::hal::prelude::Peripherals; use esp_idf_svc::hal::task::thread::ThreadSpawnConfiguration; +use esp_idf_svc::mqtt::client::EspMqttClient; +use esp_idf_svc::mqtt::client::EspMqttConnection; +use esp_idf_svc::mqtt::client::MqttClientConfiguration; use esp_idf_svc::netif::IpEvent; use esp_idf_svc::nvs::{EspDefaultNvsPartition, EspNvsPartition, NvsDefault}; use esp_idf_svc::sntp::EspSntp; @@ -97,7 +102,7 @@ pub struct Esp32Board { impl Board for Esp32Board { type Output = StrideOutput<[Rgb; 310], FastWs2812Esp32Rmt<'static>>; type Surfaces = BufferedSurfacePool; - type Scheduler = FixedSizeScheduler<2>; + type Scheduler = FixedSizeScheduler<4>; fn take() -> Self { // It is necessary to call this function once. Otherwise some patches to the runtime @@ -205,6 +210,7 @@ impl Board for Esp32Board { FixedSizeScheduler::new([ Box::new(WifiTask::new(self.modem.take().unwrap(), self.sys_loop.clone(), &nvs)), Box::new(CircadianRhythm::new()), + Box::new(MqttTask::new()), Box::new(self.surfaces.clone()) ]) } @@ -293,6 +299,74 @@ impl Task for CircadianRhythm { } } +struct MqttTask { + client: Option>, + conn_thread: Option>, +} + +impl MqttTask { + fn new() -> Self { + MqttTask { + conn_thread: None, + client: None + } + } + + fn start_mqtt(&mut self) { + log::info!("Starting MQTT"); + let (client, mut conn) = EspMqttClient::new( + "mqtt://10.0.0.2:1883", + &MqttClientConfiguration { + client_id: Some("renderbug-rs"), + ..Default::default() + } + ).unwrap(); + log::info!("Connected!"); + + self.conn_thread = Some(std::thread::Builder::new() + .stack_size(6000) + .spawn(move || { + conn.next().unwrap(); + }).unwrap()); + self.client = Some(client); + } +} + +impl Task for MqttTask { + fn tick(&mut self, event: &Event, bus: &mut EventBus) { + match event { + Event::Input(crate::events::InputEvent::NetworkOnline) => { + log::info!("Registering with MQTT"); + + self.start_mqtt(); + + if let Some(ref mut client) = self.client { + client.enqueue( + "homeassistant-test/renderbug/rust", + esp_idf_svc::mqtt::client::QoS::AtLeastOnce, + false, + "hello, world".as_bytes() + ).unwrap(); + log::info!("MQTT should be online!"); + } + }, + Event::PropertyChange(name, value) => { + if let Some(ref mut client) = self.client { + let payload = format!("name={} value={:?}", name, value); + client.enqueue( + "homeassistant-test/renderbug/rust/property-change", + esp_idf_svc::mqtt::client::QoS::AtLeastOnce, + false, + payload.as_bytes() + ).unwrap(); + log::info!("property change bump: {}", payload); + } + } + _ => () + } + } +} + impl Debug for WifiTask { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("WifiTask").finish()