From 2736eef190f164171f7ce8932175c4dc923d30c2 Mon Sep 17 00:00:00 2001 From: Yuichi <913637919@qq.com> Date: Wed, 4 Jun 2025 11:15:40 +0800 Subject: [PATCH 1/2] link #ICCBRR --- src/cores/handlers/datamgr/route.rs | 287 +++++++++++++++++++++++++++- src/cores/router.rs | 14 ++ src/cores/servers/actix_web/mod.rs | 22 +++ 3 files changed, 320 insertions(+), 3 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 6c2d03e..1630b68 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -1,7 +1,5 @@ use crate::cores::handlers::datamgr::datamgr_api; use crate::cores::state::AppState; -use base64::engine::general_purpose::STANDARD; -use base64::Engine; use fleetmodv2::api_server::{ServerError, ServerRawResponse, ServerRequest, ServerResponse, ServerResult}; use once_cell::sync::Lazy; use serde_json::{json, Value}; @@ -9,11 +7,11 @@ use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::ptr::null_mut; -use std::slice; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::time::{Instant}; +use uuid::Uuid; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); @@ -1068,4 +1066,287 @@ pub async fn message_handler( }).await.unwrap_or_else(|_| bad_request("Task execution failed")); message_result +} + +// 定义request-reply使用的消息结构 +struct RequestReplyPair { + // request_uuid -> messages + pairs: HashMap>, +} + +impl RequestReplyPair { + fn new() -> Self { + RequestReplyPair { + pairs: HashMap::new(), + } + } +} + +// 全局共享状态 +static PAIR_SYSTEM: Lazy<(Mutex, Condvar)> = + Lazy::new(|| (Mutex::new(RequestReplyPair::new()), Condvar::new())); + +extern "C" fn response_callback( + topic: *const ::std::os::raw::c_char, + uuid: *const ::std::os::raw::c_char, + size: ::std::os::raw::c_int, + data: *const ::std::os::raw::c_char, + _: *mut ::std::os::raw::c_void, +) { + let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; + + // 将data转换为切片 + let result = { + let slice = unsafe { std::slice::from_raw_parts(data as *const u8, size as usize) }; + slice.to_vec() + }; + + let message = Message { + uuid: uuid_str.clone(), + data: result.clone(), + size, + }; + + println!("response: {}, {:?}",uuid_str, result); + // 获取锁并添加消息 + let (pairs_mutex, condvar) = &*PAIR_SYSTEM; + let mut pairs_system = pairs_mutex.lock().unwrap(); + + if let Some(message_vec) = pairs_system.pairs.get_mut(&uuid_str) { + message_vec.push(message.clone()); + condvar.notify_all(); + } +} + +#[repr(C)] +pub struct CallbackContext { + uuid: *mut c_char, // CString 转 raw +} + +pub async fn request_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResponse { + + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let not_found = |body_str| { + ServerRawResponse::not_found() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let headers = server_request.headers; + + // 获取消息主题 + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-topic header"), + }; + + let cloud_topic = match CString::new(header_cloud_topic.clone()) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud topic"), + }; + + let body = server_request.body.as_ref().unwrap().as_slice(); + let data_content = body.to_vec(); + + // 生成这次请求的uuid + let uuid = Uuid::new_v4().to_string(); + println!("uuid: {}", uuid); + { + let (pairs_mutex, _) = &*PAIR_SYSTEM; + let mut pairs_system = pairs_mutex.lock().unwrap(); + + // 创建本次request的uuid和reply消息的映射 + pairs_system.pairs.insert(uuid.clone(), Vec::new()); + } + + let uuid_cstring = CString::new(uuid.clone()).unwrap(); + let uuid_ptr = uuid_cstring.into_raw(); // 转为裸指针,防止释放 + + let context = Box::new(CallbackContext { + uuid: uuid_ptr, + }); + let context_ptr = Box::into_raw(context); // 转为 C 可用指针 + + unsafe { + let ret = datamgr_api::Request( + DATA_PLUGIN_MANAGER, + cloud_topic.as_ptr(), + data_content.len() as i32, + data_content.as_ptr() as *const DataChar, + Some(response_callback), + context_ptr as *mut std::ffi::c_void, // 传入 context + ); + + if ret != 1 { + let (pairs_mutex, _) = &*PAIR_SYSTEM; + let mut pairs_system = pairs_mutex.lock().unwrap(); + pairs_system.pairs.remove(&uuid); + + return bad_request("Request data failed"); + } + } + + // 获取超时时间 + let timeout_ms = match headers.get("x-cloud-timeout") { + Some(value) => { + match value.to_string().parse::() { + Ok(ms) => Some(ms), + Err(_) => None, // 解析失败则视为无超时 + } + } + None => None, // 未指定则视为无超时 + }; + + // 在阻塞任务中获取消息 + let response_result = tokio::task::spawn_blocking(move || -> ServerResponse { + let (pairs_mutex, condvar) = &*PAIR_SYSTEM; + let mut pairs_system = pairs_mutex.lock().unwrap(); + + // 定义一个检查队列并获取消息的函数 + let check_and_get_message = |pairs_system: &mut RequestReplyPair| -> Option { + // 检查主题和订阅者是否存在 + if let Some(message_vec) = pairs_system.pairs.get_mut(&uuid) { + if !message_vec.is_empty() { + return Some(message_vec.remove(0)); + } + } + None + }; + + // 检查本次Request对应的消息队列是否存在 + if !pairs_system.pairs.contains_key(&uuid) { + return bad_request("RequestReplyPair create failed"); + } + + // 首先检查是否有立即可用的消息 + if let Some(message) = check_and_get_message(&mut *pairs_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "application/octet-stream".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } + + // 队列为空,需要等待 + match timeout_ms { + Some(ms) => { + // 有超时设置 + let timeout_duration = Duration::from_millis(ms); + let start_time = Instant::now(); + + // 循环等待直到有消息或超时 + loop { + // 计算剩余等待时间 + let elapsed = start_time.elapsed(); + if elapsed >= timeout_duration { + return not_found("No response received within timeout period"); + } + + let remaining = timeout_duration - elapsed; + let (mut new_system, timeout_result) = condvar.wait_timeout(pairs_system, remaining).unwrap(); + pairs_system = new_system; + + // 检查是否有消息 + if let Some(message) = check_and_get_message(&mut *pairs_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "application/octet-stream".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } + + // 如果超时且仍没有消息,返回超时错误 + if timeout_result.timed_out() { + return not_found("No message received within timeout period"); + } + } + }, + None => { + // 无限等待 + loop { + pairs_system = condvar.wait(pairs_system).unwrap(); + + // 检查是否有消息 + if let Some(message) = check_and_get_message(&mut *pairs_system) { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); + header.insert("Content-Length".to_string(), message.size.to_string()); + header.insert("Content-Type".to_string(), "application/octet-stream".to_string()); + return ServerRawResponse::ok() + .body(message.data) + .headers(header) + .build() + .into(); + } + } + } + } + }).await.unwrap_or_else(|_| bad_request("GetResponse execution failed")); + + response_result +} + +pub async fn reply_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResponse { + let headers = server_request.headers; + + let body = server_request.body.as_ref().unwrap().as_slice(); + let data_content = body.to_vec(); + + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let header_cloud_uuid = match headers.get("x-cloud-uuid") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-uuid header"), + }; + + let cloud_uuid = match CString::new(header_cloud_uuid) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud uuid"), + }; + + unsafe { + let ret = datamgr_api::Reply( + DATA_PLUGIN_MANAGER, + cloud_uuid.as_ptr(), + data_content.len() as i32, + data_content.as_ptr() as *const DataChar, + ); + if ret == 1 { + ServerRawResponse::ok() + .body(Vec::from("Reply Ok")) + .build() + .into() + } else { + ServerRawResponse::internal_error() + .body(Vec::from("Reply Failed")) + .build() + .into() + } + } } \ No newline at end of file diff --git a/src/cores/router.rs b/src/cores/router.rs index 9fc45af..fe69318 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -75,6 +75,8 @@ pub enum RouterKey { DataMgrPublish, DataMgrSubscribe, DataMgrMessage, + DataMgrRequest, + DataMgrReply, // Consensus Service Router Key ConsensusRequests, @@ -471,6 +473,18 @@ impl Router { RouterKey::DataMgrMessage, handlers::datamgr::message_handler ); + + add_route!( + router, + RouterKey::DataMgrRequest, + handlers::datamgr::request_handler + ); + + add_route!( + router, + RouterKey::DataMgrReply, + handlers::datamgr::reply_handler + ); } // consensus service routers diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 74a1e43..f87c797 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -914,6 +914,8 @@ pub mod datamgr { app.service(publish); app.service(subscribe); app.service(message); + app.service(request); + app.service(reply); } #[derive(Serialize, Deserialize, Apiv2Schema)] @@ -1119,6 +1121,26 @@ pub mod datamgr { RouterKey::DataMgrMessage, "DataMgr" ); + + define_raw_response_method!( + request, + post, + "/Request", + (), + (), + RouterKey::DataMgrRequest, + "DataMgr" + ); + + define_raw_response_method!( + reply, + post, + "/Reply", + (), + (), + RouterKey::DataMgrReply, + "DataMgr" + ); } pub mod consensus { -- Gitee From 265a3fbf193563c1070961b6b6699c607cf6bfc3 Mon Sep 17 00:00:00 2001 From: jiangliuwei Date: Wed, 4 Jun 2025 17:13:08 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8Drequest=20=E5=92=8Creply?= =?UTF-8?q?=20=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 307 +++++++++++++--------------- 1 file changed, 138 insertions(+), 169 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 1630b68..8e041d4 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -1,7 +1,10 @@ use crate::cores::handlers::datamgr::datamgr_api; use crate::cores::state::AppState; -use fleetmodv2::api_server::{ServerError, ServerRawResponse, ServerRequest, ServerResponse, ServerResult}; +use fleetmodv2::api_server::{ + ServerError, ServerRawResponse, ServerRequest, ServerResponse, ServerResult, +}; use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::ffi::{CStr, CString}; @@ -9,8 +12,8 @@ use std::os::raw::c_char; use std::ptr::null_mut; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; -use serde::{Deserialize, Serialize}; -use tokio::time::{Instant}; +use tokio::sync::oneshot; +use tokio::time::Instant; use uuid::Uuid; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); @@ -28,7 +31,7 @@ type DataChar = u8; type DataChar = u8; pub async fn upload_data_handler( - state : Arc, + state: Arc, server_request: ServerRequest, ) -> ServerResponse { let headers = server_request.headers; @@ -263,10 +266,7 @@ pub async fn download_data_handler( datamgr_api::FreeMem(data); - ServerRawResponse::ok() - .body(result) - .build() - .into() + ServerRawResponse::ok().body(result).build().into() } } @@ -866,7 +866,11 @@ pub async fn subscribe_handler( let subscriber_id = match headers.get("x-cloud-subscriber-id") { Some(value) => value.to_string(), - None => return Err(ServerError::bad_request("Missing x-cloud-subscriber-id header")), + None => { + return Err(ServerError::bad_request( + "Missing x-cloud-subscriber-id header", + )) + } }; let cloud_topic = match CString::new(header_cloud_topic.clone()) { @@ -880,7 +884,8 @@ pub async fn subscribe_handler( let mut queue_system = queue_mutex.lock().unwrap(); // 获取或创建topic的订阅者映射 - let topic_subscribers = queue_system.queues + let topic_subscribers = queue_system + .queues .entry(header_cloud_topic.clone()) .or_insert_with(HashMap::new); @@ -922,11 +927,7 @@ pub async fn subscribe_handler( } // 获取消息处理函数 -pub async fn message_handler( - _: Arc, - server_request: ServerRequest, -) -> ServerResponse { - +pub async fn message_handler(_: Arc, server_request: ServerRequest) -> ServerResponse { let bad_request = |body_str| { ServerRawResponse::bad_request() .body(Vec::from(body_str)) @@ -1001,7 +1002,12 @@ pub async fn message_handler( return not_found("Topic not found"); } - if !queue_system.queues.get(&header_cloud_topic).unwrap().contains_key(&subscriber_id) { + if !queue_system + .queues + .get(&header_cloud_topic) + .unwrap() + .contains_key(&subscriber_id) + { return not_found("Subscriber not found"); } @@ -1021,7 +1027,8 @@ pub async fn message_handler( } let remaining = timeout_duration - elapsed; - let (mut new_system, timeout_result) = condvar.wait_timeout(queue_system, remaining).unwrap(); + let (mut new_system, timeout_result) = + condvar.wait_timeout(queue_system, remaining).unwrap(); queue_system = new_system; // 检查是否有消息 @@ -1042,7 +1049,7 @@ pub async fn message_handler( return not_found("No message received within timeout period"); } } - }, + } None => { // 无限等待 loop { @@ -1063,7 +1070,9 @@ pub async fn message_handler( } } } - }).await.unwrap_or_else(|_| bad_request("Task execution failed")); + }) + .await + .unwrap_or_else(|_| bad_request("Task execution failed")); message_result } @@ -1086,14 +1095,62 @@ impl RequestReplyPair { static PAIR_SYSTEM: Lazy<(Mutex, Condvar)> = Lazy::new(|| (Mutex::new(RequestReplyPair::new()), Condvar::new())); +fn c_str_to_string(ptr: *const c_char, ctx: &str) -> Option { + unsafe { + CStr::from_ptr(ptr) + .to_str() + .map(|s| s.to_string()) + .map_err(|e| log::error!("Invalid UTF-8 in {}: {}", ctx, e)) + .ok() + } +} + extern "C" fn response_callback( - topic: *const ::std::os::raw::c_char, - uuid: *const ::std::os::raw::c_char, - size: ::std::os::raw::c_int, - data: *const ::std::os::raw::c_char, - _: *mut ::std::os::raw::c_void, + topic: *const c_char, + uuid: *const c_char, + size: i32, + data: *const c_char, + closure: *mut ::std::os::raw::c_void, ) { - let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; + // 转换 topic + let topic_str = if let Some(s) = c_str_to_string(topic, "topic") { + s + } else { + return; + }; + // 转换 uuid + let uuid_str = if let Some(s) = c_str_to_string(uuid, "uuid") { + s + } else { + return; + }; + log::debug!( + "apisverver response callback: topic = {}, uuid={}, size = {}", + topic_str, + uuid_str, + size as i32 + ); + + let arc_uuid_cstr = unsafe { Arc::from_raw(closure as *const CString) }; + let closure_str = match arc_uuid_cstr.to_str() { + Ok(s) => s.to_string(), + Err(e) => { + log::error!("Invalid UTF-8 in closure: {}", e); + return; + } + }; + + // 校验数据指针 + if data.is_null() || size < 0 { + log::error!("Invalid data (ptr={:?}, size={})", data, size); + return; + } + log::debug!( + "apisverver response received: topic={}, uuid={}, size={}", + topic_str, + uuid_str, + size + ); // 将data转换为切片 let result = { @@ -1107,15 +1164,13 @@ extern "C" fn response_callback( size, }; - println!("response: {}, {:?}",uuid_str, result); - // 获取锁并添加消息 - let (pairs_mutex, condvar) = &*PAIR_SYSTEM; - let mut pairs_system = pairs_mutex.lock().unwrap(); - - if let Some(message_vec) = pairs_system.pairs.get_mut(&uuid_str) { - message_vec.push(message.clone()); - condvar.notify_all(); + let mut response_map = RESPONSE_MAP.lock().unwrap(); + if let Some(sender) = response_map.remove(&closure_str) { + let _ = sender.send(message); + } else { + log::warn!("No request found for UUID: {}", closure_str); } + // }); } #[repr(C)] @@ -1123,25 +1178,16 @@ pub struct CallbackContext { uuid: *mut c_char, // CString 转 raw } -pub async fn request_handler( - _: Arc, - server_request: ServerRequest, -) -> ServerResponse { +static RESPONSE_MAP: Lazy>>>> = + Lazy::new(|| Arc::new(Mutex::new(HashMap::new()))); +pub async fn request_handler(_: Arc, server_request: ServerRequest) -> ServerResponse { let bad_request = |body_str| { ServerRawResponse::bad_request() .body(Vec::from(body_str)) .build() .into() }; - - let not_found = |body_str| { - ServerRawResponse::not_found() - .body(Vec::from(body_str)) - .build() - .into() - }; - let headers = server_request.headers; // 获取消息主题 @@ -1158,25 +1204,32 @@ pub async fn request_handler( let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); + // 获取超时时间 + let timeout_ms = match headers.get("x-cloud-timeout") { + Some(value) => { + match value.to_string().parse::() { + Ok(ms) => ms, + Err(_) => 300000, // 解析失败超时时间默认为五分钟 + } + } + None => 300000, // 解析失败超时时间默认为五分钟 + }; + // 生成这次请求的uuid let uuid = Uuid::new_v4().to_string(); println!("uuid: {}", uuid); - { - let (pairs_mutex, _) = &*PAIR_SYSTEM; - let mut pairs_system = pairs_mutex.lock().unwrap(); - - // 创建本次request的uuid和reply消息的映射 - pairs_system.pairs.insert(uuid.clone(), Vec::new()); - } - let uuid_cstring = CString::new(uuid.clone()).unwrap(); - let uuid_ptr = uuid_cstring.into_raw(); // 转为裸指针,防止释放 - - let context = Box::new(CallbackContext { - uuid: uuid_ptr, - }); - let context_ptr = Box::into_raw(context); // 转为 C 可用指针 + let uuid_cstr = match CString::new(uuid.clone()) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud uuid"), + }; + let uuid_cstr_arc = Arc::new(uuid_cstr); + let (tx, mut rx) = oneshot::channel(); + { + let mut response_map = RESPONSE_MAP.lock().unwrap(); + response_map.insert(uuid.clone(), tx); + } unsafe { let ret = datamgr_api::Request( DATA_PLUGIN_MANAGER, @@ -1184,130 +1237,46 @@ pub async fn request_handler( data_content.len() as i32, data_content.as_ptr() as *const DataChar, Some(response_callback), - context_ptr as *mut std::ffi::c_void, // 传入 context + Arc::into_raw(uuid_cstr_arc) as *mut std::ffi::c_void, // 传入 context ); - if ret != 1 { let (pairs_mutex, _) = &*PAIR_SYSTEM; let mut pairs_system = pairs_mutex.lock().unwrap(); pairs_system.pairs.remove(&uuid); - return bad_request("Request data failed"); } } - // 获取超时时间 - let timeout_ms = match headers.get("x-cloud-timeout") { - Some(value) => { - match value.to_string().parse::() { - Ok(ms) => Some(ms), - Err(_) => None, // 解析失败则视为无超时 - } - } - None => None, // 未指定则视为无超时 - }; - - // 在阻塞任务中获取消息 - let response_result = tokio::task::spawn_blocking(move || -> ServerResponse { - let (pairs_mutex, condvar) = &*PAIR_SYSTEM; - let mut pairs_system = pairs_mutex.lock().unwrap(); - - // 定义一个检查队列并获取消息的函数 - let check_and_get_message = |pairs_system: &mut RequestReplyPair| -> Option { - // 检查主题和订阅者是否存在 - if let Some(message_vec) = pairs_system.pairs.get_mut(&uuid) { - if !message_vec.is_empty() { - return Some(message_vec.remove(0)); - } - } - None - }; - - // 检查本次Request对应的消息队列是否存在 - if !pairs_system.pairs.contains_key(&uuid) { - return bad_request("RequestReplyPair create failed"); - } - - // 首先检查是否有立即可用的消息 - if let Some(message) = check_and_get_message(&mut *pairs_system) { - let mut header = HashMap::new(); - header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); - header.insert("Content-Length".to_string(), message.size.to_string()); - header.insert("Content-Type".to_string(), "application/octet-stream".to_string()); - return ServerRawResponse::ok() - .body(message.data) - .headers(header) - .build() - .into(); - } - - // 队列为空,需要等待 - match timeout_ms { - Some(ms) => { - // 有超时设置 - let timeout_duration = Duration::from_millis(ms); - let start_time = Instant::now(); - - // 循环等待直到有消息或超时 - loop { - // 计算剩余等待时间 - let elapsed = start_time.elapsed(); - if elapsed >= timeout_duration { - return not_found("No response received within timeout period"); - } - - let remaining = timeout_duration - elapsed; - let (mut new_system, timeout_result) = condvar.wait_timeout(pairs_system, remaining).unwrap(); - pairs_system = new_system; - - // 检查是否有消息 - if let Some(message) = check_and_get_message(&mut *pairs_system) { - let mut header = HashMap::new(); - header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); - header.insert("Content-Length".to_string(), message.size.to_string()); - header.insert("Content-Type".to_string(), "application/octet-stream".to_string()); - return ServerRawResponse::ok() - .body(message.data) - .headers(header) - .build() - .into(); - } - - // 如果超时且仍没有消息,返回超时错误 - if timeout_result.timed_out() { - return not_found("No message received within timeout period"); - } + // 使用 select 等待 rx 或者超时 + tokio::select! { + res = &mut rx => { + match res { + Ok(response_message) => { + let mut header = HashMap::new(); + header.insert("x-cloud-uuid".to_string(), response_message.uuid.to_string()); + header.insert("Content-Length".to_string(), response_message.size.to_string()); + header.insert("Content-Type".to_string(), "text/plain".to_string()); + return ServerRawResponse::ok() + .body(response_message.data) + .headers(header) + .build() + .into(); } - }, - None => { - // 无限等待 - loop { - pairs_system = condvar.wait(pairs_system).unwrap(); - - // 检查是否有消息 - if let Some(message) = check_and_get_message(&mut *pairs_system) { - let mut header = HashMap::new(); - header.insert("x-cloud-uuid".to_string(), message.uuid.to_string()); - header.insert("Content-Length".to_string(), message.size.to_string()); - header.insert("Content-Type".to_string(), "application/octet-stream".to_string()); - return ServerRawResponse::ok() - .body(message.data) - .headers(header) - .build() - .into(); - } + Err(e) => { + return bad_request(format!("Failed to receive response: {:?}", e).as_str()); } } } - }).await.unwrap_or_else(|_| bad_request("GetResponse execution failed")); - - response_result + _ = tokio::time::sleep(Duration::from_millis(timeout_ms)) => { + let mut response_map = RESPONSE_MAP.lock().unwrap(); + response_map.remove(&uuid); + return bad_request("receive response timeout"); + } + } } -pub async fn reply_handler( - _: Arc, - server_request: ServerRequest, -) -> ServerResponse { + +pub async fn reply_handler(_: Arc, server_request: ServerRequest) -> ServerResponse { let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); @@ -1349,4 +1318,4 @@ pub async fn reply_handler( .into() } } -} \ No newline at end of file +} -- Gitee