diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 68fc807663b22908f17c8f5850bf73e80def4bf1..6c2d03e8f86e368f959889a98c6b23c939d69d45 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -10,15 +10,13 @@ use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::ptr::null_mut; use std::slice; -use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; +use serde::{Deserialize, Serialize}; +use tokio::time::{Instant}; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); -static SUBSCRIBERS: Lazy>>>> = - Lazy::new(|| Mutex::new(HashMap::new())); - #[cfg(target_arch = "x86")] type DataChar = i8; @@ -795,7 +793,32 @@ pub async fn publish_handler(_: Arc, server_request: ServerRequest) -> } } -extern "C" fn rust_callback( +#[derive(Clone, Debug, Serialize, Deserialize)] +struct Message { + uuid: String, + data: Vec, + size: i32, +} + +// 定义消息队列结构 +struct QueueSystem { + // topic -> (subscriber_id -> messages) + queues: HashMap>>, +} + +impl QueueSystem { + fn new() -> Self { + QueueSystem { + queues: HashMap::new(), + } + } +} + +// 全局共享状态 +static QUEUE_SYSTEM: Lazy<(Mutex, Condvar)> = + Lazy::new(|| (Mutex::new(QueueSystem::new()), Condvar::new())); + +extern "C" fn message_callback( topic: *const ::std::os::raw::c_char, uuid: *const ::std::os::raw::c_char, size: ::std::os::raw::c_int, @@ -805,68 +828,244 @@ extern "C" fn rust_callback( let topic_str = unsafe { CStr::from_ptr(topic).to_string_lossy().to_string() }; let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; - //将data转换为切片 - let data_bytes = unsafe { slice::from_raw_parts(data as *const u8, size as usize) }; + // 将data转换为切片 + let result = { + let slice = unsafe { std::slice::from_raw_parts(data as *const u8, size as usize) }; + slice.to_vec() + }; - let data_str = match std::str::from_utf8(data_bytes) { - Ok(s) => s.to_string(), //UTF-8直接转换 - Err(_) => STANDARD.encode(data_bytes), //非UTF-8数据转换为Base64 + let message = Message { + uuid: uuid_str, + data: result, + size, }; - let json_data = serde_json::json!({ - "topic": topic_str, - "uuid": uuid_str, - "size": size, - "data": data_str - }); + // 获取锁并添加消息 + let (queue_mutex, condvar) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); - // 广播给所有订阅者 - let mut subs = SUBSCRIBERS.lock().unwrap(); - if let Some(subscribers) = subs.get_mut(&topic_str) { - subscribers.retain(|sender| sender.try_send(json_data.clone()).is_ok()); + if let Some(topic_subscribers) = queue_system.queues.get_mut(&topic_str) { + for subscriber_queue in topic_subscribers.values_mut() { + subscriber_queue.push(message.clone()); + } + // 通知所有等待的线程 + condvar.notify_all(); } } +// 订阅处理函数 pub async fn subscribe_handler( _: Arc, server_request: ServerRequest, -) -> ServerResult> { +) -> ServerResult { let headers = server_request.headers; + // 获取必要的头信息 let header_cloud_topic = match headers.get("x-cloud-topic") { Some(value) => value.to_string(), None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), }; + let subscriber_id = match headers.get("x-cloud-subscriber-id") { + Some(value) => value.to_string(), + None => return Err(ServerError::bad_request("Missing x-cloud-subscriber-id header")), + }; + let cloud_topic = match CString::new(header_cloud_topic.clone()) { Ok(cstr) => cstr, Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), }; - let (tx, rx) = mpsc::channel(10); + // 检查订阅状态并创建队列 + let is_first_subscriber = { + let (queue_mutex, _) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); - // 将订阅者加入全局 HashMap - let mut subs = SUBSCRIBERS.lock().unwrap(); - subs.entry(header_cloud_topic.clone()) - .or_insert_with(Vec::new) - .push(tx); + // 获取或创建topic的订阅者映射 + let topic_subscribers = queue_system.queues + .entry(header_cloud_topic.clone()) + .or_insert_with(HashMap::new); + + // 检查该订阅者是否已存在 + if topic_subscribers.contains_key(&subscriber_id) { + return Err(ServerError::bad_request("Already Subscribed")); + } - // 确保只调用一次 `datamgr_api::Subscribe` - if subs.get(&header_cloud_topic).unwrap().len() == 1 { + // 为新订阅者创建消息队列 + topic_subscribers.insert(subscriber_id.clone(), Vec::new()); + + // 判断是否为第一个订阅者 + topic_subscribers.len() == 1 + }; + + // 如果是第一个订阅者,注册回调 + if is_first_subscriber { unsafe { let ret = datamgr_api::Subscribe( DATA_PLUGIN_MANAGER, cloud_topic.as_ptr(), - Some(rust_callback), + Some(message_callback), std::ptr::null_mut(), ); if ret != 1 { - subs.remove(&header_cloud_topic); + // 注册失败,删除订阅者 + let (queue_mutex, _) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); + if let Some(topic_subscribers) = queue_system.queues.get_mut(&header_cloud_topic) { + topic_subscribers.remove(&subscriber_id); + } return Err(ServerError::internal_error("Subscribe data failed")); } } } - Ok(ReceiverStream::new(rx)) + Ok(serde_json::json!("Subscribe Ok")) } + +// 获取消息处理函数 +pub async fn message_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResponse { + + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let not_found = |body_str| { + ServerRawResponse::not_found() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let headers = server_request.headers; + + // 获取必要的头信息 + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-topic header"), + }; + + let subscriber_id = match headers.get("x-cloud-subscriber-id") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-subscriber-id header"), + }; + + // 获取超时时间 + let timeout_ms = match headers.get("x-cloud-timeout") { + Some(value) => { + match value.to_string().parse::() { + Ok(ms) => Some(ms), + Err(_) => None, // 解析失败则视为无超时 + } + } + None => None, // 未指定则视为无超时 + }; + + // 在阻塞任务中获取消息 + let message_result = tokio::task::spawn_blocking(move || -> ServerResponse { + let (queue_mutex, condvar) = &*QUEUE_SYSTEM; + let mut queue_system = queue_mutex.lock().unwrap(); + + // 定义一个检查队列并获取消息的函数 + let check_and_get_message = |system: &mut QueueSystem| -> Option { + // 检查主题和订阅者是否存在 + if let Some(topic_subscribers) = system.queues.get_mut(&header_cloud_topic) { + if let Some(queue) = topic_subscribers.get_mut(&subscriber_id) { + if !queue.is_empty() { + return Some(queue.remove(0)); + } + } + } + None + }; + + // 首先检查是否有立即可用的消息 + if let Some(message) = check_and_get_message(&mut *queue_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } + + // 检查主题和订阅者是否存在 + if !queue_system.queues.contains_key(&header_cloud_topic) { + return not_found("Topic not found"); + } + + if !queue_system.queues.get(&header_cloud_topic).unwrap().contains_key(&subscriber_id) { + return not_found("Subscriber not found"); + } + + // 队列为空,需要等待 + match timeout_ms { + Some(ms) => { + // 有超时设置 + let timeout_duration = Duration::from_millis(ms); + let start_time = Instant::now(); + + // 循环等待直到有消息或超时 + loop { + // 计算剩余等待时间 + let elapsed = start_time.elapsed(); + if elapsed >= timeout_duration { + return not_found("No message received within timeout period"); + } + + let remaining = timeout_duration - elapsed; + let (mut new_system, timeout_result) = condvar.wait_timeout(queue_system, remaining).unwrap(); + queue_system = new_system; + + // 检查是否有消息 + if let Some(message) = check_and_get_message(&mut *queue_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } + + // 如果超时且仍没有消息,返回超时错误 + if timeout_result.timed_out() { + return not_found("No message received within timeout period"); + } + } + }, + None => { + // 无限等待 + loop { + queue_system = condvar.wait(queue_system).unwrap(); + + // 检查是否有消息 + if let Some(message) = check_and_get_message(&mut *queue_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } + } + } + } + }).await.unwrap_or_else(|_| bad_request("Task execution failed")); + + message_result +} \ No newline at end of file diff --git a/src/cores/handlers/payload.rs b/src/cores/handlers/payload.rs index 9b4e77ad9d80afcc529fb7169d70f8429f3ad31b..334f985f1a3a0fac21500329f7f0e3f9d9811b2e 100644 --- a/src/cores/handlers/payload.rs +++ b/src/cores/handlers/payload.rs @@ -1,6 +1,5 @@ use std::sync::Arc; use fleetmodv2::api_server::{ServerError, ServerRequest, ServerResult}; -use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use crate::cores::state::AppState; use crate::db::delete::delete_from_payload; diff --git a/src/cores/router.rs b/src/cores/router.rs index 3c95573a566289a330b9f1bafe0ecfa4895edbf5..34ab118cb4da47e2816cfc6f5d891592dc5bb6dc 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -62,6 +62,7 @@ pub enum RouterKey { DataMgrOrderResult, DataMgrPublish, DataMgrSubscribe, + DataMgrMessage, // Consensus Service Router Key ConsensusRequests, @@ -394,6 +395,12 @@ impl Router { RouterKey::DataMgrSubscribe, handlers::datamgr::subscribe_handler ); + + add_route!( + router, + RouterKey::DataMgrMessage, + handlers::datamgr::message_handler + ); } // consensus service routers diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 6d90ff9f62daabb5bb440914043d081f85047efd..2e823fd0c978b40b8466541e562b7be6616a1073 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -740,6 +740,7 @@ pub mod datamgr { ); app.service(publish); app.service(subscribe); + app.service(message); } #[derive(Serialize, Deserialize, Apiv2Schema)] @@ -925,16 +926,26 @@ pub mod datamgr { "DataMgr" ); - define_json_stream_response_method!( + define_json_response_method!( body_unrequired subscribe, - get, + post, "/Subscribe", (), (), RouterKey::DataMgrSubscribe, "DataMgr" ); + + define_raw_response_method!( + message, + get, + "/Message", + (), + (), + RouterKey::DataMgrMessage, + "DataMgr" + ); } pub mod consensus {