From dd984d46e8262f76043017d50e3b80c092fa531a Mon Sep 17 00:00:00 2001 From: yzc1114 Date: Wed, 19 Feb 2025 21:36:10 +0800 Subject: [PATCH] =?UTF-8?q?=E9=85=8D=E5=90=88=E9=87=8D=E6=9E=84rust-client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/daemons/messaging.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/src/cores/daemons/messaging.rs b/src/cores/daemons/messaging.rs index bdb7153..de5e868 100644 --- a/src/cores/daemons/messaging.rs +++ b/src/cores/daemons/messaging.rs @@ -14,6 +14,7 @@ use serde_json::Value; use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; use std::time; +use serde::de::DeserializeOwned; use strum::EnumIter; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; @@ -177,7 +178,7 @@ where impl TryFrom for WatchEventMessageResource where - T: FleetResource, + T: FleetResource + DeserializeOwned, { type Error = ServerError; @@ -413,13 +414,38 @@ pub async fn watch_raw( Ok(rx) } +#[allow(unused)] +pub async fn watch_value( + msg_cli: Arc, + ri: ResourceCollectionIdentifier, +) -> ServerResult> { + let mut rx_from = watch_raw(msg_cli, ri).await?; + let (sx, rx) = mpsc::channel(32); + tokio::spawn(async move { + while let Some(value) = rx_from.recv().await { + log::debug!("watch resource received value: {:?}", value); + let message_value: WatchEventMessageValue = match serde_json::from_value(value) { + Ok(v) => v, + Err(e) => { + log::error!("Failed to convert WatchEventMessageValue: {}", e); + continue; + } + }; + if let Err(e) = sx.send(message_value).await { + log::error!("Failed to send WatchEventMessageResource: {}", e); + } + } + }); + Ok(rx) +} + #[allow(unused)] pub async fn watch_resource( msg_cli: Arc, ri: ResourceCollectionIdentifier, ) -> ServerResult>> where - T: FleetResource + 'static, + T: FleetResource + DeserializeOwned + 'static, { let mut rx_from = watch_raw(msg_cli, ri).await?; let (sx, rx) = mpsc::channel(32); -- Gitee