diff --git a/src/cores/daemons/messaging.rs b/src/cores/daemons/messaging.rs index bdb7153ced8ec63698de7dfb2c19d441908393ce..de5e868ed65b55bc22beff773c91b4efc4bec1ee 100644 --- a/src/cores/daemons/messaging.rs +++ b/src/cores/daemons/messaging.rs @@ -14,6 +14,7 @@ use serde_json::Value; use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; use std::time; +use serde::de::DeserializeOwned; use strum::EnumIter; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; @@ -177,7 +178,7 @@ where impl TryFrom for WatchEventMessageResource where - T: FleetResource, + T: FleetResource + DeserializeOwned, { type Error = ServerError; @@ -413,13 +414,38 @@ pub async fn watch_raw( Ok(rx) } +#[allow(unused)] +pub async fn watch_value( + msg_cli: Arc, + ri: ResourceCollectionIdentifier, +) -> ServerResult> { + let mut rx_from = watch_raw(msg_cli, ri).await?; + let (sx, rx) = mpsc::channel(32); + tokio::spawn(async move { + while let Some(value) = rx_from.recv().await { + log::debug!("watch resource received value: {:?}", value); + let message_value: WatchEventMessageValue = match serde_json::from_value(value) { + Ok(v) => v, + Err(e) => { + log::error!("Failed to convert WatchEventMessageValue: {}", e); + continue; + } + }; + if let Err(e) = sx.send(message_value).await { + log::error!("Failed to send WatchEventMessageResource: {}", e); + } + } + }); + Ok(rx) +} + #[allow(unused)] pub async fn watch_resource( msg_cli: Arc, ri: ResourceCollectionIdentifier, ) -> ServerResult>> where - T: FleetResource + 'static, + T: FleetResource + DeserializeOwned + 'static, { let mut rx_from = watch_raw(msg_cli, ri).await?; let (sx, rx) = mpsc::channel(32);