diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 6c2d03e8f86e368f959889a98c6b23c939d69d45..8e041d40538bdf46d30dd6462b533fcc2fc9400a 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -1,19 +1,20 @@ use crate::cores::handlers::datamgr::datamgr_api; use crate::cores::state::AppState; -use base64::engine::general_purpose::STANDARD; -use base64::Engine; -use fleetmodv2::api_server::{ServerError, ServerRawResponse, ServerRequest, ServerResponse, ServerResult}; +use fleetmodv2::api_server::{ + ServerError, ServerRawResponse, ServerRequest, ServerResponse, ServerResult, +}; use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::ptr::null_mut; -use std::slice; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; -use serde::{Deserialize, Serialize}; -use tokio::time::{Instant}; +use tokio::sync::oneshot; +use tokio::time::Instant; +use uuid::Uuid; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); @@ -30,7 +31,7 @@ type DataChar = u8; type DataChar = u8; pub async fn upload_data_handler( - state : Arc, + state: Arc, server_request: ServerRequest, ) -> ServerResponse { let headers = server_request.headers; @@ -265,10 +266,7 @@ pub async fn download_data_handler( datamgr_api::FreeMem(data); - ServerRawResponse::ok() - .body(result) - .build() - .into() + ServerRawResponse::ok().body(result).build().into() } } @@ -868,7 +866,11 @@ pub async fn subscribe_handler( let subscriber_id = match headers.get("x-cloud-subscriber-id") { Some(value) => value.to_string(), - None => return Err(ServerError::bad_request("Missing x-cloud-subscriber-id header")), + None => { + return Err(ServerError::bad_request( + "Missing x-cloud-subscriber-id header", + )) + } }; let cloud_topic = match CString::new(header_cloud_topic.clone()) { @@ -882,7 +884,8 @@ pub async fn subscribe_handler( let mut queue_system = queue_mutex.lock().unwrap(); // 获取或创建topic的订阅者映射 - let topic_subscribers = queue_system.queues + let topic_subscribers = queue_system + .queues .entry(header_cloud_topic.clone()) .or_insert_with(HashMap::new); @@ -924,11 +927,7 @@ pub async fn subscribe_handler( } // 获取消息处理函数 -pub async fn message_handler( - _: Arc, - server_request: ServerRequest, -) -> ServerResponse { - +pub async fn message_handler(_: Arc, server_request: ServerRequest) -> ServerResponse { let bad_request = |body_str| { ServerRawResponse::bad_request() .body(Vec::from(body_str)) @@ -1003,7 +1002,12 @@ pub async fn message_handler( return not_found("Topic not found"); } - if !queue_system.queues.get(&header_cloud_topic).unwrap().contains_key(&subscriber_id) { + if !queue_system + .queues + .get(&header_cloud_topic) + .unwrap() + .contains_key(&subscriber_id) + { return not_found("Subscriber not found"); } @@ -1023,7 +1027,8 @@ pub async fn message_handler( } let remaining = timeout_duration - elapsed; - let (mut new_system, timeout_result) = condvar.wait_timeout(queue_system, remaining).unwrap(); + let (mut new_system, timeout_result) = + condvar.wait_timeout(queue_system, remaining).unwrap(); queue_system = new_system; // 检查是否有消息 @@ -1044,7 +1049,7 @@ pub async fn message_handler( return not_found("No message received within timeout period"); } } - }, + } None => { // 无限等待 loop { @@ -1065,7 +1070,252 @@ pub async fn message_handler( } } } - }).await.unwrap_or_else(|_| bad_request("Task execution failed")); + }) + .await + .unwrap_or_else(|_| bad_request("Task execution failed")); message_result -} \ No newline at end of file +} + +// 定义request-reply使用的消息结构 +struct RequestReplyPair { + // request_uuid -> messages + pairs: HashMap>, +} + +impl RequestReplyPair { + fn new() -> Self { + RequestReplyPair { + pairs: HashMap::new(), + } + } +} + +// 全局共享状态 +static PAIR_SYSTEM: Lazy<(Mutex, Condvar)> = + Lazy::new(|| (Mutex::new(RequestReplyPair::new()), Condvar::new())); + +fn c_str_to_string(ptr: *const c_char, ctx: &str) -> Option { + unsafe { + CStr::from_ptr(ptr) + .to_str() + .map(|s| s.to_string()) + .map_err(|e| log::error!("Invalid UTF-8 in {}: {}", ctx, e)) + .ok() + } +} + +extern "C" fn response_callback( + topic: *const c_char, + uuid: *const c_char, + size: i32, + data: *const c_char, + closure: *mut ::std::os::raw::c_void, +) { + // 转换 topic + let topic_str = if let Some(s) = c_str_to_string(topic, "topic") { + s + } else { + return; + }; + // 转换 uuid + let uuid_str = if let Some(s) = c_str_to_string(uuid, "uuid") { + s + } else { + return; + }; + log::debug!( + "apisverver response callback: topic = {}, uuid={}, size = {}", + topic_str, + uuid_str, + size as i32 + ); + + let arc_uuid_cstr = unsafe { Arc::from_raw(closure as *const CString) }; + let closure_str = match arc_uuid_cstr.to_str() { + Ok(s) => s.to_string(), + Err(e) => { + log::error!("Invalid UTF-8 in closure: {}", e); + return; + } + }; + + // 校验数据指针 + if data.is_null() || size < 0 { + log::error!("Invalid data (ptr={:?}, size={})", data, size); + return; + } + log::debug!( + "apisverver response received: topic={}, uuid={}, size={}", + topic_str, + uuid_str, + size + ); + + // 将data转换为切片 + let result = { + let slice = unsafe { std::slice::from_raw_parts(data as *const u8, size as usize) }; + slice.to_vec() + }; + + let message = Message { + uuid: uuid_str.clone(), + data: result.clone(), + size, + }; + + let mut response_map = RESPONSE_MAP.lock().unwrap(); + if let Some(sender) = response_map.remove(&closure_str) { + let _ = sender.send(message); + } else { + log::warn!("No request found for UUID: {}", closure_str); + } + // }); +} + +#[repr(C)] +pub struct CallbackContext { + uuid: *mut c_char, // CString 转 raw +} + +static RESPONSE_MAP: Lazy>>>> = + Lazy::new(|| Arc::new(Mutex::new(HashMap::new()))); + +pub async fn request_handler(_: Arc, server_request: ServerRequest) -> ServerResponse { + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() + }; + let headers = server_request.headers; + + // 获取消息主题 + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-topic header"), + }; + + let cloud_topic = match CString::new(header_cloud_topic.clone()) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud topic"), + }; + + let body = server_request.body.as_ref().unwrap().as_slice(); + let data_content = body.to_vec(); + + // 获取超时时间 + let timeout_ms = match headers.get("x-cloud-timeout") { + Some(value) => { + match value.to_string().parse::() { + Ok(ms) => ms, + Err(_) => 300000, // 解析失败超时时间默认为五分钟 + } + } + None => 300000, // 解析失败超时时间默认为五分钟 + }; + + // 生成这次请求的uuid + let uuid = Uuid::new_v4().to_string(); + println!("uuid: {}", uuid); + + let uuid_cstr = match CString::new(uuid.clone()) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud uuid"), + }; + + let uuid_cstr_arc = Arc::new(uuid_cstr); + let (tx, mut rx) = oneshot::channel(); + { + let mut response_map = RESPONSE_MAP.lock().unwrap(); + response_map.insert(uuid.clone(), tx); + } + unsafe { + let ret = datamgr_api::Request( + DATA_PLUGIN_MANAGER, + cloud_topic.as_ptr(), + data_content.len() as i32, + data_content.as_ptr() as *const DataChar, + Some(response_callback), + Arc::into_raw(uuid_cstr_arc) as *mut std::ffi::c_void, // 传入 context + ); + if ret != 1 { + let (pairs_mutex, _) = &*PAIR_SYSTEM; + let mut pairs_system = pairs_mutex.lock().unwrap(); + pairs_system.pairs.remove(&uuid); + return bad_request("Request data failed"); + } + } + + // 使用 select 等待 rx 或者超时 + tokio::select! { + res = &mut rx => { + match res { + Ok(response_message) => { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), response_message.uuid.to_string()); + header.insert("Content-Length".to_string(), response_message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(response_message.data) + .headers(header) + .build() + .into(); + } + Err(e) => { + return bad_request(format!("Failed to receive response: {:?}", e).as_str()); + } + } + } + _ = tokio::time::sleep(Duration::from_millis(timeout_ms)) => { + let mut response_map = RESPONSE_MAP.lock().unwrap(); + response_map.remove(&uuid); + return bad_request("receive response timeout"); + } + } +} + + +pub async fn reply_handler(_: Arc, server_request: ServerRequest) -> ServerResponse { + let headers = server_request.headers; + + let body = server_request.body.as_ref().unwrap().as_slice(); + let data_content = body.to_vec(); + + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let header_cloud_uuid = match headers.get("x-cloud-uuid") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-uuid header"), + }; + + let cloud_uuid = match CString::new(header_cloud_uuid) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud uuid"), + }; + + unsafe { + let ret = datamgr_api::Reply( + DATA_PLUGIN_MANAGER, + cloud_uuid.as_ptr(), + data_content.len() as i32, + data_content.as_ptr() as *const DataChar, + ); + if ret == 1 { + ServerRawResponse::ok() + .body(Vec::from("Reply Ok")) + .build() + .into() + } else { + ServerRawResponse::internal_error() + .body(Vec::from("Reply Failed")) + .build() + .into() + } + } +} diff --git a/src/cores/router.rs b/src/cores/router.rs index 9fc45af32dc9af4a9683532c01e86d3e3446e29d..fe693183cbb2c8f78093dc8b1782274754dfbaff 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -75,6 +75,8 @@ pub enum RouterKey { DataMgrPublish, DataMgrSubscribe, DataMgrMessage, + DataMgrRequest, + DataMgrReply, // Consensus Service Router Key ConsensusRequests, @@ -471,6 +473,18 @@ impl Router { RouterKey::DataMgrMessage, handlers::datamgr::message_handler ); + + add_route!( + router, + RouterKey::DataMgrRequest, + handlers::datamgr::request_handler + ); + + add_route!( + router, + RouterKey::DataMgrReply, + handlers::datamgr::reply_handler + ); } // consensus service routers diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 74a1e431639d352db16e3cae89a27246456ff722..f87c79794643d0aca79fe8ee509f845bc84208be 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -914,6 +914,8 @@ pub mod datamgr { app.service(publish); app.service(subscribe); app.service(message); + app.service(request); + app.service(reply); } #[derive(Serialize, Deserialize, Apiv2Schema)] @@ -1119,6 +1121,26 @@ pub mod datamgr { RouterKey::DataMgrMessage, "DataMgr" ); + + define_raw_response_method!( + request, + post, + "/Request", + (), + (), + RouterKey::DataMgrRequest, + "DataMgr" + ); + + define_raw_response_method!( + reply, + post, + "/Reply", + (), + (), + RouterKey::DataMgrReply, + "DataMgr" + ); } pub mod consensus {