From 6699345f9a6721071250249be5a3b5810a116da7 Mon Sep 17 00:00:00 2001 From: Super User Date: Tue, 18 Mar 2025 11:45:56 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E4=B8=AA?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E8=80=85=E8=AE=A2=E9=98=85=E5=90=8C=E4=B8=80?= =?UTF-8?q?=E4=B8=AAtopic=E6=97=B6=E5=9B=9E=E8=B0=83=E5=87=BD=E6=95=B0?= =?UTF-8?q?=E8=A2=AB=E8=A6=86=E7=9B=96=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 3 +- src/cores/handlers/datamgr/route.rs | 77 ++++++++++++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ad3bf40..c383cd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,4 +60,5 @@ quiche = { version = "0.23.2" } ring = "0.17.12" mio = "1.0.3" url = "2.5.4" -regex = "1.11.1" \ No newline at end of file +regex = "1.11.1" +once_cell = "1.21.1" \ No newline at end of file diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index ca8c062..f5c0c6d 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -1,9 +1,11 @@ +use std::collections::HashMap; use crate::cores::handlers::datamgr::datamgr_api; use crate::cores::models::ServerRequest; use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::ptr::null_mut; use std::sync::{Arc, Mutex}; +use once_cell::sync::Lazy; use serde_json::{json, Value}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -13,6 +15,8 @@ use crate::{ServerError, ServerRawResponse, ServerResponse}; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); +static SUBSCRIBERS: Lazy>>>> = + Lazy::new(|| Mutex::new(HashMap::new())); /* pub async fn upload_data_handler( _: Arc, @@ -813,7 +817,7 @@ pub async fn publish_handler( } } -extern "C" fn rust_callback( +/*extern "C" fn rust_callback( topic: *const ::std::os::raw::c_char, uuid: *const ::std::os::raw::c_char, size: ::std::os::raw::c_int, @@ -881,4 +885,73 @@ pub async fn subscribe_handler( } Ok(ReceiverStream::new(rx)) -} \ No newline at end of file +}*/ + +extern "C" fn rust_callback( + topic: *const ::std::os::raw::c_char, + uuid: *const ::std::os::raw::c_char, + size: ::std::os::raw::c_int, + data: *const ::std::os::raw::c_char, + _: *mut ::std::os::raw::c_void, +) { + let topic_str = unsafe { CStr::from_ptr(topic).to_string_lossy().to_string() }; + let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; + let data_str = unsafe { CStr::from_ptr(data).to_string_lossy().to_string() }; + + let json_data = serde_json::json!({ + "topic": topic_str, + "uuid": uuid_str, + "size": size, + "data": data_str + }); + + // 广播给所有订阅者 + let mut subs = SUBSCRIBERS.lock().unwrap(); + if let Some(subscribers) = subs.get_mut(&topic_str) { + subscribers.retain(|sender| sender.try_send(json_data.clone()).is_ok()); + } +} + +pub async fn subscribe_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResult> { + let headers = server_request.headers; + + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), + }; + + let cloud_topic = match CString::new(header_cloud_topic.clone()) { + Ok(cstr) => cstr, + Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), + }; + + let (tx, rx) = mpsc::channel(10); + + // 将订阅者加入全局 HashMap + let mut subs = SUBSCRIBERS.lock().unwrap(); + subs.entry(header_cloud_topic.clone()) + .or_insert_with(Vec::new) + .push(tx); + + // 确保只调用一次 `datamgr_api::Subscribe` + if subs.get(&header_cloud_topic).unwrap().len() == 1 { + unsafe { + let ret = datamgr_api::Subscribe( + DATA_PLUGIN_MANAGER, + cloud_topic.as_ptr(), + Some(rust_callback), + std::ptr::null_mut(), + ); + + if ret != 1 { + subs.remove(&header_cloud_topic); + return Err(ServerError::internal_error("Subscribe data failed")); + } + } + } + + Ok(ReceiverStream::new(rx)) +} -- Gitee From d9657cd56fa4dce7291b757855fd1bf1d430aca9 Mon Sep 17 00:00:00 2001 From: Super User Date: Tue, 18 Mar 2025 15:35:16 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E5=A4=A7=E5=B0=8F=E9=99=90=E5=88=B6=E5=9C=A850GB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/servers/actix_web/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index f0dc917..715217c 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -72,6 +72,7 @@ macro_rules! init_app { App::new() .wrap_api_with_spec(spec.clone()) .app_data(Data::new($app_state.clone())) + .app_data(actix_web::web::PayloadConfig::new(50 * 1024 * 1024 * 1024))//设置50GB payload限制 .configure(api_server::configure_routes) .configure(cluster_info::configure_routes) .configure(datamgr::configure_routes) -- Gitee