From ce8e1b7f7c6bdb74bd797cde087f507e5f1450c7 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 29 May 2025 15:05:15 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=AE=9E=E7=8E=B0Long=20polling=E7=9A=84?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 126 +++++++++++++++++++++++++++- src/cores/handlers/payload.rs | 1 - src/cores/router.rs | 7 ++ src/cores/servers/actix_web/mod.rs | 12 +++ 4 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 68fc807..2c1546d 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -11,7 +11,9 @@ use std::os::raw::c_char; use std::ptr::null_mut; use std::slice; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; +use std::time::Duration; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::timeout; use tokio_stream::wrappers::ReceiverStream; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); @@ -19,6 +21,9 @@ pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); static SUBSCRIBERS: Lazy>>>> = Lazy::new(|| Mutex::new(HashMap::new())); +static MESSAGESUBSCRIBERS: Lazy>>>> = + Lazy::new(|| Mutex::new(HashMap::new())); + #[cfg(target_arch = "x86")] type DataChar = i8; @@ -870,3 +875,122 @@ pub async fn subscribe_handler( Ok(ReceiverStream::new(rx)) } + +extern "C" fn message_callback( + topic: *const ::std::os::raw::c_char, + uuid: *const ::std::os::raw::c_char, + size: ::std::os::raw::c_int, + data: *const ::std::os::raw::c_char, + _: *mut ::std::os::raw::c_void, +) { + let topic_str = unsafe { CStr::from_ptr(topic).to_string_lossy().to_string() }; + let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; + + //将data转换为切片 + let data_bytes = unsafe { slice::from_raw_parts(data as *const u8, size as usize) }; + + let data_str = match std::str::from_utf8(data_bytes) { + Ok(s) => s.to_string(), //UTF-8直接转换 + Err(_) => STANDARD.encode(data_bytes), //非UTF-8数据转换为Base64 + }; + + let json_data = serde_json::json!({ + "topic": topic_str, + "uuid": uuid_str, + "size": size, + "data": data_str + }); + + // 通知所有等待的请求 + let mut subs = MESSAGESUBSCRIBERS.lock().unwrap(); + if let Some(subscribers) = subs.get_mut(&topic_str) { + // 为每个订阅者发送消息并清空列表 + for sender in subscribers.drain(..) { + let _ = sender.send(json_data.clone()); // 忽略可能的错误 + } + } +} + +pub async fn message_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResult { + let headers = server_request.headers; + + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), + }; + + let cloud_topic = match CString::new(header_cloud_topic.clone()) { + Ok(cstr) => cstr, + Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), + }; + + // 获取超时时间 + let timeout_ms = match headers.get("x-cloud-timeout") { + Some(value) => { + match value.to_string().parse::() { + Ok(ms) => Some(ms), + Err(_) => None, // 解析失败则视为无超时 + } + } + None => None, // 未指定则视为无超时 + }; + + // 创建一个oneshot通道,用于等待消息 + let (tx, rx) = oneshot::channel(); + + { + let mut subs = MESSAGESUBSCRIBERS.lock().unwrap(); + let is_first_subscriber = !subs.contains_key(&header_cloud_topic) || + subs.get(&header_cloud_topic).unwrap().is_empty(); + + subs.entry(header_cloud_topic.clone()) + .or_insert_with(Vec::new) + .push(tx); + + // 如果是第一个订阅者,则注册回调 + if is_first_subscriber { + unsafe { + let ret = datamgr_api::Subscribe( + DATA_PLUGIN_MANAGER, + cloud_topic.as_ptr(), + Some(message_callback), + std::ptr::null_mut(), + ); + + if ret != 1 { + subs.get_mut(&header_cloud_topic).unwrap().clear(); + return Err(ServerError::internal_error("Subscribe data failed")); + } + } + } + } + + // 根据是否有超时设置来处理 + match timeout_ms { + Some(ms) => { + // 有超时设置 + match timeout(Duration::from_millis(ms), rx).await { + Ok(result) => { + match result { + Ok(value) => Ok(value), + Err(_) => Err(ServerError::internal_error("Channel closed unexpectedly")), + } + }, + Err(_) => { + // 超时发生,返回404 Not Found + Err(ServerError::not_found("No message received within timeout period")) + } + } + }, + None => { + // 无超时设置,无限等待 + match rx.await { + Ok(value) => Ok(value), + Err(_) => Err(ServerError::internal_error("Channel closed unexpectedly")), + } + } + } +} \ No newline at end of file diff --git a/src/cores/handlers/payload.rs b/src/cores/handlers/payload.rs index 9b4e77a..334f985 100644 --- a/src/cores/handlers/payload.rs +++ b/src/cores/handlers/payload.rs @@ -1,6 +1,5 @@ use std::sync::Arc; use fleetmodv2::api_server::{ServerError, ServerRequest, ServerResult}; -use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use crate::cores::state::AppState; use crate::db::delete::delete_from_payload; diff --git a/src/cores/router.rs b/src/cores/router.rs index 3c95573..34ab118 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -62,6 +62,7 @@ pub enum RouterKey { DataMgrOrderResult, DataMgrPublish, DataMgrSubscribe, + DataMgrMessage, // Consensus Service Router Key ConsensusRequests, @@ -394,6 +395,12 @@ impl Router { RouterKey::DataMgrSubscribe, handlers::datamgr::subscribe_handler ); + + add_route!( + router, + RouterKey::DataMgrMessage, + handlers::datamgr::message_handler + ); } // consensus service routers diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 6d90ff9..f4a87ac 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -740,6 +740,7 @@ pub mod datamgr { ); app.service(publish); app.service(subscribe); + app.service(message); } #[derive(Serialize, Deserialize, Apiv2Schema)] @@ -935,6 +936,17 @@ pub mod datamgr { RouterKey::DataMgrSubscribe, "DataMgr" ); + + define_json_response_method!( + body_unrequired + message, + get, + "/Message", + (), + (), + RouterKey::DataMgrMessage, + "DataMgr" + ); } pub mod consensus { -- Gitee From cce30dd6c31c2478b9360948161646583d87c156 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 29 May 2025 22:20:52 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9/Subscribe=E4=BB=A5?= =?UTF-8?q?=E5=8F=8A/Message?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 321 +++++++++++++++++----------- src/cores/servers/actix_web/mod.rs | 5 +- 2 files changed, 200 insertions(+), 126 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 2c1546d..6c2d03e 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -10,20 +10,13 @@ use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::ptr::null_mut; use std::slice; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; -use tokio::sync::{mpsc, oneshot}; -use tokio::time::timeout; -use tokio_stream::wrappers::ReceiverStream; +use serde::{Deserialize, Serialize}; +use tokio::time::{Instant}; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); -static SUBSCRIBERS: Lazy>>>> = - Lazy::new(|| Mutex::new(HashMap::new())); - -static MESSAGESUBSCRIBERS: Lazy>>>> = - Lazy::new(|| Mutex::new(HashMap::new())); - #[cfg(target_arch = "x86")] type DataChar = i8; @@ -800,7 +793,32 @@ pub async fn publish_handler(_: Arc, server_request: ServerRequest) -> } } -extern "C" fn rust_callback( +#[derive(Clone, Debug, Serialize, Deserialize)] +struct Message { + uuid: String, + data: Vec, + size: i32, +} + +// 定义消息队列结构 +struct QueueSystem { + // topic -> (subscriber_id -> messages) + queues: HashMap>>, +} + +impl QueueSystem { + fn new() -> Self { + QueueSystem { + queues: HashMap::new(), + } + } +} + +// 全局共享状态 +static QUEUE_SYSTEM: Lazy<(Mutex, Condvar)> = + Lazy::new(|| (Mutex::new(QueueSystem::new()), Condvar::new())); + +extern "C" fn message_callback( topic: *const ::std::os::raw::c_char, uuid: *const ::std::os::raw::c_char, size: ::std::os::raw::c_int, @@ -810,121 +828,132 @@ extern "C" fn rust_callback( let topic_str = unsafe { CStr::from_ptr(topic).to_string_lossy().to_string() }; let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; - //将data转换为切片 - let data_bytes = unsafe { slice::from_raw_parts(data as *const u8, size as usize) }; + // 将data转换为切片 + let result = { + let slice = unsafe { std::slice::from_raw_parts(data as *const u8, size as usize) }; + slice.to_vec() + }; - let data_str = match std::str::from_utf8(data_bytes) { - Ok(s) => s.to_string(), //UTF-8直接转换 - Err(_) => STANDARD.encode(data_bytes), //非UTF-8数据转换为Base64 + let message = Message { + uuid: uuid_str, + data: result, + size, }; - let json_data = serde_json::json!({ - "topic": topic_str, - "uuid": uuid_str, - "size": size, - "data": data_str - }); + // 获取锁并添加消息 + let (queue_mutex, condvar) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); - // 广播给所有订阅者 - let mut subs = SUBSCRIBERS.lock().unwrap(); - if let Some(subscribers) = subs.get_mut(&topic_str) { - subscribers.retain(|sender| sender.try_send(json_data.clone()).is_ok()); + if let Some(topic_subscribers) = queue_system.queues.get_mut(&topic_str) { + for subscriber_queue in topic_subscribers.values_mut() { + subscriber_queue.push(message.clone()); + } + // 通知所有等待的线程 + condvar.notify_all(); } } +// 订阅处理函数 pub async fn subscribe_handler( _: Arc, server_request: ServerRequest, -) -> ServerResult> { +) -> ServerResult { let headers = server_request.headers; + // 获取必要的头信息 let header_cloud_topic = match headers.get("x-cloud-topic") { Some(value) => value.to_string(), None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), }; + let subscriber_id = match headers.get("x-cloud-subscriber-id") { + Some(value) => value.to_string(), + None => return Err(ServerError::bad_request("Missing x-cloud-subscriber-id header")), + }; + let cloud_topic = match CString::new(header_cloud_topic.clone()) { Ok(cstr) => cstr, Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), }; - let (tx, rx) = mpsc::channel(10); + // 检查订阅状态并创建队列 + let is_first_subscriber = { + let (queue_mutex, _) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); + + // 获取或创建topic的订阅者映射 + let topic_subscribers = queue_system.queues + .entry(header_cloud_topic.clone()) + .or_insert_with(HashMap::new); + + // 检查该订阅者是否已存在 + if topic_subscribers.contains_key(&subscriber_id) { + return Err(ServerError::bad_request("Already Subscribed")); + } - // 将订阅者加入全局 HashMap - let mut subs = SUBSCRIBERS.lock().unwrap(); - subs.entry(header_cloud_topic.clone()) - .or_insert_with(Vec::new) - .push(tx); + // 为新订阅者创建消息队列 + topic_subscribers.insert(subscriber_id.clone(), Vec::new()); - // 确保只调用一次 `datamgr_api::Subscribe` - if subs.get(&header_cloud_topic).unwrap().len() == 1 { + // 判断是否为第一个订阅者 + topic_subscribers.len() == 1 + }; + + // 如果是第一个订阅者,注册回调 + if is_first_subscriber { unsafe { let ret = datamgr_api::Subscribe( DATA_PLUGIN_MANAGER, cloud_topic.as_ptr(), - Some(rust_callback), + Some(message_callback), std::ptr::null_mut(), ); if ret != 1 { - subs.remove(&header_cloud_topic); + // 注册失败,删除订阅者 + let (queue_mutex, _) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); + if let Some(topic_subscribers) = queue_system.queues.get_mut(&header_cloud_topic) { + topic_subscribers.remove(&subscriber_id); + } return Err(ServerError::internal_error("Subscribe data failed")); } } } - Ok(ReceiverStream::new(rx)) + Ok(serde_json::json!("Subscribe Ok")) } -extern "C" fn message_callback( - topic: *const ::std::os::raw::c_char, - uuid: *const ::std::os::raw::c_char, - size: ::std::os::raw::c_int, - data: *const ::std::os::raw::c_char, - _: *mut ::std::os::raw::c_void, -) { - let topic_str = unsafe { CStr::from_ptr(topic).to_string_lossy().to_string() }; - let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; - - //将data转换为切片 - let data_bytes = unsafe { slice::from_raw_parts(data as *const u8, size as usize) }; +// 获取消息处理函数 +pub async fn message_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResponse { - let data_str = match std::str::from_utf8(data_bytes) { - Ok(s) => s.to_string(), //UTF-8直接转换 - Err(_) => STANDARD.encode(data_bytes), //非UTF-8数据转换为Base64 + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() }; - let json_data = serde_json::json!({ - "topic": topic_str, - "uuid": uuid_str, - "size": size, - "data": data_str - }); - - // 通知所有等待的请求 - let mut subs = MESSAGESUBSCRIBERS.lock().unwrap(); - if let Some(subscribers) = subs.get_mut(&topic_str) { - // 为每个订阅者发送消息并清空列表 - for sender in subscribers.drain(..) { - let _ = sender.send(json_data.clone()); // 忽略可能的错误 - } - } -} + let not_found = |body_str| { + ServerRawResponse::not_found() + .body(Vec::from(body_str)) + .build() + .into() + }; -pub async fn message_handler( - _: Arc, - server_request: ServerRequest, -) -> ServerResult { let headers = server_request.headers; + // 获取必要的头信息 let header_cloud_topic = match headers.get("x-cloud-topic") { Some(value) => value.to_string(), - None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), + None => return bad_request("Missing x-cloud-topic header"), }; - let cloud_topic = match CString::new(header_cloud_topic.clone()) { - Ok(cstr) => cstr, - Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), + let subscriber_id = match headers.get("x-cloud-subscriber-id") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-subscriber-id header"), }; // 获取超时时间 @@ -938,59 +967,105 @@ pub async fn message_handler( None => None, // 未指定则视为无超时 }; - // 创建一个oneshot通道,用于等待消息 - let (tx, rx) = oneshot::channel(); - - { - let mut subs = MESSAGESUBSCRIBERS.lock().unwrap(); - let is_first_subscriber = !subs.contains_key(&header_cloud_topic) || - subs.get(&header_cloud_topic).unwrap().is_empty(); + // 在阻塞任务中获取消息 + let message_result = tokio::task::spawn_blocking(move || -> ServerResponse { + let (queue_mutex, condvar) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); + + // 定义一个检查队列并获取消息的函数 + let check_and_get_message = |system: &mut QueueSystem| -> Option { + // 检查主题和订阅者是否存在 + if let Some(topic_subscribers) = system.queues.get_mut(&header_cloud_topic) { + if let Some(queue) = topic_subscribers.get_mut(&subscriber_id) { + if !queue.is_empty() { + return Some(queue.remove(0)); + } + } + } + None + }; - subs.entry(header_cloud_topic.clone()) - .or_insert_with(Vec::new) - .push(tx); + // 首先检查是否有立即可用的消息 + if let Some(message) = check_and_get_message(&mut *queue_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } - // 如果是第一个订阅者,则注册回调 - if is_first_subscriber { - unsafe { - let ret = datamgr_api::Subscribe( - DATA_PLUGIN_MANAGER, - cloud_topic.as_ptr(), - Some(message_callback), - std::ptr::null_mut(), - ); + // 检查主题和订阅者是否存在 + if !queue_system.queues.contains_key(&header_cloud_topic) { + return not_found("Topic not found"); + } - if ret != 1 { - subs.get_mut(&header_cloud_topic).unwrap().clear(); - return Err(ServerError::internal_error("Subscribe data failed")); - } - } + if !queue_system.queues.get(&header_cloud_topic).unwrap().contains_key(&subscriber_id) { + return not_found("Subscriber not found"); } - } - // 根据是否有超时设置来处理 - match timeout_ms { - Some(ms) => { - // 有超时设置 - match timeout(Duration::from_millis(ms), rx).await { - Ok(result) => { - match result { - Ok(value) => Ok(value), - Err(_) => Err(ServerError::internal_error("Channel closed unexpectedly")), + // 队列为空,需要等待 + match timeout_ms { + Some(ms) => { + // 有超时设置 + let timeout_duration = Duration::from_millis(ms); + let start_time = Instant::now(); + + // 循环等待直到有消息或超时 + loop { + // 计算剩余等待时间 + let elapsed = start_time.elapsed(); + if elapsed >= timeout_duration { + return not_found("No message received within timeout period"); + } + + let remaining = timeout_duration - elapsed; + let (mut new_system, timeout_result) = condvar.wait_timeout(queue_system, remaining).unwrap(); + queue_system = new_system; + + // 检查是否有消息 + if let Some(message) = check_and_get_message(&mut *queue_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } + + // 如果超时且仍没有消息,返回超时错误 + if timeout_result.timed_out() { + return not_found("No message received within timeout period"); + } + } + }, + None => { + // 无限等待 + loop { + queue_system = condvar.wait(queue_system).unwrap(); + + // 检查是否有消息 + if let Some(message) = check_and_get_message(&mut *queue_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); } - }, - Err(_) => { - // 超时发生,返回404 Not Found - Err(ServerError::not_found("No message received within timeout period")) } - } - }, - None => { - // 无超时设置,无限等待 - match rx.await { - Ok(value) => Ok(value), - Err(_) => Err(ServerError::internal_error("Channel closed unexpectedly")), } } - } + }).await.unwrap_or_else(|_| bad_request("Task execution failed")); + + message_result } \ No newline at end of file diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index f4a87ac..9a26908 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -926,7 +926,7 @@ pub mod datamgr { "DataMgr" ); - define_json_stream_response_method!( + define_json_response_method!( body_unrequired subscribe, get, @@ -937,8 +937,7 @@ pub mod datamgr { "DataMgr" ); - define_json_response_method!( - body_unrequired + define_raw_response_method!( message, get, "/Message", -- Gitee From 4a7938b4bee739f72908d809ce8fd22d9935e681 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Fri, 30 May 2025 09:53:38 +0800 Subject: [PATCH 3/3] =?UTF-8?q?subscribe=E7=9A=84=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E6=94=B9=E4=B8=BApost?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/servers/actix_web/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 9a26908..2e823fd 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -929,7 +929,7 @@ pub mod datamgr { define_json_response_method!( body_unrequired subscribe, - get, + post, "/Subscribe", (), (), -- Gitee