From 6529da8074a05c5f74e64123739482384a020615 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 13 Mar 2025 10:33:00 +0800 Subject: [PATCH 1/9] =?UTF-8?q?basic=5Fexample=E5=90=AF=E5=8A=A8=E6=97=B6?= =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96PLUGIN=5FMANAGER?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/basic_example.rs | 20 ++++++++++++++++++++ src/cores/handlers/datamgr/route.rs | 2 +- storage.replica.location/owner1rawtest | 1 + storage.replica.location/ownerrawtest | 1 + 4 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 storage.replica.location/owner1rawtest create mode 100644 storage.replica.location/ownerrawtest diff --git a/examples/basic_example.rs b/examples/basic_example.rs index 1d73296..0e356d3 100644 --- a/examples/basic_example.rs +++ b/examples/basic_example.rs @@ -1,5 +1,8 @@ use env_logger::{Builder, Target}; use fleet_apiserver::utils::test::TestServerStartParams; +use fleet_apiserver::cores::handlers::datamgr::{datamgr_api, DATA_PLUGIN_MANAGER}; +use std::ffi::CString; + const TCP_ADDRESS: &str = "0.0.0.0:38080"; const UDP_ADDRESS: &str = "0.0.0.0:38081"; @@ -8,6 +11,7 @@ const QUIC_ADDRESS: &str = "0.0.0.0:38082"; #[actix_web::main] async fn main() { let mut builder = Builder::from_default_env(); + init_data_plugin_manager(); builder.target(Target::Stdout); let _ = builder.try_init(); let (msg_cli, app_state) = fleet_apiserver::utils::test::setup_full_test_env(TestServerStartParams::builder() @@ -19,4 +23,20 @@ async fn main() { log::info!("APIServer started, cluster_id: {}", app_state.cluster_id); tokio::signal::ctrl_c().await.unwrap(); fleet_apiserver::utils::test::tear_down_message_cli(msg_cli) +} + +fn init_data_plugin_manager() { + unsafe { + if DATA_PLUGIN_MANAGER.is_null() { + DATA_PLUGIN_MANAGER = datamgr_api::NewPluginManager(); + let plugin_to_load_key = CString::new("core.pluginsToLoad").unwrap(); + let plugin_to_load_value = CString::new("Messaging Storage Portal").unwrap(); + datamgr_api::SetParameter( + DATA_PLUGIN_MANAGER, + plugin_to_load_key.as_ptr(), + plugin_to_load_value.as_ptr() + ); + datamgr_api::LoadPlugins(DATA_PLUGIN_MANAGER); + } + } } \ No newline at end of file diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 3bd7c0e..85ebb20 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -5,7 +5,7 @@ use std::os::raw::c_char; use std::sync::Arc; use crate::cores::state::AppState; -static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = std::ptr::null_mut(); +pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = std::ptr::null_mut(); pub async fn upload_data_handler( _: Arc, diff --git a/storage.replica.location/owner1rawtest b/storage.replica.location/owner1rawtest new file mode 100644 index 0000000..f69d576 --- /dev/null +++ b/storage.replica.location/owner1rawtest @@ -0,0 +1 @@ +{"123456"} \ No newline at end of file diff --git a/storage.replica.location/ownerrawtest b/storage.replica.location/ownerrawtest new file mode 100644 index 0000000..f69d576 --- /dev/null +++ b/storage.replica.location/ownerrawtest @@ -0,0 +1 @@ +{"123456"} \ No newline at end of file -- Gitee From 377b9597e366d3bf09ce1e042e35e0c8f48c6893 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 13 Mar 2025 10:34:58 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=A4=9A=E4=BD=99?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- storage.replica.location/owner1rawtest | 1 - storage.replica.location/ownerrawtest | 1 - 2 files changed, 2 deletions(-) delete mode 100644 storage.replica.location/owner1rawtest delete mode 100644 storage.replica.location/ownerrawtest diff --git a/storage.replica.location/owner1rawtest b/storage.replica.location/owner1rawtest deleted file mode 100644 index f69d576..0000000 --- a/storage.replica.location/owner1rawtest +++ /dev/null @@ -1 +0,0 @@ -{"123456"} \ No newline at end of file diff --git a/storage.replica.location/ownerrawtest b/storage.replica.location/ownerrawtest deleted file mode 100644 index f69d576..0000000 --- a/storage.replica.location/ownerrawtest +++ /dev/null @@ -1 +0,0 @@ -{"123456"} \ No newline at end of file -- Gitee From 3f2789151d3333929ede93e68939a6acad169ea9 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 13 Mar 2025 11:15:18 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Pub/Sub=E7=9B=B8=E5=85=B3?= =?UTF-8?q?route=E7=9A=84=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 77 +++++++++++++++++++++++++++++ src/cores/router.rs | 14 ++++++ src/cores/servers/actix_web/mod.rs | 22 +++++++++ 3 files changed, 113 insertions(+) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 85ebb20..e587630 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -838,4 +838,81 @@ pub async fn order_result_handler( .build() .into() } +} + +pub async fn publish_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResponse { + + let headers = server_request.headers; + + let body = server_request.body.as_ref().unwrap().as_slice(); + let data_content = body.to_vec(); + + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-topic header"), + }; + + let cloud_topic = match CString::new(header_cloud_topic) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud topic"), + }; + + unsafe { + ServerRawResponse::ok() + .body(Vec::from("Publish data success")) + .build() + .into() + } +} + +pub async fn subscribe_handler( + _: Arc, + server_request: ServerRequest, +) -> ServerResponse { + + let headers = server_request.headers; + + let bad_request = |body_str| { + ServerRawResponse::bad_request() + .body(Vec::from(body_str)) + .build() + .into() + }; + + let header_cloud_topic = match headers.get("x-cloud-topic") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-topic header"), + }; + + let cloud_topic = match CString::new(header_cloud_topic) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud topic"), + }; + + let header_cloud_subscriber_id = match headers.get("x-cloud-subscriber-id") { + Some(value) => value.to_string(), + None => return bad_request("Missing x-cloud-subscriber-id header"), + }; + + let cloud_subscriber_id = match CString::new(header_cloud_subscriber_id) { + Ok(cstr) => cstr, + Err(_) => return bad_request("Invalid cloud subscriber id"), + }; + + unsafe { + ServerRawResponse::ok() + .body(Vec::from("Subscribe data success")) + .build() + .into() + } } \ No newline at end of file diff --git a/src/cores/router.rs b/src/cores/router.rs index 8608f3e..e20c0d9 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -39,6 +39,8 @@ pub enum RouterKey { DataMgrDispatchingOrder, DataMgrOrderStatus, DataMgrOrderResult, + DataMgrPublish, + DataMgrSubscribe, } impl RouterKey { @@ -232,6 +234,18 @@ impl Router { RouterKey::DataMgrOrderResult, handlers::datamgr::order_result_handler ); + + add_route!( + router, + RouterKey::DataMgrPublish, + handlers::datamgr::publish_handler + ); + + add_route!( + router, + RouterKey::DataMgrSubscribe, + handlers::datamgr::subscribe_handler + ); } router diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 69bbe6a..6b905ce 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -422,6 +422,8 @@ pub mod datamgr { .service(upload_data) .service(query_data) .service(download_data) + .service(publish) + .service(subscribe) ); app.service( web::scope("/Order") @@ -582,4 +584,24 @@ pub mod datamgr { RouterKey::DataMgrOrderResult, "DataMgr" ); + + define_raw_response_method!( + publish, + post, + "/Publish", + (), + (), + RouterKey::DataMgrPublish, + "DataMgr" + ); + + define_raw_response_method!( + subscribe, + post, + "/Subscribe", + (), + (), + RouterKey::DataMgrSubscribe, + "DataMgr" + ); } -- Gitee From f047333323b9e0c93b1836db5e29f7320bf28a73 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 13 Mar 2025 11:25:09 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E7=BC=96=E5=86=99publish=20handler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index e587630..5956541 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -868,10 +868,23 @@ pub async fn publish_handler( }; unsafe { - ServerRawResponse::ok() - .body(Vec::from("Publish data success")) - .build() - .into() + let ret = datamgr_api::Publish( + DATA_PLUGIN_MANAGER, + cloud_topic.as_ptr(), + data_content.len() as i32, + data_content.as_ptr() as *const i8, + ); + if ret == 1 { + ServerRawResponse::ok() + .body(Vec::from("Publish data success")) + .build() + .into() + } else { + ServerRawResponse::internal_error() + .body(Vec::from("Publish data failed")) + .build() + .into() + } } } -- Gitee From 5e89b04216246378063ca9f4427da0d8542f2d15 Mon Sep 17 00:00:00 2001 From: Super User Date: Thu, 13 Mar 2025 17:02:05 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E7=AC=AC=E4=B8=80?= =?UTF-8?q?=E7=89=88sub=20handler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 63 +++++++++++++++++++++-------- src/cores/servers/actix_web/mod.rs | 6 +-- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 5956541..941a700 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -1,6 +1,6 @@ use crate::cores::handlers::datamgr::datamgr_api; use crate::cores::models::{ServerRawResponse, ServerRequest, ServerResponse}; -use std::ffi::CString; +use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::sync::Arc; use crate::cores::state::AppState; @@ -11,7 +11,6 @@ pub async fn upload_data_handler( _: Arc, server_request: ServerRequest, ) -> ServerResponse { - let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); @@ -888,11 +887,36 @@ pub async fn publish_handler( } } +extern "C" fn rust_callback( + topic: *const ::std::os::raw::c_char, + uuid: *const ::std::os::raw::c_char, + size: ::std::os::raw::c_int, + data: *const ::std::os::raw::c_char, + closure: *mut ::std::os::raw::c_void, +) { + unsafe { + let topic_str = CStr::from_ptr(topic).to_str().unwrap(); + let uuid_str = CStr::from_ptr(uuid).to_str().unwrap(); + let data_str = CStr::from_ptr(data).to_str().unwrap(); + let closure_str = if closure.is_null() { + "NULL" + } else { + match CStr::from_ptr(closure as *const ::std::os::raw::c_char).to_str() { + Ok(s) => s, + Err(_) => "INVALID_UTF8", + } + }; + println!( + "Receive message in Rust: topic = {}, uuid={}, size = {}, data = {}, closure = {}", + topic_str, uuid_str, size as i32, data_str, closure_str + ); + } +} + pub async fn subscribe_handler( _: Arc, server_request: ServerRequest, ) -> ServerResponse { - let headers = server_request.headers; let bad_request = |body_str| { @@ -912,20 +936,25 @@ pub async fn subscribe_handler( Err(_) => return bad_request("Invalid cloud topic"), }; - let header_cloud_subscriber_id = match headers.get("x-cloud-subscriber-id") { - Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-subscriber-id header"), - }; - - let cloud_subscriber_id = match CString::new(header_cloud_subscriber_id) { - Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud subscriber id"), - }; - unsafe { - ServerRawResponse::ok() - .body(Vec::from("Subscribe data success")) - .build() - .into() + let closure_string = CString::new("closure").unwrap(); + + let ret = datamgr_api::Subscribe( + DATA_PLUGIN_MANAGER, + cloud_topic.as_ptr(), + Some(rust_callback), + closure_string.as_ptr() as *mut ::std::os::raw::c_void, + ); + if ret == 1 { + ServerRawResponse::ok() + .body(Vec::from("Subscribe data success")) + .build() + .into() + } else { + ServerRawResponse::internal_error() + .body(Vec::from("Subscribe data failed")) + .build() + .into() + } } } \ No newline at end of file diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 6b905ce..4b1adac 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -422,8 +422,6 @@ pub mod datamgr { .service(upload_data) .service(query_data) .service(download_data) - .service(publish) - .service(subscribe) ); app.service( web::scope("/Order") @@ -433,6 +431,8 @@ pub mod datamgr { .service(order_status) .service(order_result) ); + app.service(publish); + app.service(subscribe); } #[derive(Serialize, Deserialize, Apiv2Schema)] @@ -597,7 +597,7 @@ pub mod datamgr { define_raw_response_method!( subscribe, - post, + get, "/Subscribe", (), (), -- Gitee From e0ae70685599ef3162094020e5453b1650a492dd Mon Sep 17 00:00:00 2001 From: Super User Date: Fri, 14 Mar 2025 11:27:16 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=83=A8=E5=88=86handler?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- portal-metadata.db | Bin 0 -> 28672 bytes src/cores/handlers/datamgr/route.rs | 407 ++++++------------ src/cores/servers/actix_web/mod.rs | 48 ++- .../data-ownerrawtest-data | 1 + 4 files changed, 153 insertions(+), 303 deletions(-) create mode 100644 portal-metadata.db create mode 100644 storage.replica.location/data-ownerrawtest-data diff --git a/portal-metadata.db b/portal-metadata.db new file mode 100644 index 0000000000000000000000000000000000000000..f41f2e30fb8ebbb3e6b9e652510bd9cf0c1d2d0f GIT binary patch literal 28672 zcmeI)Z*S5-90&00#$b*xOn88cA>1A;NJ5o=ldwm2qnbHj6H4L|lUmBL)G#_(b>bem z>~-v&_}quShduBu=rivig~Eo$_=x4(wCUaT4|kv6mE?MX`m3_#Fxs|z$A&|5_}%wV_$ToPugU7K@Ym4!>bKx^XnWQLD2D(9 zAOHafKmY;|fB*#kOW@+oQXsmqAzhw3Mzh0?I(F0O=r_{-otmm>D%F(jvPu))(-QPi z%W#aUam=WuzSd}UpKpzFIn}jW(?#M;t$%V`@%~wt^``D1POn>^8QraF>I=0tzPItw zusWi~qeCV2xzZ?WG?TJVSkG`QyL-zgjgFO=!|yQoJkJG|XvA(st!qlU$+hU(i-*@LGV(&f=07RUWzOx(S1G9GR*V&Qg0Pb`wjCq934CpIezE}ouoQ%heI zDG*I0qzh%B-(x2o%QS`|n8++l#9`vltf%51AA!?;*Syik(|_%N$z*ZCAFU*2HF~0| zdS)Uk#`4SJTP4IdDSS=%iUk4?fB*y_009U<00Izz00bZaf%z497+hM9#FxcO3E38J z;CjZH!%iJpl&tV}N#P%auUH@e0SG_<0uX=z1Rwwb2tWV=5SVX)2fp>VG=2jx8vX}| z^Z!>Qd^O(LS=h>LV*Y=A zdk1O`0SG_<0uX=z1Rwwb2tWV=5SSA9, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_data_type = server_request.params.get("data_type").unwrap(); let data_type = match CString::new(params_data_type.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid data type"), + Err(_) => return Err(ServerError::bad_request("Invalid data type")), }; let header_cloud_name = match headers.get("x-cloud-name") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-name header"), + None => return Err(ServerError::bad_request("Missing x-cloud-name header")), }; let cloud_name = match CString::new(header_cloud_name) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud name"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud name")), }; let header_cloud_to = match headers.get("x-cloud-to") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-to header"), + None => return Err(ServerError::bad_request("Missing x-cloud-to header")), }; let cloud_to = match CString::new(header_cloud_to) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud to"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud to")), }; unsafe { @@ -60,15 +57,10 @@ pub async fn upload_data_handler( data_content.len() as i32, ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Upload data success")) - .build() - .into() + let body_str = String::from_utf8_lossy(&data_content).to_string(); + Ok(body_str.into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Upload data failed")) - .build() - .into() + Err(ServerError::internal_error("Upload data failed")) } } } @@ -76,32 +68,25 @@ pub async fn upload_data_handler( pub async fn query_data_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_data_type = server_request.params.get("data_type").unwrap(); let data_type = match CString::new(params_data_type.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid data type"), + Err(_) => return Err(ServerError::bad_request("Invalid data type")), }; let header_cloud_keyword = match headers.get("x-cloud-keyword") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-keyword header"), + None => return Err(ServerError::bad_request("Missing x-cloud-keyword header")), }; let cloud_keyword = match CString::new(header_cloud_keyword) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud keyword"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud keyword")), }; unsafe { @@ -115,11 +100,11 @@ pub async fn query_data_handler( ); if ret <= 0 { - return bad_request("QueryData failed"); + return Err(ServerError::internal_error("QueryData failed")); } if data.is_null() { - return bad_request("Received null pointer"); + return Err(ServerError::internal_error("Received null pointer")); } let result = { @@ -128,57 +113,48 @@ pub async fn query_data_handler( }; datamgr_api::FreeMem(data); - - ServerRawResponse::ok() - .body(result) - .build() - .into() + + let body_str = String::from_utf8_lossy(&result).to_string(); + Ok(body_str.into()) } } pub async fn download_data_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_data_type = server_request.params.get("data_type").unwrap(); let data_type = match CString::new(params_data_type.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid data type"), + Err(_) => return Err(ServerError::bad_request("Invalid data type")), }; let header_cloud_name = match headers.get("x-cloud-name") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-name header"), + None => return Err(ServerError::bad_request("Missing x-cloud-name header")), }; let cloud_name = match CString::new(header_cloud_name) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud name"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud name")), }; let header_cloud_from = match headers.get("x-cloud-from") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-from header"), + None => return Err(ServerError::bad_request("Missing x-cloud-from header")), }; let cloud_from = match CString::new(header_cloud_from) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud from"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), }; unsafe { - let mut data: *mut c_char = std::ptr::null_mut(); + let mut data: *mut c_char = null_mut(); let ret = datamgr_api::DownloadData( DATA_PLUGIN_MANAGER, @@ -189,11 +165,11 @@ pub async fn download_data_handler( ); if ret <= 0 { - return bad_request("DownloadData failed"); + return Err(ServerError::internal_error("DownloadData failed")); } if data.is_null() { - return bad_request("Received null pointer"); + return Err(ServerError::internal_error("Received null pointer")); } let result = { @@ -203,35 +179,26 @@ pub async fn download_data_handler( datamgr_api::FreeMem(data); - ServerRawResponse::ok() - .body(result) - .build() - .into() + let body_str = String::from_utf8_lossy(&result).to_string(); + Ok(body_str.into()) } } pub async fn receive_telemetry_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let header_cloud_from = match headers.get("x-cloud-from") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-from header"), + None => return Err(ServerError::bad_request("Missing x-cloud-from header")), }; let cloud_from = match CString::new(header_cloud_from) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud from"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), }; unsafe { @@ -240,15 +207,9 @@ pub async fn receive_telemetry_handler( cloud_from.as_ptr(), ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Receive telemetry success")) - .build() - .into() + Ok("Receive telemetry success".into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Receive telemetry failed")) - .build() - .into() + Err(ServerError::internal_error("Receive telemetry failed")) } } } @@ -256,28 +217,21 @@ pub async fn receive_telemetry_handler( pub async fn report_telemetry_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let header_cloud_from = match headers.get("x-cloud-from") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-from header"), + None => return Err(ServerError::bad_request("Missing x-cloud-from header")), }; let cloud_from = match CString::new(header_cloud_from) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud from"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), }; unsafe { @@ -288,15 +242,10 @@ pub async fn report_telemetry_handler( data_content.len() as i32, ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Report telemetry success")) - .build() - .into() + let body_str = String::from_utf8_lossy(&data_content).to_string(); + Ok(body_str.into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Report telemetry failed")) - .build() - .into() + Err(ServerError::internal_error("Report telemetry failed")) } } } @@ -304,28 +253,21 @@ pub async fn report_telemetry_handler( pub async fn send_remote_control_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let header_cloud_to = match headers.get("x-cloud-to") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-to header"), + None => return Err(ServerError::bad_request("Missing x-cloud-to header")), }; let cloud_to = match CString::new(header_cloud_to) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud to"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud to")), }; unsafe { @@ -336,15 +278,10 @@ pub async fn send_remote_control_handler( data_content.len() as i32, ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Send remote control success")) - .build() - .into() + let body_str = String::from_utf8_lossy(&data_content).to_string(); + Ok(body_str.into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Send remote control failed")) - .build() - .into() + Err(ServerError::internal_error("Send remote control failed")) } } } @@ -352,52 +289,45 @@ pub async fn send_remote_control_handler( pub async fn sync_data_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_data_type = server_request.params.get("data_type").unwrap(); let data_type = match CString::new(params_data_type.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid data type"), + Err(_) => return Err(ServerError::bad_request("Invalid data type")), }; let header_cloud_name = match headers.get("x-cloud-name") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-name header"), + None => return Err(ServerError::bad_request("Missing x-cloud-name header")), }; let cloud_name = match CString::new(header_cloud_name) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud name"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud name")), }; let header_cloud_from = match headers.get("x-cloud-from") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-from header"), + None => return Err(ServerError::bad_request("Missing x-cloud-from header")), }; let cloud_from = match CString::new(header_cloud_from) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud from"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), }; let header_cloud_node = match headers.get("x-cloud-node") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-node header"), + None => return Err(ServerError::bad_request("Missing x-cloud-node header")), }; let cloud_node = match CString::new(header_cloud_node) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud node"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud node")), }; unsafe { @@ -409,15 +339,9 @@ pub async fn sync_data_handler( cloud_node.as_ptr(), ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Sync data success")) - .build() - .into() + Ok("Sync data success".into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Sync data failed")) - .build() - .into() + Err(ServerError::internal_error("Sync data failed")) } } } @@ -425,62 +349,55 @@ pub async fn sync_data_handler( pub async fn backup_data_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_data_type = server_request.params.get("data_type").unwrap(); let data_type = match CString::new(params_data_type.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid data type"), + Err(_) => return Err(ServerError::bad_request("Invalid data type")), }; let header_cloud_name = match headers.get("x-cloud-name") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-name header"), + None => return Err(ServerError::bad_request("Missing x-cloud-name header")), }; let cloud_name = match CString::new(header_cloud_name) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud name"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud name")), }; let header_cloud_from = match headers.get("x-cloud-from") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-from header"), + None => return Err(ServerError::bad_request("Missing x-cloud-from header")), }; let cloud_from = match CString::new(header_cloud_from) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud from"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), }; let header_cloud_version = match headers.get("x-cloud-version") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-version header"), + None => return Err(ServerError::bad_request("Missing x-cloud-version header")), }; let cloud_version = match CString::new(header_cloud_version) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud version"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud version")), }; let header_cloud_node = match headers.get("x-cloud-node") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-node header"), + None => return Err(ServerError::bad_request("Missing x-cloud-node header")), }; let cloud_node = match CString::new(header_cloud_node) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud node"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud node")), }; unsafe { @@ -493,15 +410,9 @@ pub async fn backup_data_handler( cloud_node.as_ptr(), ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Backup data success")) - .build() - .into() + Ok("Backup data success".into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Backup data failed")) - .build() - .into() + Err(ServerError::internal_error("Backup data failed")) } } } @@ -509,62 +420,55 @@ pub async fn backup_data_handler( pub async fn recover_data_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_data_type = server_request.params.get("data_type").unwrap(); let data_type = match CString::new(params_data_type.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid data type"), + Err(_) => return Err(ServerError::bad_request("Invalid data type")), }; let header_cloud_name = match headers.get("x-cloud-name") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-name header"), + None => return Err(ServerError::bad_request("Missing x-cloud-name header")), }; let cloud_name = match CString::new(header_cloud_name) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud name"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud name")), }; let header_cloud_from = match headers.get("x-cloud-from") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-from header"), + None => return Err(ServerError::bad_request("Missing x-cloud-from header")), }; let cloud_from = match CString::new(header_cloud_from) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud from"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), }; let header_cloud_version = match headers.get("x-cloud-version") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-version header"), + None => return Err(ServerError::bad_request("Missing x-cloud-version header")), }; let cloud_version = match CString::new(header_cloud_version) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud version"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud version")), }; let header_cloud_node = match headers.get("x-cloud-node") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-node header"), + None => return Err(ServerError::bad_request("Missing x-cloud-node header")), }; let cloud_node = match CString::new(header_cloud_node) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud node"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud node")), }; unsafe { @@ -577,15 +481,9 @@ pub async fn recover_data_handler( cloud_node.as_ptr(), ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Recover data success")) - .build() - .into() + Ok("Recover data success".into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Recover data failed")) - .build() - .into() + Err(ServerError::internal_error("Recover data failed")) } } } @@ -593,27 +491,20 @@ pub async fn recover_data_handler( pub async fn observation_order_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_uuid = server_request.params.get("uuid").unwrap(); if params_uuid.len() != 36 { - return bad_request("Invalid UUID length"); + return Err(ServerError::bad_request("Invalid UUID length")); } let uuid = match CString::new(params_uuid.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid uuid format"), + Err(_) => return Err(ServerError::bad_request("Invalid uuid format")), }; unsafe { @@ -624,15 +515,10 @@ pub async fn observation_order_handler( data_content.len() as i32, ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Observation order success")) - .build() - .into() + let body_str = String::from_utf8_lossy(&data_content).to_string(); + Ok(body_str.into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Observation order failed")) - .build() - .into() + Err(ServerError::internal_error("Observation order failed")) } } } @@ -640,27 +526,20 @@ pub async fn observation_order_handler( pub async fn processing_order_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_uuid = server_request.params.get("uuid").unwrap(); if params_uuid.len() != 36 { - return bad_request("Invalid UUID length"); + return Err(ServerError::bad_request("Invalid UUID length")); } let uuid = match CString::new(params_uuid.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid uuid format"), + Err(_) => return Err(ServerError::bad_request("Invalid uuid format")), }; unsafe { @@ -671,15 +550,10 @@ pub async fn processing_order_handler( data_content.len() as i32, ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Processing order success")) - .build() - .into() + let body_str = String::from_utf8_lossy(&data_content).to_string(); + Ok(body_str.into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Processing order failed")) - .build() - .into() + Err(ServerError::internal_error("Processing order failed")) } } } @@ -687,27 +561,20 @@ pub async fn processing_order_handler( pub async fn dispatching_order_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let params_uuid = server_request.params.get("uuid").unwrap(); if params_uuid.len() != 36 { - return bad_request("Invalid UUID length"); + return Err(ServerError::bad_request("Invalid UUID length")); } let uuid = match CString::new(params_uuid.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid uuid format"), + Err(_) => return Err(ServerError::bad_request("Invalid uuid format")), }; unsafe { @@ -718,15 +585,10 @@ pub async fn dispatching_order_handler( data_content.len() as i32, ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Dispatching order success")) - .build() - .into() + let body_str = String::from_utf8_lossy(&data_content).to_string(); + Ok(body_str.into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Dispatching order failed")) - .build() - .into() + Err(ServerError::internal_error("Dispatching order failed")) } } } @@ -734,24 +596,17 @@ pub async fn dispatching_order_handler( pub async fn order_status_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { - - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; +) -> ServerResult { let params_uuid = server_request.params.get("uuid").unwrap(); if params_uuid.len() != 36 { - return bad_request("Invalid UUID length"); + return Err(ServerError::bad_request("Invalid UUID length")); } let uuid = match CString::new(params_uuid.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid uuid format"), + Err(_) => return Err(ServerError::bad_request("Invalid uuid format")), }; unsafe { @@ -764,11 +619,11 @@ pub async fn order_status_handler( ); if ret <= 0 { - return bad_request("Get order status failed"); + return Err(ServerError::internal_error("Get order status failed")); } if data.is_null() { - return bad_request("Received null pointer"); + return Err(ServerError::internal_error("Received null pointer")); } let result = { @@ -778,34 +633,25 @@ pub async fn order_status_handler( datamgr_api::FreeMem(data); - ServerRawResponse::ok() - .body(result) - .build() - .into() + let body_str = String::from_utf8_lossy(&result).to_string(); + Ok(body_str.into()) } } pub async fn order_result_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { - - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; +) -> ServerResult { let params_uuid = server_request.params.get("uuid").unwrap(); if params_uuid.len() != 36 { - return bad_request("Invalid UUID length"); + return Err(ServerError::bad_request("Invalid UUID length")); } let uuid = match CString::new(params_uuid.as_str()) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid uuid format"), + Err(_) => return Err(ServerError::bad_request("Invalid uuid format")), }; unsafe { @@ -818,11 +664,11 @@ pub async fn order_result_handler( ); if ret <= 0 { - return bad_request("Get order status failed"); + return Err(ServerError::internal_error("Get order status failed")); } if data.is_null() { - return bad_request("Received null pointer"); + return Err(ServerError::internal_error("Received null pointer")); } let result = { @@ -832,10 +678,8 @@ pub async fn order_result_handler( datamgr_api::FreeMem(data); - ServerRawResponse::ok() - .body(result) - .build() - .into() + let body_str = String::from_utf8_lossy(&result).to_string(); + Ok(body_str.into()) } } @@ -892,23 +736,15 @@ extern "C" fn rust_callback( uuid: *const ::std::os::raw::c_char, size: ::std::os::raw::c_int, data: *const ::std::os::raw::c_char, - closure: *mut ::std::os::raw::c_void, + _closure: *mut ::std::os::raw::c_void, ) { unsafe { let topic_str = CStr::from_ptr(topic).to_str().unwrap(); let uuid_str = CStr::from_ptr(uuid).to_str().unwrap(); let data_str = CStr::from_ptr(data).to_str().unwrap(); - let closure_str = if closure.is_null() { - "NULL" - } else { - match CStr::from_ptr(closure as *const ::std::os::raw::c_char).to_str() { - Ok(s) => s, - Err(_) => "INVALID_UTF8", - } - }; println!( - "Receive message in Rust: topic = {}, uuid={}, size = {}, data = {}, closure = {}", - topic_str, uuid_str, size as i32, data_str, closure_str + "Receive message in Rust: topic = {}, uuid={}, size = {}, data = {}", + topic_str, uuid_str, size as i32, data_str ); } } @@ -937,13 +773,12 @@ pub async fn subscribe_handler( }; unsafe { - let closure_string = CString::new("closure").unwrap(); let ret = datamgr_api::Subscribe( DATA_PLUGIN_MANAGER, cloud_topic.as_ptr(), Some(rust_callback), - closure_string.as_ptr() as *mut ::std::os::raw::c_void, + null_mut() as *mut ::std::os::raw::c_void, ); if ret == 1 { ServerRawResponse::ok() diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 4b1adac..38683c6 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -445,7 +445,8 @@ pub mod datamgr { pub uuid: String, } - define_raw_response_method!( + define_json_response_method!( + body_required upload_data, post, "/{data_type}", @@ -455,7 +456,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_unrequired query_data, get, "/Query/{data_type}", @@ -465,7 +467,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_unrequired download_data, get, "/Download/{data_type}", @@ -475,9 +478,10 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_unrequired receive_telemetry, - get, + put, "/Telemetry", (), (), @@ -485,7 +489,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_required report_telemetry, post, "/Telemetry", @@ -495,7 +500,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_required send_remote_control, post, "/RemoteControl", @@ -505,7 +511,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_unrequired sync_data, post, "/Sync/{data_type}", @@ -515,7 +522,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_unrequired backup_data, post, "/Backup/{data_type}", @@ -525,7 +533,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_unrequired recover_data, post, "/Restore/{data_type}", @@ -535,7 +544,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_required observation_order, post, "/Observation/{uuid}", @@ -545,7 +555,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_required processing_order, post, "/Processing/{uuid}", @@ -553,9 +564,10 @@ pub mod datamgr { (), RouterKey::DataMgrProcessingOrder, "DataMgr" - ); + ); - define_raw_response_method!( + define_json_response_method!( + body_required dispatching_order, post, "/Dispatching/{uuid}", @@ -565,7 +577,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_unrequired order_status, get, "/Status/{uuid}", @@ -574,8 +587,9 @@ pub mod datamgr { RouterKey::DataMgrOrderStatus, "DataMgr" ); - - define_raw_response_method!( + + define_json_response_method!( + body_unrequired order_result, get, "/Result/{uuid}", diff --git a/storage.replica.location/data-ownerrawtest-data b/storage.replica.location/data-ownerrawtest-data new file mode 100644 index 0000000..d800886 --- /dev/null +++ b/storage.replica.location/data-ownerrawtest-data @@ -0,0 +1 @@ +123 \ No newline at end of file -- Gitee From 68d871c42d4389ac98d5a5f2b8987f0a20103661 Mon Sep 17 00:00:00 2001 From: Super User Date: Fri, 14 Mar 2025 11:27:55 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=83=A8=E5=88=86handler?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- portal-metadata.db | Bin 28672 -> 0 bytes storage.replica.location/data-ownerrawtest-data | 1 - 2 files changed, 1 deletion(-) delete mode 100644 portal-metadata.db delete mode 100644 storage.replica.location/data-ownerrawtest-data diff --git a/portal-metadata.db b/portal-metadata.db deleted file mode 100644 index f41f2e30fb8ebbb3e6b9e652510bd9cf0c1d2d0f..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 28672 zcmeI)Z*S5-90&00#$b*xOn88cA>1A;NJ5o=ldwm2qnbHj6H4L|lUmBL)G#_(b>bem z>~-v&_}quShduBu=rivig~Eo$_=x4(wCUaT4|kv6mE?MX`m3_#Fxs|z$A&|5_}%wV_$ToPugU7K@Ym4!>bKx^XnWQLD2D(9 zAOHafKmY;|fB*#kOW@+oQXsmqAzhw3Mzh0?I(F0O=r_{-otmm>D%F(jvPu))(-QPi z%W#aUam=WuzSd}UpKpzFIn}jW(?#M;t$%V`@%~wt^``D1POn>^8QraF>I=0tzPItw zusWi~qeCV2xzZ?WG?TJVSkG`QyL-zgjgFO=!|yQoJkJG|XvA(st!qlU$+hU(i-*@LGV(&f=07RUWzOx(S1G9GR*V&Qg0Pb`wjCq934CpIezE}ouoQ%heI zDG*I0qzh%B-(x2o%QS`|n8++l#9`vltf%51AA!?;*Syik(|_%N$z*ZCAFU*2HF~0| zdS)Uk#`4SJTP4IdDSS=%iUk4?fB*y_009U<00Izz00bZaf%z497+hM9#FxcO3E38J z;CjZH!%iJpl&tV}N#P%auUH@e0SG_<0uX=z1Rwwb2tWV=5SVX)2fp>VG=2jx8vX}| z^Z!>Qd^O(LS=h>LV*Y=A zdk1O`0SG_<0uX=z1Rwwb2tWV=5SSA9 Date: Fri, 14 Mar 2025 15:43:37 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E5=BA=8F=E5=88=97=E5=8C=96/Data/Query?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E8=BF=94=E5=9B=9E=E7=9A=84=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index d9ddeca..dd0c6b8 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -115,7 +115,30 @@ pub async fn query_data_handler( datamgr_api::FreeMem(data); let body_str = String::from_utf8_lossy(&result).to_string(); - Ok(body_str.into()) + let json_array: Vec<_> = body_str + .trim() + .lines() + .filter_map(|line| { + let parts: Vec<&str> = line.split('|').collect(); + if parts.len() < 9 { + return None; // 跳过无效行 + } + + Some(json!({ + "dataName": parts[0], + "node": parts[1], + "dataType": parts[2], + "dataOwner": parts[3], + "dataSize": parts[4].parse::().unwrap_or(0), + "available": parts[5] == "1", + "operationType": parts[6], + "operationDetail": parts[7], + "operationTime": parts[8], + })) + }) + .collect(); + + Ok(json!(json_array)) } } -- Gitee From 8cba00957e81ada45aff61fdb38e230ba0268dee Mon Sep 17 00:00:00 2001 From: Super User Date: Mon, 17 Mar 2025 11:46:44 +0800 Subject: [PATCH 9/9] =?UTF-8?q?1.define=5Fjson=5Fstream=5Fresponse=5Fmetho?= =?UTF-8?q?d=E5=AE=8F=E6=B7=BB=E5=8A=A0body=5Funrequired=E6=A0=87=E5=BF=97?= =?UTF-8?q?2.=E5=AE=8C=E5=96=84Subscribe=20handler=EF=BC=8C=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E9=95=BF=E8=BF=9E=E6=8E=A5=E6=8E=A5=E6=94=B6topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/handlers/datamgr/route.rs | 99 ++++++++++++++--------------- src/cores/servers/actix_web/mod.rs | 55 ++++++++++++++-- 2 files changed, 99 insertions(+), 55 deletions(-) diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index dd0c6b8..12009da 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -1,10 +1,12 @@ use crate::cores::handlers::datamgr::datamgr_api; -use crate::cores::models::{ServerRawResponse, ServerRequest, ServerResponse}; +use crate::cores::models::ServerRequest; use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::ptr::null_mut; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use serde_json::{json, Value}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use crate::cores::state::AppState; use crate::models::ServerResult; use crate::ServerError; @@ -709,28 +711,21 @@ pub async fn order_result_handler( pub async fn publish_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult { let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let header_cloud_topic = match headers.get("x-cloud-topic") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-topic header"), + None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), }; let cloud_topic = match CString::new(header_cloud_topic) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud topic"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), }; unsafe { @@ -741,15 +736,10 @@ pub async fn publish_handler( data_content.as_ptr() as *const i8, ); if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Publish data success")) - .build() - .into() + let body_str = String::from_utf8_lossy(&data_content).to_string(); + Ok(body_str.into()) } else { - ServerRawResponse::internal_error() - .body(Vec::from("Publish data failed")) - .build() - .into() + Err(ServerError::internal_error("Publish data failed")) } } } @@ -759,60 +749,67 @@ extern "C" fn rust_callback( uuid: *const ::std::os::raw::c_char, size: ::std::os::raw::c_int, data: *const ::std::os::raw::c_char, - _closure: *mut ::std::os::raw::c_void, + closure: *mut ::std::os::raw::c_void, ) { - unsafe { - let topic_str = CStr::from_ptr(topic).to_str().unwrap(); - let uuid_str = CStr::from_ptr(uuid).to_str().unwrap(); - let data_str = CStr::from_ptr(data).to_str().unwrap(); - println!( - "Receive message in Rust: topic = {}, uuid={}, size = {}, data = {}", - topic_str, uuid_str, size as i32, data_str - ); + if closure.is_null() { + println!("Closure is null, cannot send data."); + return; + } + + let sender_arc = unsafe { &*(closure as *mut Arc>>) }; + + let topic_str = unsafe { CStr::from_ptr(topic).to_string_lossy().to_string() }; + let uuid_str = unsafe { CStr::from_ptr(uuid).to_string_lossy().to_string() }; + let data_str = unsafe { CStr::from_ptr(data).to_string_lossy().to_string() }; + + let json_data = serde_json::json!({ + "topic": topic_str, + "uuid": uuid_str, + "size": size, + "data": data_str + }); + + let sender = sender_arc.lock().unwrap(); + if let Err(err) = sender.try_send(json_data) { + println!("Failed to send data to channel: {:?}", err); } } pub async fn subscribe_handler( _: Arc, server_request: ServerRequest, -) -> ServerResponse { +) -> ServerResult> { let headers = server_request.headers; - let bad_request = |body_str| { - ServerRawResponse::bad_request() - .body(Vec::from(body_str)) - .build() - .into() - }; - let header_cloud_topic = match headers.get("x-cloud-topic") { Some(value) => value.to_string(), - None => return bad_request("Missing x-cloud-topic header"), + None => return Err(ServerError::bad_request("Missing x-cloud-topic header")), }; let cloud_topic = match CString::new(header_cloud_topic) { Ok(cstr) => cstr, - Err(_) => return bad_request("Invalid cloud topic"), + Err(_) => return Err(ServerError::bad_request("Invalid cloud topic")), }; - unsafe { + let (tx, rx) = mpsc::channel(10); + + let sender_arc = Arc::new(Mutex::new(tx)); + let closure_ptr = Box::into_raw(Box::new(sender_arc.clone())) as *mut ::std::os::raw::c_void; + + unsafe { let ret = datamgr_api::Subscribe( DATA_PLUGIN_MANAGER, cloud_topic.as_ptr(), Some(rust_callback), - null_mut() as *mut ::std::os::raw::c_void, + closure_ptr, // 传递 closure ); - if ret == 1 { - ServerRawResponse::ok() - .body(Vec::from("Subscribe data success")) - .build() - .into() - } else { - ServerRawResponse::internal_error() - .body(Vec::from("Subscribe data failed")) - .build() - .into() + + if ret != 1 { + drop(Box::from_raw(closure_ptr as *mut Arc>>)); + return Err(ServerError::internal_error("Subscribe data failed")); } } + + Ok(ReceiverStream::new(rx)) } \ No newline at end of file diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 38683c6..418f4e4 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -239,7 +239,7 @@ macro_rules! define_raw_response_method { } macro_rules! define_json_stream_response_method { - ($name:ident, $method:ident, $route:expr, $path_type:ty, $query_type:ty, $router_key:path, $tag:expr) => { + (body_required $name:ident, $method:ident, $route:expr, $path_type:ty, $query_type:ty, $router_key:path, $tag:expr) => { #[api_v2_operation(tags($tag))] #[$method($route)] pub async fn $name( @@ -252,7 +252,6 @@ macro_rules! define_json_stream_response_method { let params = extract_params(params.into_inner(), query.into_inner()); let headers = headers::into_map(req.headers().clone()); let req = construct_request!(with_json_body params, headers, data); - let res: ServerResponse = call_route!(app_state, req, $router_key); if res.is_json_stream() { let json_stream_response: ServerJsonStreamResponse = res.into_json_stream().unwrap(); @@ -285,6 +284,50 @@ macro_rules! define_json_stream_response_method { } }; + + (body_unrequired $name:ident, $method:ident, $route:expr, $path_type:ty, $query_type:ty, $router_key:path, $tag:expr) => { + #[api_v2_operation(tags($tag))] + #[$method($route)] + pub async fn $name( + app_state: Data, + req: HttpRequest, + params: Path<$path_type>, + query: Query<$query_type>, + ) -> HttpResponse { + let params = extract_params(params.into_inner(), query.into_inner()); + let headers = headers::into_map(req.headers().clone()); + let req = construct_request!(without_body params, headers); + let res: ServerResponse = call_route!(app_state, req, $router_key); + if res.is_json_stream() { + let json_stream_response: ServerJsonStreamResponse = res.into_json_stream().unwrap(); + let stream: ReceiverStream = json_stream_response.stream.unwrap(); + // 将 ReceiverStream 转换为 Stream> + let sse_stream = stream.map(|value| { + // 序列化 Value 为 JSON 字符串 + match serde_json::to_string(&value) { + Ok(json) => { + // 格式化为 SSE 的 data 字段,注意末尾的两个换行符 + let data = format!("data: {}\n\n", json); + Ok(Bytes::from(data)) + } + Err(e) => { + // 将序列化错误转换为 actix_web::Error + Err(Error::from(e)) + } + } + }); + + HttpResponse::Ok() + .content_type("text/event-stream") + // 设置正确的 Content-Type 和缓存头 + .append_header((header::CACHE_CONTROL, "no-cache")) + .streaming(sse_stream) + } else { + let json_error_response = res.into_json().unwrap(); + HttpResponse::Ok().json(json_error_response) + } + } + }; } pub mod api_server { @@ -356,6 +399,7 @@ pub mod api_server { // 请求的URL:/resource/watch/{version}/{plural}/{name} // 返回值:Node、EVent、Pod等资源对应的JSON的长链接 define_json_stream_response_method!( + body_required watch_one, get, "/watch/{version}/{plural}/{name}", @@ -369,6 +413,7 @@ pub mod api_server { // 请求的URL:/resource/watch/{version}/{plural} // 返回值:Node、EVent、Pod等资源对应的JSON的长链接 define_json_stream_response_method!( + body_required watch_all, get, "/watch/{version}/{plural}", @@ -599,7 +644,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_response_method!( + body_required publish, post, "/Publish", @@ -609,7 +655,8 @@ pub mod datamgr { "DataMgr" ); - define_raw_response_method!( + define_json_stream_response_method!( + body_unrequired subscribe, get, "/Subscribe", -- Gitee