From ce8e1b7f7c6bdb74bd797cde087f507e5f1450c7 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 29 May 2025 15:05:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0Long=20polling=E7=9A=84?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 126 +++++++++++++++++++++++++++- src/cores/handlers/payload.rs | 1 - src/cores/router.rs | 7 ++ src/cores/servers/actix_web/mod.rs | 12 +++ 4 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 68fc807..2c1546d 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -11,7 +11,9 @@ use std::os::raw::c_char; use std::ptr::null_mut; use std::slice; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::timeout; use tokio_stream::wrappers::ReceiverStream; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); @@ -19,6 +21,9 @@ pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); static SUBSCRIBERS: Lazy>>>> = Lazy::new(|| Mutex::new(HashMap::new())); +static MESSAGESUBSCRIBERS: Lazy>>>> = + Lazy::new(|| Mutex::new(HashMap::new())); + #[cfg(target_arch = "x86")] type DataChar = i8; @@ -870,3 +875,122 @@ pub async fn subscribe_handler( Ok(ReceiverStream::new(rx)) } + +extern "C" fn message_callback( + topic: *const ::std::os::raw::c_char, + uuid: *const ::std::os::raw::c_char, + size: ::std::os::raw::c_int, + data: *const ::std::os::raw::c_char, + _: *mut ::std::os::raw::c_void, +) { + let topic_str = unsafe { CStr::from_ptr(topic).to_string_lossy().to_string() }; + let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; + + //将data转换为切片 + let data_bytes = unsafe { slice::from_raw_parts(data as *const u8, size as usize) }; + + let data_str = match std::str::from_utf8(data_bytes) { + Ok(s) => s.to_string(), //UTF-8直接转换 + Err(_) => STANDARD.encode(data_bytes), //非UTF-8数据转换为Base64 + }; + + let json_data = serde_json::json!({ + "topic": topic_str, + "uuid": uuid_str, + "size": size, + "data": data_str + }); + + // 通知所有等待的请求 + let mut subs = MESSAGESUBSCRIBERS.lock().unwrap(); + if let Some(subscribers) = subs.get_mut(&topic_str) { + // 为每个订阅者发送消息并清空列表 + for sender in subscribers.drain(..) { + let _ = sender.send(json_data.clone()); // 忽略可能的错误 + } + } +} + +pub async fn message_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResult { + let headers = server_request.headers; + + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), + }; + + let cloud_topic = match CString::new(header_cloud_topic.clone()) { + Ok(cstr) => cstr, + Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), + }; + + // 获取超时时间 + let timeout_ms = match headers.get("x-cloud-timeout") { + Some(value) => { + match value.to_string().parse::() { + Ok(ms) => Some(ms), + Err(_) => None, // 解析失败则视为无超时 + } + } + None => None, // 未指定则视为无超时 + }; + + // 创建一个oneshot通道,用于等待消息 + let (tx, rx) = oneshot::channel(); + + { + let mut subs = MESSAGESUBSCRIBERS.lock().unwrap(); + let is_first_subscriber = !subs.contains_key(&header_cloud_topic) || + subs.get(&header_cloud_topic).unwrap().is_empty(); + + subs.entry(header_cloud_topic.clone()) + .or_insert_with(Vec::new) + .push(tx); + + // 如果是第一个订阅者,则注册回调 + if is_first_subscriber { + unsafe { + let ret = datamgr_api::Subscribe( + DATA_PLUGIN_MANAGER, + cloud_topic.as_ptr(), + Some(message_callback), + std::ptr::null_mut(), + ); + + if ret != 1 { + subs.get_mut(&header_cloud_topic).unwrap().clear(); + return Err(ServerError::internal_error("Subscribe data failed")); + } + } + } + } + + // 根据是否有超时设置来处理 + match timeout_ms { + Some(ms) => { + // 有超时设置 + match timeout(Duration::from_millis(ms), rx).await { + Ok(result) => { + match result { + Ok(value) => Ok(value), + Err(_) => Err(ServerError::internal_error("Channel closed unexpectedly")), + } + }, + Err(_) => { + // 超时发生,返回404 Not Found + Err(ServerError::not_found("No message received within timeout period")) + } + } + }, + None => { + // 无超时设置,无限等待 + match rx.await { + Ok(value) => Ok(value), + Err(_) => Err(ServerError::internal_error("Channel closed unexpectedly")), + } + } + } +} \ No newline at end of file diff --git a/src/cores/handlers/payload.rs b/src/cores/handlers/payload.rs index 9b4e77a..334f985 100644 --- a/src/cores/handlers/payload.rs +++ b/src/cores/handlers/payload.rs @@ -1,6 +1,5 @@ use std::sync::Arc; use fleetmodv2::api_server::{ServerError, ServerRequest, ServerResult}; -use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use crate::cores::state::AppState; use crate::db::delete::delete_from_payload; diff --git a/src/cores/router.rs b/src/cores/router.rs index 3c95573..34ab118 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -62,6 +62,7 @@ pub enum RouterKey { DataMgrOrderResult, DataMgrPublish, DataMgrSubscribe, + DataMgrMessage, // Consensus Service Router Key ConsensusRequests, @@ -394,6 +395,12 @@ impl Router { RouterKey::DataMgrSubscribe, handlers::datamgr::subscribe_handler ); + + add_route!( + router, + RouterKey::DataMgrMessage, + handlers::datamgr::message_handler + ); } // consensus service routers diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 6d90ff9..f4a87ac 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -740,6 +740,7 @@ pub mod datamgr { ); app.service(publish); app.service(subscribe); + app.service(message); } #[derive(Serialize, Deserialize, Apiv2Schema)] @@ -935,6 +936,17 @@ pub mod datamgr { RouterKey::DataMgrSubscribe, "DataMgr" ); + + define_json_response_method!( + body_unrequired + message, + get, + "/Message", + (), + (), + RouterKey::DataMgrMessage, + "DataMgr" + ); } pub mod consensus { -- Gitee