diff --git a/Cargo.toml b/Cargo.toml index 4996298805d0730b4b9de47b50bb9d5b05e695e9..7e52e630dba9721c69d6523928699bfac7559cdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,17 +12,16 @@ readme = "README.md" [[example]] name = "basic_example" -required-features = ["models", "eventbus", "servers", "test"] +required-features = ["eventbus", "servers", "test"] #[[example]] #name = "api_generator" #required-features = [] [features] -default = ["models", "eventbus", "servers", "messaging"] -models = [] +default = ["eventbus", "servers", "messaging"] eventbus = ["feventbus"] -servers = ["utils", "eventbus", "messaging", "models", "diesel", "diesel_migrations", "actix-http", "actix-web"] +servers = ["utils", "eventbus", "messaging", "diesel", "diesel_migrations", "actix-http", "actix-web"] utils = [] test = ["utils", "eventbus"] messaging = ["eventbus"] diff --git a/src/cores/daemons/messaging.rs b/src/cores/daemons/messaging.rs index 907a3cdf89c1398ecf145cd51a1e8a60b818267a..75c7b17e770b4eecdc49037de5cdc2bcc92c7f8c 100644 --- a/src/cores/daemons/messaging.rs +++ b/src/cores/daemons/messaging.rs @@ -1,18 +1,18 @@ -use crate::cores::models::ServerResult; use feventbus::impls::messaging::messaging::Messaging; use feventbus::message::Message; use feventbus::message::NativeEventAction; use feventbus::traits::consumer::Consumer; use feventbus::traits::producer::Producer; +use fleetmodv2::api_server::{ + EventTopic, PubSubEventTopic, ServerResult, WatchEventMessage, WatchEventMessageValue, +}; +use fleetmodv2::resources::resource::{ResourceCommon, ResourceIdentifier}; use serde_json::Value; use std::sync::Arc; use std::time; -use serde::de::DeserializeOwned; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; use tokio::time::sleep; -use fleetmodv2::resources::resource::{Resource, ResourceCommon, ResourceIdentifier}; -use crate::models::messaging::*; pub struct WatchDaemon { pub msg_cli: Arc, @@ -164,11 +164,13 @@ impl WatchDaemon { }; // cluster_id 必须做匹配 - if resource_common.metadata + if resource_common + .metadata .as_ref() .and_then(|m| m.cluster_id.as_ref()) .and_then(|c| Some(c.ne(ri.cluster_id.as_str()))) - .is_some_and(|ne| ne) { + .is_some_and(|ne| ne) + { return false; } // api_version, kind必须匹配 @@ -180,9 +182,12 @@ impl WatchDaemon { } // 如果name字段不为空,那么name字段必须匹配 if let Some(name) = &ri.name { - if resource_common.metadata + if resource_common + .metadata .and_then(|m| Some(m.name)) - .as_deref() != Some(name.as_str()) { + .as_deref() + != Some(name.as_str()) + { return false; } } @@ -254,64 +259,3 @@ pub async fn watch_raw( }); Ok(rx) } - -#[allow(unused)] -pub async fn watch_value( - msg_cli: Arc, - ri: ResourceIdentifier, -) -> ServerResult> { - let mut rx_from = watch_raw(msg_cli, ri).await?; - let (sx, rx) = mpsc::channel(32); - tokio::spawn(async move { - while let Some(value) = rx_from.recv().await { - log::debug!("watch resource received value: {:?}", value); - let message_value: WatchEventMessageValue = match serde_json::from_value(value) { - Ok(v) => v, - Err(e) => { - log::error!("Failed to convert WatchEventMessageValue: {}", e); - continue; - } - }; - if let Err(e) = sx.send(message_value).await { - log::error!("Failed to send WatchEventMessageResource: {}", e); - } - } - }); - Ok(rx) -} - -#[allow(unused)] -pub async fn watch_resource( - msg_cli: Arc, - ri: ResourceIdentifier, -) -> ServerResult>> -where - T: Resource + DeserializeOwned + 'static, -{ - let mut rx_from = watch_raw(msg_cli, ri).await?; - let (sx, rx) = mpsc::channel(32); - tokio::spawn(async move { - while let Some(value) = rx_from.recv().await { - log::debug!("watch resource received value: {:?}", value); - let message_value: WatchEventMessageValue = match serde_json::from_value(value) { - Ok(v) => v, - Err(e) => { - log::error!("Failed to convert WatchEventMessageValue: {}", e); - continue; - } - }; - log::debug!("watch resource received WatchEventMessageValue: {:?}", message_value); - let resource = match WatchEventMessageResource::::try_from(message_value) { - Ok(r) => r, - Err(e) => { - log::error!("Failed to convert WatchEventMessageResource: {}", e); - continue; - } - }; - if let Err(e) = sx.send(resource).await { - log::error!("Failed to send WatchEventMessageResource: {}", e); - } - } - }); - Ok(rx) -} diff --git a/src/cores/handlers/api_server.rs b/src/cores/handlers/api_server.rs index f86b02ad8c99a985dff2e2dbdf507499f4bc2e5f..6c31697ece65edac1295d0df86b2841f34a728f7 100644 --- a/src/cores/handlers/api_server.rs +++ b/src/cores/handlers/api_server.rs @@ -1,26 +1,23 @@ -use crate::cores::daemons::messaging::{ - watch_raw, WatchDaemon, -}; -use crate::cores::models::{ResourcesParams, ServerError, ServerRequest, ServerResult}; +use crate::cores::daemons::messaging::{watch_raw, WatchDaemon}; +use crate::cores::state::AppState; use crate::db::check_exist::check_kine; use crate::db::db::DbConnection; use crate::db::delete::delete_from_kine; use crate::db::get::{get_all_data_from_kine, get_data_from_kine}; use crate::db::insert::insert_kine; use crate::db::update::update_data_in_kine; +use fleetmodv2::api_server::{ + PubSubEventTopic, ResourcesParams, ServerError, ServerRequest, ServerResult, +}; +use fleetmodv2::resources::resource::{ResourceIdentifier, ResourceMeta}; use json_patch::Patch; use serde_json::{json, Value}; use std::cell::RefCell; use std::sync::Arc; use tokio_stream::wrappers::ReceiverStream; -use fleetmodv2::resources::resource::{ResourceIdentifier, ResourceMeta}; -use crate::cores::state::AppState; -use crate::models::messaging::PubSubEventTopic; -impl From for ServerError { - fn from(error: diesel::result::Error) -> Self { - Self::internal_error(error.to_string().as_str()) - } +fn map_diesel_err(error: diesel::result::Error) -> ServerError { + ServerError::internal_error(error.to_string().as_str()) } pub async fn create_resource( @@ -87,7 +84,8 @@ pub async fn create_resource( version.as_str(), Some("default"), ) - .await?; + .await + .map_err(map_diesel_err)?; if kine_exists { return Err(ServerError::duplicated("该资源已存在,请勿重复创建")); @@ -101,7 +99,8 @@ pub async fn create_resource( version.as_str(), Some("default"), ) - .await?; + .await + .map_err(map_diesel_err)?; } message_daemon.publish_create_event(body.clone()).await; Ok(body.clone()) @@ -138,7 +137,8 @@ pub async fn delete_resource( version.as_str(), Some("default"), ) - .await?; + .await + .map_err(map_diesel_err)?; if resource_json_string.is_none() { return Err(ServerError::not_found("指定资源不存在")); } @@ -155,7 +155,8 @@ pub async fn delete_resource( version.as_str(), Some("default"), ) - .await?; + .await + .map_err(map_diesel_err)?; if !deleted { return Err(ServerError::internal_error("指定资源不存在")); @@ -198,7 +199,8 @@ pub async fn update_resource( version.as_str(), Some("default"), ) - .await?; + .await + .map_err(map_diesel_err)?; if !kine_exists { return Err(ServerError::internal_error("指定资源不存在")); @@ -213,7 +215,8 @@ pub async fn update_resource( Some("default"), &body, ) - .await?; + .await + .map_err(map_diesel_err)?; if !updated { return Err(ServerError::internal_error("更新资源失败")); @@ -245,7 +248,12 @@ pub async fn get_resource( if name.is_some() { // 如果name不为空,查询单个resource数据 - log::debug!("get data from kine: {} {} {}", plural, name.as_ref().unwrap(), version); + log::debug!( + "get data from kine: {} {} {}", + plural, + name.as_ref().unwrap(), + version + ); if let Some(data) = get_data_from_kine( db_conn.get_mut(), plural.as_str(), @@ -253,7 +261,8 @@ pub async fn get_resource( version.as_str(), Some("default"), ) - .await? + .await + .map_err(map_diesel_err)? { // 将字符串解析为 JSON 格式 return match serde_json::from_str::(&data) { @@ -271,7 +280,8 @@ pub async fn get_resource( version.as_str(), Some("default"), ) - .await?; + .await + .map_err(map_diesel_err)?; // 将所有字符串解析为 JSON 格式并收集到数组中 let json_array: Vec = data_list @@ -312,7 +322,8 @@ pub async fn patch_resource( version.as_str(), Some("default"), ) - .await?; + .await + .map_err(map_diesel_err)?; if curr_resource.is_none() { return Err(ServerError::not_found("指定资源不存在")); } @@ -338,7 +349,8 @@ pub async fn patch_resource( Some("default"), &curr_resource, ) - .await?; + .await + .map_err(map_diesel_err)?; if !updated { return Err(ServerError::internal_error("在Patch时更新资源失败")); } @@ -410,17 +422,19 @@ fn prepare_ctx( return Err(ServerError::bad_request("kind/plural参数未指定")); } if params.plural.is_none() { - let meta = ResourceMeta::from_kind(params.kind.as_ref().unwrap().as_str()).map_err(|e| { - log::error!("error getting kind from plural: {}", e); - ServerError::bad_request("kind参数不正确") - })?; + let meta = + ResourceMeta::from_kind(params.kind.as_ref().unwrap().as_str()).map_err(|e| { + log::error!("error getting kind from plural: {}", e); + ServerError::bad_request("kind参数不正确") + })?; params.plural = Some(meta.plural); } if params.kind.is_none() { - let meta = ResourceMeta::from_plural(params.plural.as_ref().unwrap().as_str()).map_err(|e| { - log::error!("error getting plural from kind: {}", e); - ServerError::bad_request("plural参数不正确") - })?; + let meta = + ResourceMeta::from_plural(params.plural.as_ref().unwrap().as_str()).map_err(|e| { + log::error!("error getting plural from kind: {}", e); + ServerError::bad_request("plural参数不正确") + })?; params.kind = Some(meta.kind); } let db_conn = app_state.db_pool.get_connection(); @@ -444,7 +458,7 @@ async fn check_metadata_exists( api_version_str: &str, ) -> ServerResult<()> { // let metadata_exists = check_metadata(db_conn, plural, api_version_str).await?; - // + // // if !metadata_exists { // return Err(ServerError::not_found( // "该plural不存在,请检查plural版本以及是否需要namespace", diff --git a/src/cores/handlers/cluster_info.rs b/src/cores/handlers/cluster_info.rs index 31eaed1163a3f9142d3f25a68078e6a99b962176..d5f3bd61d24c69b41db9ac699a3d179bb957b1f8 100644 --- a/src/cores/handlers/cluster_info.rs +++ b/src/cores/handlers/cluster_info.rs @@ -1,10 +1,9 @@ -use std::ffi::CStr; -use crate::cores::models::{ServerRequest, ServerResult}; -use serde_json::{json, Value}; -use std::sync::Arc; use crate::cores::handlers::datamgr::datamgr_api::{GetNodeId, GetUdpPort}; use crate::cores::state::AppState; -use crate::ServerError; +use fleetmodv2::api_server::{ServerError, ServerRequest, ServerResult}; +use serde_json::{json, Value}; +use std::ffi::CStr; +use std::sync::Arc; pub async fn get_cluster_info(app_state: Arc, _: ServerRequest) -> ServerResult { Ok(json!({ @@ -13,29 +12,30 @@ pub async fn get_cluster_info(app_state: Arc, _: ServerRequest) -> Ser } pub async fn get_eventbus_info(app_state: Arc, _: ServerRequest) -> ServerResult { - let message_cli = app_state.message_cli.clone(); let cluster_plugin_manager = message_cli.get_plugin_manager().unwrap(); - let node_id_ptr = unsafe { GetNodeId(cluster_plugin_manager) }; + let node_id_ptr = unsafe { GetNodeId(cluster_plugin_manager) }; if node_id_ptr.is_null() { - return Err(ServerError::internal_error("GetNodeId returned a null pointer")) + return Err(ServerError::internal_error( + "GetNodeId returned a null pointer", + )); } let node_id = unsafe { CStr::from_ptr(node_id_ptr) } .to_string_lossy() .into_owned(); - let udp_port = unsafe { GetUdpPort(cluster_plugin_manager) }; + let udp_port = unsafe { GetUdpPort(cluster_plugin_manager) }; if udp_port == 0 { - return Err(ServerError::internal_error("GetUdpPort error")) + return Err(ServerError::internal_error("GetUdpPort error")); } Ok(json!({ "node_id": node_id, "udp_port": udp_port.to_string() })) -} \ No newline at end of file +} diff --git a/src/cores/handlers/datamgr/route.rs b/src/cores/handlers/datamgr/route.rs index 11ef65e6f2e411f2fedc35be4f1b4e032d4dba09..5fa8445677801ef90564da92393000d36cd5286a 100644 --- a/src/cores/handlers/datamgr/route.rs +++ b/src/cores/handlers/datamgr/route.rs @@ -1,20 +1,20 @@ -use std::collections::HashMap; use crate::cores::handlers::datamgr::datamgr_api; -use crate::cores::models::ServerRequest; +use crate::cores::state::AppState; +use base64::engine::general_purpose::STANDARD; +use base64::Engine; +use fleetmodv2::api_server::{ + ServerError, ServerRawResponse, ServerRequest, ServerResponse, ServerResult, +}; +use once_cell::sync::Lazy; +use serde_json::{json, Value}; +use std::collections::HashMap; use std::ffi::{CStr, CString}; use std::os::raw::c_char; use std::ptr::null_mut; use std::slice; use std::sync::{Arc, Mutex}; -use once_cell::sync::Lazy; -use serde_json::{json, Value}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use crate::cores::state::AppState; -use crate::models::ServerResult; -use crate::{ServerError, ServerRawResponse, ServerResponse}; -use base64::Engine; -use base64::engine::general_purpose::STANDARD; pub static mut DATA_PLUGIN_MANAGER: *mut std::os::raw::c_void = null_mut(); @@ -142,12 +142,10 @@ pub async fn upload_data_handler( } } - pub async fn query_data_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let params_data_type = server_request.params.get("data_type").unwrap(); @@ -184,7 +182,7 @@ pub async fn query_data_handler( if data.is_null() { return Err(ServerError::internal_error("Received null pointer")); } - + let result = { let slice = std::slice::from_raw_parts(data as *const u8, ret as usize); slice.to_vec() @@ -224,7 +222,6 @@ pub async fn download_data_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let params_data_type = server_request.params.get("data_type").unwrap(); @@ -264,7 +261,7 @@ pub async fn download_data_handler( cloud_from.as_ptr(), &mut data, ); - + if ret <= 0 { return Err(ServerError::internal_error("DownloadData failed")); } @@ -289,7 +286,6 @@ pub async fn receive_telemetry_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let header_cloud_from = match headers.get("x-cloud-from") { @@ -303,10 +299,7 @@ pub async fn receive_telemetry_handler( }; unsafe { - let ret = datamgr_api::ReceiveTelemetry( - DATA_PLUGIN_MANAGER, - cloud_from.as_ptr(), - ); + let ret = datamgr_api::ReceiveTelemetry(DATA_PLUGIN_MANAGER, cloud_from.as_ptr()); if ret == 1 { Ok("Receive telemetry success".into()) } else { @@ -319,7 +312,6 @@ pub async fn report_telemetry_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); @@ -355,7 +347,6 @@ pub async fn send_remote_control_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); @@ -391,7 +382,6 @@ pub async fn sync_data_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let params_data_type = server_request.params.get("data_type").unwrap(); @@ -451,7 +441,6 @@ pub async fn backup_data_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let params_data_type = server_request.params.get("data_type").unwrap(); @@ -522,54 +511,53 @@ pub async fn recover_data_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let headers = server_request.headers; let params_data_type = server_request.params.get("data_type").unwrap(); let data_type = match CString::new(params_data_type.as_str()) { Ok(cstr) => cstr, - Err(_) => return Err(ServerError::bad_request("Invalid data type")), + Err(_) => return Err(ServerError::bad_request("Invalid data type")), }; let header_cloud_name = match headers.get("x-cloud-name") { Some(value) => value.to_string(), - None => return Err(ServerError::bad_request("Missing x-cloud-name header")), + None => return Err(ServerError::bad_request("Missing x-cloud-name header")), }; let cloud_name = match CString::new(header_cloud_name) { Ok(cstr) => cstr, - Err(_) => return Err(ServerError::bad_request("Invalid cloud name")), + Err(_) => return Err(ServerError::bad_request("Invalid cloud name")), }; let header_cloud_from = match headers.get("x-cloud-from") { Some(value) => value.to_string(), - None => return Err(ServerError::bad_request("Missing x-cloud-from header")), + None => return Err(ServerError::bad_request("Missing x-cloud-from header")), }; let cloud_from = match CString::new(header_cloud_from) { Ok(cstr) => cstr, - Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), + Err(_) => return Err(ServerError::bad_request("Invalid cloud from")), }; let header_cloud_version = match headers.get("x-cloud-version") { Some(value) => value.to_string(), - None => return Err(ServerError::bad_request("Missing x-cloud-version header")), + None => return Err(ServerError::bad_request("Missing x-cloud-version header")), }; let cloud_version = match CString::new(header_cloud_version) { Ok(cstr) => cstr, - Err(_) => return Err(ServerError::bad_request("Invalid cloud version")), + Err(_) => return Err(ServerError::bad_request("Invalid cloud version")), }; let header_cloud_node = match headers.get("x-cloud-node") { Some(value) => value.to_string(), - None => return Err(ServerError::bad_request("Missing x-cloud-node header")), + None => return Err(ServerError::bad_request("Missing x-cloud-node header")), }; let cloud_node = match CString::new(header_cloud_node) { Ok(cstr) => cstr, - Err(_) => return Err(ServerError::bad_request("Invalid cloud node")), + Err(_) => return Err(ServerError::bad_request("Invalid cloud node")), }; unsafe { @@ -593,7 +581,6 @@ pub async fn observation_order_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); @@ -628,7 +615,6 @@ pub async fn processing_order_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); @@ -663,7 +649,6 @@ pub async fn dispatching_order_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let body = server_request.body.as_ref().unwrap().as_slice(); let data_content = body.to_vec(); @@ -698,7 +683,6 @@ pub async fn order_status_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let params_uuid = server_request.params.get("uuid").unwrap(); if params_uuid.len() != 36 { @@ -713,11 +697,7 @@ pub async fn order_status_handler( unsafe { let mut data: *mut c_char = std::ptr::null_mut(); - let ret = datamgr_api::OrderStatus( - DATA_PLUGIN_MANAGER, - uuid.as_ptr(), - &mut data, - ); + let ret = datamgr_api::OrderStatus(DATA_PLUGIN_MANAGER, uuid.as_ptr(), &mut data); if ret <= 0 { return Err(ServerError::internal_error("Get order status failed")); @@ -743,7 +723,6 @@ pub async fn order_result_handler( _: Arc, server_request: ServerRequest, ) -> ServerResult { - let params_uuid = server_request.params.get("uuid").unwrap(); if params_uuid.len() != 36 { @@ -758,11 +737,7 @@ pub async fn order_result_handler( unsafe { let mut data: *mut c_char = std::ptr::null_mut(); - let ret = datamgr_api::OrderResult( - DATA_PLUGIN_MANAGER, - uuid.as_ptr(), - &mut data, - ); + let ret = datamgr_api::OrderResult(DATA_PLUGIN_MANAGER, uuid.as_ptr(), &mut data); if ret <= 0 { return Err(ServerError::internal_error("Get order status failed")); @@ -890,11 +865,7 @@ pub async fn subscribe_handler( Ok(ReceiverStream::new(rx)) }*/ -pub async fn publish_handler( - _: Arc, - server_request: ServerRequest, -) -> ServerResponse { - +pub async fn publish_handler(_: Arc, server_request: ServerRequest) -> ServerResponse { let headers = server_request.headers; let body = server_request.body.as_ref().unwrap().as_slice(); @@ -952,7 +923,7 @@ extern "C" fn rust_callback( let data_bytes = unsafe { slice::from_raw_parts(data as *const u8, size as usize) }; let data_str = match std::str::from_utf8(data_bytes) { - Ok(s) => s.to_string(), //UTF-8直接转换 + Ok(s) => s.to_string(), //UTF-8直接转换 Err(_) => STANDARD.encode(data_bytes), //非UTF-8数据转换为Base64 }; diff --git a/src/cores/handlers/test.rs b/src/cores/handlers/test.rs index 5be29b2464a1faaf962d819a218ed5d591890a30..784a5f3f67d00739f492752c5d4f18e0f85c57db 100644 --- a/src/cores/handlers/test.rs +++ b/src/cores/handlers/test.rs @@ -1,8 +1,7 @@ -use crate::cores::models::{ServerRequest, ServerResult}; +use crate::cores::state::AppState; +use fleetmodv2::api_server::{ServerRawResponse, ServerRequest, ServerResponse, ServerResult}; use serde_json::{json, Value}; use std::sync::Arc; -use crate::cores::state::AppState; -use crate::{ServerRawResponse, ServerResponse}; pub async fn test_json_route(_: Arc, _: ServerRequest) -> ServerResult { Ok(json!({ @@ -11,5 +10,8 @@ pub async fn test_json_route(_: Arc, _: ServerRequest) -> ServerResult } pub async fn test_raw_route(_: Arc, _: ServerRequest) -> ServerResponse { - ServerRawResponse::ok().body("test_raw_route is success".into()).build().into() -} \ No newline at end of file + ServerRawResponse::ok() + .body("test_raw_route is success".into()) + .build() + .into() +} diff --git a/src/cores/mod.rs b/src/cores/mod.rs index 5f4e60b6378ed0b0c585da5ef560ab0d6692660a..1bdff6afea66f600603655ddead1be2110b7d8da 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -4,8 +4,6 @@ use std::iter::zip; pub mod daemons; #[cfg(feature = "servers")] pub mod handlers; -#[cfg(feature = "models")] -pub mod models; #[cfg(any(feature = "servers"))] pub(crate) mod router; #[cfg(any(feature = "servers"))] @@ -77,8 +75,16 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { // 启动各个server let messaging_server: Option> = Some(Box::new(servers::MessagingServer)); - let actix_web_addresses = vec![params.actix_web_tcp_address, params.actix_web_udp_address, params.actix_web_quic_address]; - let actix_web_new_server_fns = vec![servers::actix_web::tcp, servers::actix_web::udp, servers::actix_web::quic]; + let actix_web_addresses = vec![ + params.actix_web_tcp_address, + params.actix_web_udp_address, + params.actix_web_quic_address, + ]; + let actix_web_new_server_fns = vec![ + servers::actix_web::tcp, + servers::actix_web::udp, + servers::actix_web::quic, + ]; let actix_web_servers = zip(actix_web_addresses, actix_web_new_server_fns).filter_map( |(address, new_server_fn)| address.map(|address| new_server_fn(address.as_str())), ); diff --git a/src/cores/models/api_server.rs b/src/cores/models/api_server.rs deleted file mode 100644 index 65c035d0e1ec8757a5c2d6d0ba4213bff418f6f6..0000000000000000000000000000000000000000 --- a/src/cores/models/api_server.rs +++ /dev/null @@ -1,137 +0,0 @@ -use std::collections::HashMap; -use std::convert::{TryFrom, TryInto}; -use bon::Builder; -use serde::{Serialize, Deserialize}; -use serde_json::Value; -use fleetmodv2::resources::resource::{Resource, ResourceIdentifier}; -use super::shared::*; - -#[derive(Default, Clone, Debug, Serialize, Deserialize, Builder)] -#[builder(on(String, into))] -pub struct ResourcesParams { - #[builder(field)] - pub query: HashMap, - pub version: String, - pub plural: Option, - pub kind: Option, - pub cluster_id: Option, - pub name: Option, - pub body: Option, -} - -impl ResourcesParamsBuilder { - pub fn add_query(mut self, key: impl ToString, value: impl ToString) -> Self { - self.query.insert(key.to_string(), value.to_string()); - self - } - - pub fn query(mut self, query: HashMap) -> Self { - self.query.extend(query); - self - } -} - -impl From<&T> for ResourcesParams -where T: Resource { - fn from(resource: &T) -> Self { - let kind = resource.get_kind(); - let metadata = resource.get_metadata().clone().unwrap_or_default(); - let api_version = resource.get_api_version(); - ResourcesParams::builder() - .kind(kind) - .version(api_version) - .name(metadata.name.as_str()) - .maybe_cluster_id(metadata.cluster_id.clone()) - .body(serde_json::to_value(resource).expect("resource to json failed")) - .build() - } -} - -impl From for ResourcesParams { - fn from(value: ResourceIdentifier) -> Self { - ResourcesParams { - kind: Some(value.kind), - version: value.api_version.to_string(), - cluster_id: Some(value.cluster_id), - name: value.name, - ..Default::default() - } - } -} - -// impl TryFrom<&T> for ResourcesParams -// where T: Resource { -// type Error = ServerError; -// fn try_from(resource: &T) -> ServerResult { -// let kind = resource.get_kind().as_ref().ok_or(ServerError::internal_error("resource has not kind"))?; -// let metadata = resource.get_metadata().as_ref().ok_or(ServerError::internal_error("resource has not metadata"))?; -// let api_version = resource.get_api_version().clone().unwrap_or_default(); -// Ok(ResourcesParams::builder() -// .kind(kind.as_str()) -// .version(api_version.as_str()) -// .name(&metadata.name) -// .maybe_cluster_id(metadata.cluster_id.clone()) -// .body(serde_json::to_value(resource).expect("resource to json failed")) -// .build()) -// } -// } - -impl TryInto for ResourcesParams { - type Error = ServerError; - - fn try_into(self) -> ServerResult { - let mut params = HashMap::new(); - let version = self.version; - let name = self.name; - if self.plural.is_some() { - params.insert("plural".to_string(), self.plural.unwrap()); - } - if self.kind.is_some() { - params.insert("kind".to_string(), self.kind.unwrap()); - } - params.insert("version".to_string(), version); - if name.is_some() { - params.insert("name".to_string(), name.unwrap()); - } - Ok(ServerRequest { - headers: HashMap::new(), - params, - body: self - .body - .and_then(|b| Some(serde_json::to_vec(&b).expect("body to json failed"))), - }) - } -} - -impl TryFrom for ResourcesParams { - type Error = ServerError; - fn try_from(server_request: ServerRequest) -> ServerResult { - let extract_required_param = |key: &str| { - server_request - .params - .get(key) - .ok_or_else(|| ServerError::bad_request(format!("{}参数未指定", key).as_str())) - .cloned() - }; - let extract_optional_param = |key: &str| server_request.params.get(key).cloned(); - let version = extract_required_param("version")?; - let plural = extract_optional_param("plural"); - let kind = extract_optional_param("kind"); - let name = extract_optional_param("name"); - - let body = if server_request.body.is_none() { - None - } else { - server_request.body - }; - Ok(ResourcesParams::builder() - .maybe_plural(plural) - .maybe_kind(kind) - .version(version) - .maybe_name(name) - .maybe_body( - body.map(|b| serde_json::from_slice(b.as_slice()).expect("body to json failed")), - ) - .build()) - } -} \ No newline at end of file diff --git a/src/cores/models/messaging.rs b/src/cores/models/messaging.rs deleted file mode 100644 index aac0fa2910b8a6df75efbf938f53ca7abd3f6fc5..0000000000000000000000000000000000000000 --- a/src/cores/models/messaging.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::fmt::{Display, Formatter}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use strum::EnumIter; -use crate::models::ServerResult; -use crate::ServerError; -use fleetmodv2::resources::resource::{ResourceIdentifier, Resource}; - -impl P2PEventTopic { - /// 返回带默认值的 P2PEventTopic 枚举 - pub fn create_p2p_topic(cluster_id: String) -> Vec { - vec![ - P2PEventTopic::Create(cluster_id.clone()), - P2PEventTopic::Update(cluster_id.clone()), - P2PEventTopic::Patch(cluster_id.clone()), - P2PEventTopic::Delete(cluster_id.clone()), - P2PEventTopic::List(cluster_id.clone()), - P2PEventTopic::Watch(cluster_id.clone()), - ] - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum EventTopic { - // P2P events表示其他组件向api server发送的点对点事件 - P2P(P2PEventTopic), - // PubSub events表示api server向其他组件发送的广播事件 - PubSub(PubSubEventTopic), -} - -impl Display for EventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - EventTopic::P2P(topic) => write!(f, "P2P.{}", topic), - EventTopic::PubSub(topic) => write!(f, "PubSub.{}", topic), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, EnumIter, PartialEq)] -pub enum P2PEventTopic { - Create(String), - Update(String), - Patch(String), - Delete(String), - List(String), - Watch(String), -} - -impl Display for P2PEventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - P2PEventTopic::Create(s) => write!(f, "Create-{}", s), - P2PEventTopic::Update(s) => write!(f, "Update-{}", s), - P2PEventTopic::Patch(s) => write!(f, "Patch-{}", s), - P2PEventTopic::Delete(s) => write!(f, "Delete-{}", s), - P2PEventTopic::List(s) => write!(f, "List-{}", s), - P2PEventTopic::Watch(s) => write!(f, "Watch-{}", s), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum PubSubEventTopic { - Watch(ResourceIdentifier), -} - -impl Display for PubSubEventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - fn resource_identifier_to_topic_str(ri: &ResourceIdentifier) -> String { - let api_version_str = format!("{}", ri.api_version); - let kind_str = format!(".{}", ri.kind); - let cluster_id = format!(".{}", ri.cluster_id); - format!("{}{}{}", api_version_str, kind_str, cluster_id) - } - match self { - PubSubEventTopic::Watch(ri) => write!(f, "Watch.{}", resource_identifier_to_topic_str(&ri)), - } - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct WatchEventMessage { - pub values: Vec, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum WatchEventMessageValue { - Created(Value), - Updated(Value), - Deleted(Value), -} - -#[derive(Debug, Serialize, Clone)] -pub enum WatchEventMessageResource -where - T: Resource, -{ - Created(T), - Updated(T), - Deleted(T), -} - -impl TryFrom for WatchEventMessageResource -where - T: Resource, -{ - type Error = ServerError; - - fn try_from(value: WatchEventMessageValue) -> ServerResult { - match value { - WatchEventMessageValue::Created(v) => { - let r = serde_json::from_value(v)?; - Ok(WatchEventMessageResource::Created(r)) - } - WatchEventMessageValue::Updated(v) => { - let r = serde_json::from_value(v)?; - Ok(WatchEventMessageResource::Updated(r)) - } - WatchEventMessageValue::Deleted(v) => { - let r = serde_json::from_value(v)?; - Ok(WatchEventMessageResource::Deleted(r)) - } - } - } -} - -impl WatchEventMessageValue { - pub fn get_value(&self) -> &Value { - match self { - WatchEventMessageValue::Created(v) => v, - WatchEventMessageValue::Updated(v) => v, - WatchEventMessageValue::Deleted(v) => v, - } - } -} \ No newline at end of file diff --git a/src/cores/models/mod.rs b/src/cores/models/mod.rs deleted file mode 100644 index 486fb941578b46bb77823e843ac360f4bbd5701a..0000000000000000000000000000000000000000 --- a/src/cores/models/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod shared; -pub mod api_server; -pub mod messaging; - -pub use api_server::*; -pub use shared::*; \ No newline at end of file diff --git a/src/cores/models/shared.rs b/src/cores/models/shared.rs deleted file mode 100644 index 4b3e3c091ed649e755fad62f950c708273094cba..0000000000000000000000000000000000000000 --- a/src/cores/models/shared.rs +++ /dev/null @@ -1,219 +0,0 @@ -use bon::Builder; -use enum_as_inner::EnumAsInner; -use paperclip::actix::Apiv2Schema; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::collections::HashMap; -use std::error::Error; -use std::fmt::{Display, Formatter}; -use std::num::NonZeroU16; -use derive_more::From; -use serde::de::DeserializeOwned; -use tokio_stream::wrappers::ReceiverStream; - -#[derive(Debug, Clone)] -pub struct ServerRequest { - pub headers: HashMap, - pub params: HashMap, - pub body: Option>, -} - -#[derive(Debug, Serialize, Deserialize, EnumAsInner, From)] -pub enum ServerResponse { - Json(ServerJsonResponse), - Raw(ServerRawResponse), - JsonStream(ServerJsonStreamResponse), -} - -#[derive(Default, Debug, Serialize, Deserialize, Builder, Apiv2Schema)] -pub struct ServerJsonResponse { - pub status_code: ServerStatusCode, - #[builder(into)] - pub message: String, - pub data: Option, -} - -impl Into> for ServerJsonResponse -where T: DeserializeOwned { - fn into(self) -> ServerResult { - match self.status_code { - ServerStatusCode::OK => { - let obj: T = serde_json::from_value(self.data.unwrap_or_default()).map_err(|e| ServerError::internal_error(e.to_string().as_str()))?; - Ok(obj) - }, - _ => Err(ServerError::new(self.status_code, self.message.as_str())) - } - } -} - -#[derive(Debug, Serialize, Deserialize, Builder)] -pub struct ServerRawResponse { - #[builder(field)] - pub headers: HashMap, - #[builder(field)] - pub body: Option>, - pub status_code: NonZeroU16, - #[builder(into)] - pub content_type: Option, -} - -impl ServerRawResponseBuilder { - pub fn append_header(mut self, key: impl ToString, value: impl ToString) -> Self { - self.headers.insert(key.to_string(), value.to_string()); - self - } - - pub fn headers(mut self, headers: HashMap) -> Self { - self.headers.extend(headers); - self - } - - pub fn body(mut self, body: Vec) -> Self { - self.body = Some(body); - self.into() - } - - pub fn json(self, body: Value) -> Self { - self.body(serde_json::to_vec(&body).expect("serialize json failed")) - } -} - -impl Default for ServerRawResponse { - fn default() -> Self { - Self { - status_code: NonZeroU16::new(200).unwrap(), - headers: HashMap::new(), - body: None, - content_type: None, - } - } -} - -macro_rules! define_status_code_builder_for_raw { - ($name:ident, $code:expr) => { - pub fn $name() -> ServerRawResponseBuilder { - ServerRawResponse::builder().status_code(NonZeroU16::new($code).unwrap()) - } - }; -} - -impl ServerRawResponse { - define_status_code_builder_for_raw!(ok, 200); - define_status_code_builder_for_raw!(not_found, 404); - define_status_code_builder_for_raw!(bad_request, 400); - define_status_code_builder_for_raw!(internal_error, 500); -} - -#[derive(Debug, Builder, Serialize, Deserialize)] -pub struct ServerJsonStreamResponse { - #[serde(skip)] - pub stream: Option>, -} - -pub type ServerResult = Result; - -impl From> for ServerResponse { - fn from(result: ServerResult) -> Self { - match result { - Ok(data) => ServerJsonResponse::builder() - .status_code(ServerStatusCode::OK) - .message("success".to_string()) - .data(serde_json::to_value(data).expect("serialize data failed")) - .build() - .into(), - Err(err) => ServerJsonResponse::builder() - .status_code(err.status_code) - .message(err.message) - .build() - .into(), - } - } -} - -impl From>> for ServerResponse { - fn from(result: ServerResult>) -> Self { - match result { - Ok(stream) => ServerJsonStreamResponse::builder() - .stream(stream) - .build() - .into(), - Err(err) => ServerJsonResponse::builder() - .status_code(err.status_code) - .message(err.message) - .build() - .into(), - } - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Apiv2Schema)] -pub enum ServerStatusCode { - // 正常流程 - OK = 20000, - - // 内部错误流程 - InternalError = 50000, - - // 请求方错误流程 - NotFound = 40004, - BadRequest = 40000, - Duplicated = 40001, - Timeout = 40002, -} - -impl Default for ServerStatusCode { - fn default() -> Self { - ServerStatusCode::OK - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ServerError { - pub status_code: ServerStatusCode, - pub message: String, -} - -macro_rules! define_server_error_fn { - ($func:ident, $code:ident) => { - pub fn $func(message: &str) -> Self { - Self::new(ServerStatusCode::$code, message) - } - }; -} - -impl ServerError { - pub fn new(t: ServerStatusCode, msg: &str) -> Self { - Self { - status_code: t, - message: msg.to_string(), - } - } - define_server_error_fn!(internal_error, InternalError); - define_server_error_fn!(not_found, NotFound); - define_server_error_fn!(bad_request, BadRequest); - define_server_error_fn!(duplicated, Duplicated); - define_server_error_fn!(timeout, Timeout); -} - -impl Display for ServerError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - std::fmt::Debug::fmt(&self, f) - } -} - -impl Error for ServerError {} - -impl From for ServerError { - fn from(value: serde_json::Error) -> Self { - ServerError::internal_error(format!("serde error: {}", value).as_str()) - } -} - -impl Into for ServerError { - fn into(self) -> ServerJsonResponse { - ServerJsonResponse::builder() - .status_code(self.status_code) - .message(self.message) - .build() - } -} diff --git a/src/cores/router.rs b/src/cores/router.rs index 35370ae5d1566ea52912304954cbe413b62e37e5..37760712be450847ab723e3bfbe1c32b4b43130c 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -1,18 +1,18 @@ use crate::cores::handlers; -use crate::cores::models::{ServerRequest, ServerResponse}; +use crate::cores::state::AppState; +use fleetmodv2::api_server::{ServerRequest, ServerResponse}; use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use strum::Display; use tokio::sync::RwLock; -use crate::cores::state::AppState; #[derive(Debug, Clone, Eq, Hash, PartialEq, Display)] pub enum RouterKey { TestJsonResponse, TestRawResponse, - + // api server router key ResourceCreate, ResourceDelete, @@ -89,7 +89,7 @@ impl Router { let router = Router { routes: RwLock::new(HashMap::new()), }; - + // test routers { add_route!( @@ -156,7 +156,6 @@ impl Router { ); } - // data mgr routers { add_route!( diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index e6c4321a9d117de3ff5e94f2a26c1db957e7dc60..60fd552b6740b618462e6731ef434662aa6af578 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -4,19 +4,20 @@ mod udp; mod quic; mod utils; -use crate::cores::models::{ - ServerJsonResponse, ServerJsonStreamResponse, ServerRawResponse, ServerRequest, ServerResponse, -}; use crate::cores::router::RouterKey; +use crate::cores::servers::actix_web::quic::QuicImpl; use crate::cores::servers::actix_web::tcp::TCPImpl; use crate::cores::servers::actix_web::udp::UDPImpl; use crate::cores::servers::Server; use crate::cores::state::AppState; use crate::utils::headers; -use crate::ServerStatusCode; use actix_web::http::header; use actix_web::http::StatusCode; use actix_web::{App, Error, HttpRequest, HttpResponse}; +use fleetmodv2::api_server::{ + ServerJsonResponse, ServerJsonStreamResponse, ServerRawResponse, ServerRequest, ServerResponse, + ServerStatusCode, +}; use futures::StreamExt; use paperclip::actix::Apiv2Schema; use paperclip::actix::{ @@ -29,7 +30,6 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; use tokio_stream::wrappers::ReceiverStream; -use crate::cores::servers::actix_web::quic::QuicImpl; pub fn tcp(addr: &str) -> Box { Box::new(TCPImpl::new(addr)) @@ -429,9 +429,10 @@ pub mod cluster_info { use super::*; pub fn configure_routes(app: &mut web::ServiceConfig) { - app.service(web::scope("/cluster") - .service(get_cluster_info) - .service(get_eventbus_info) + app.service( + web::scope("/cluster") + .service(get_cluster_info) + .service(get_eventbus_info), ); } @@ -443,16 +444,20 @@ pub mod test { use super::*; pub fn configure_routes(app: &mut web::ServiceConfig) { - app.service( - web::scope("/test") - .service(test_json) - .service(test_raw), - ); + app.service(web::scope("/test").service(test_json).service(test_raw)); } define_json_response_method!(body_unrequired test_json, get, "/json", (), (), RouterKey::TestJsonResponse, "Test"); - - define_raw_response_method!(test_raw, get, "/raw", (), (), RouterKey::TestRawResponse, "Test"); + + define_raw_response_method!( + test_raw, + get, + "/raw", + (), + (), + RouterKey::TestRawResponse, + "Test" + ); } pub mod datamgr { @@ -471,7 +476,7 @@ pub mod datamgr { .service(recover_data) .service(upload_data) .service(query_data) - .service(download_data) + .service(download_data), ); app.service( web::scope("/Order") @@ -479,7 +484,7 @@ pub mod datamgr { .service(processing_order) .service(dispatching_order) .service(order_status) - .service(order_result) + .service(order_result), ); app.service(publish); app.service(subscribe); @@ -649,7 +654,7 @@ pub mod datamgr { ); define_json_response_method!( - body_unrequired + body_unrequired order_result, get, "/Result/{uuid}", diff --git a/src/cores/servers/message.rs b/src/cores/servers/message.rs index c3afd2d370b200e610455e37339f1fc787c73bf2..57a1932b920cda234159de514c9b91d01290605f 100644 --- a/src/cores/servers/message.rs +++ b/src/cores/servers/message.rs @@ -1,16 +1,17 @@ -use crate::cores::models::{ResourcesParams, ServerRequest, ServerResponse}; +use crate::cores::router::RouterKey; use crate::cores::servers::Server; +use crate::cores::state::AppState; use async_trait::async_trait; use feventbus::err::Error as FEventBusError; use feventbus::message::Message; use feventbus::traits::consumer::{Consumer, MessageHandler}; +use fleetmodv2::api_server::{ + EventTopic, P2PEventTopic, ResourcesParams, ServerRequest, ServerResponse, +}; use serde::de::DeserializeOwned; use serde_json::{Error as SerdeError, Value}; use std::collections::HashMap; use std::sync::Arc; -use crate::cores::router::RouterKey; -use crate::cores::state::AppState; -use crate::models::messaging::{EventTopic, P2PEventTopic}; pub struct MessagingServer; @@ -36,10 +37,10 @@ impl Server for MessagingServer { log::info!("Registering reply handler for topic {}", topic_str); if let Err(e) = msg_cli.reply(topic_str.as_str(), reply_handler).await { log::error!( - "Failed to register reply handler for topic {}: {}", - topic_str, - e - ); + "Failed to register reply handler for topic {}: {}", + topic_str, + e + ); } } tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; diff --git a/src/lib.rs b/src/lib.rs index 6e023692e3020880e60d0ae919144c508feeca31..99764aae8281ddb44f81213741ceadfd250b7666 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,9 +12,7 @@ pub mod schema; #[cfg(any(feature = "servers", feature = "utils"))] pub mod utils; -#[cfg(any(feature = "servers"))] -pub use cores::{prepare_app_state, start_server}; #[cfg(any(feature = "servers", feature = "messaging"))] pub use cores::daemons::messaging; -#[cfg(any(feature = "models", feature = "servers"))] -pub use cores::models::{self, ResourcesParams, ServerResponse, ServerRawResponse, ServerJsonStreamResponse, ServerJsonResponse, ServerError, ServerStatusCode}; \ No newline at end of file +#[cfg(any(feature = "servers"))] +pub use cores::{prepare_app_state, start_server};