From 3bd3167807769b74a6713dba649956ad73eaf8fd Mon Sep 17 00:00:00 2001 From: yzc1114 Date: Fri, 21 Feb 2025 13:10:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=BB=86=E7=B2=92=E5=BA=A6?= =?UTF-8?q?=E6=9D=A1=E4=BB=B6=E7=BC=96=E8=AF=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 13 ++- src/cores/daemons/messaging.rs | 136 +------------------------------ src/cores/handlers/api_server.rs | 9 +- src/cores/models/messaging.rs | 136 +++++++++++++++++++++++++++++++ src/cores/models/mod.rs | 1 + src/cores/models/shared.rs | 6 -- src/cores/servers/message.rs | 2 +- tests/server_tests.rs | 13 ++- 8 files changed, 158 insertions(+), 158 deletions(-) create mode 100644 src/cores/models/messaging.rs diff --git a/Cargo.toml b/Cargo.toml index 02dd9c4..3a7fd22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,24 +18,22 @@ required-features = ["models", "eventbus", "servers", "test"] default = ["models", "eventbus", "servers", "messaging"] models = [] eventbus = ["feventbus"] -servers = ["utils"] +servers = ["utils", "eventbus", "messaging", "models", "diesel", "diesel_migrations", "actix-http", "actix-web"] utils = [] test = ["utils", "eventbus"] messaging = ["eventbus"] [dependencies] -#feventbus = "0.3.0" -#feventbus = { path = "../eventbus", optional = true } feventbus = { git = "https://gitee.com/iscas-system/eventbus.git", optional = true } #fleetmod = { git = "https://gitee.com/iscas-system/fleetmod.git" } #fleetmod = { path = "../fleetmod" } #fleetmodv2 = { path = "../fleetmodv2" } fleetmodv2 = { git = "https://gitee.com/iscas-system/fleetmodv2.git" } r2d2 = "0.8.10" -diesel = { version = "2.2.0", features = ["sqlite", "postgres", "r2d2"] } -diesel_migrations = "2.2.0" -actix-http = "3.9.0" -actix-web = "4.9.0" +diesel = { version = "2.2.0", features = ["sqlite", "postgres", "r2d2"], optional = true } +diesel_migrations = { version = "2.2.0", optional = true } +actix-http = { version = "3.9.0", optional = true } +actix-web = { version = "4.9.0", optional = true } async-trait = "0.1.74" env_logger = "0.11.5" serde = { version = "1.0.209", features = ["derive"] } @@ -56,4 +54,3 @@ tokio-stream = "0.1.17" enum-as-inner = "0.6.1" bon = "3.3.2" derive_more = { version = "2.0.1", features = ["full"] } -serde_yaml = "0.9.34" diff --git a/src/cores/daemons/messaging.rs b/src/cores/daemons/messaging.rs index b1fa263..2eeed25 100644 --- a/src/cores/daemons/messaging.rs +++ b/src/cores/daemons/messaging.rs @@ -1,150 +1,18 @@ -use crate::cores::models::{ServerError, ServerResult}; +use crate::cores::models::ServerResult; use feventbus::impls::messaging::messaging::Messaging; use feventbus::message::Message; use feventbus::message::NativeEventAction; use feventbus::traits::consumer::Consumer; use feventbus::traits::producer::Producer; -use serde::Deserialize; -use serde::Serialize; use serde_json::Value; -use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; use std::time; use serde::de::DeserializeOwned; -use strum::EnumIter; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, Mutex}; use tokio::time::sleep; use fleetmodv2::resources::resource::{Resource, ResourceCommon, ResourceIdentifier}; - -impl P2PEventTopic { - /// 返回带默认值的 P2PEventTopic 枚举 - pub fn create_p2p_topic(cluster_id: String) -> Vec { - vec![ - P2PEventTopic::Create(cluster_id.clone()), - P2PEventTopic::Update(cluster_id.clone()), - P2PEventTopic::Patch(cluster_id.clone()), - P2PEventTopic::Delete(cluster_id.clone()), - P2PEventTopic::List(cluster_id.clone()), - P2PEventTopic::Watch(cluster_id.clone()), - ] - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum EventTopic { - // P2P events表示其他组件向api server发送的点对点事件 - P2P(P2PEventTopic), - // PubSub events表示api server向其他组件发送的广播事件 - PubSub(PubSubEventTopic), -} - -impl Display for EventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - EventTopic::P2P(topic) => write!(f, "P2P.{}", topic), - EventTopic::PubSub(topic) => write!(f, "PubSub.{}", topic), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, EnumIter, PartialEq)] -pub enum P2PEventTopic { - Create(String), - Update(String), - Patch(String), - Delete(String), - List(String), - Watch(String), -} - -impl Display for P2PEventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - P2PEventTopic::Create(s) => write!(f, "Create-{}", s), - P2PEventTopic::Update(s) => write!(f, "Update-{}", s), - P2PEventTopic::Patch(s) => write!(f, "Patch-{}", s), - P2PEventTopic::Delete(s) => write!(f, "Delete-{}", s), - P2PEventTopic::List(s) => write!(f, "List-{}", s), - P2PEventTopic::Watch(s) => write!(f, "Watch-{}", s), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum PubSubEventTopic { - Watch(ResourceIdentifier), -} - -impl Display for PubSubEventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - fn resource_identifier_to_topic_str(ri: &ResourceIdentifier) -> String { - let api_version_str = format!("{}", ri.api_version); - let kind_str = format!(".{}", ri.kind); - let cluster_id = format!(".{}", ri.cluster_id); - format!("{}{}{}", api_version_str, kind_str, cluster_id) - } - match self { - PubSubEventTopic::Watch(ri) => write!(f, "Watch.{}", resource_identifier_to_topic_str(&ri)), - } - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct WatchEventMessage { - pub values: Vec, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum WatchEventMessageValue { - Created(Value), - Updated(Value), - Deleted(Value), -} - -#[derive(Debug, Serialize, Clone)] -pub enum WatchEventMessageResource -where - T: Resource, -{ - Created(T), - Updated(T), - Deleted(T), -} - -impl TryFrom for WatchEventMessageResource -where - T: Resource, -{ - type Error = ServerError; - - fn try_from(value: WatchEventMessageValue) -> ServerResult { - match value { - WatchEventMessageValue::Created(v) => { - let r = serde_json::from_value(v)?; - Ok(WatchEventMessageResource::Created(r)) - } - WatchEventMessageValue::Updated(v) => { - let r = serde_json::from_value(v)?; - Ok(WatchEventMessageResource::Updated(r)) - } - WatchEventMessageValue::Deleted(v) => { - let r = serde_json::from_value(v)?; - Ok(WatchEventMessageResource::Deleted(r)) - } - } - } -} - -impl WatchEventMessageValue { - pub fn get_value(&self) -> &Value { - match self { - WatchEventMessageValue::Created(v) => v, - WatchEventMessageValue::Updated(v) => v, - WatchEventMessageValue::Deleted(v) => v, - } - } -} +use crate::models::messaging::*; pub struct WatchDaemon { pub msg_cli: Arc, diff --git a/src/cores/handlers/api_server.rs b/src/cores/handlers/api_server.rs index 8bfa55d..f86b02a 100644 --- a/src/cores/handlers/api_server.rs +++ b/src/cores/handlers/api_server.rs @@ -1,5 +1,5 @@ use crate::cores::daemons::messaging::{ - watch_raw, PubSubEventTopic, WatchDaemon, + watch_raw, WatchDaemon, }; use crate::cores::models::{ResourcesParams, ServerError, ServerRequest, ServerResult}; use crate::db::check_exist::check_kine; @@ -15,6 +15,13 @@ use std::sync::Arc; use tokio_stream::wrappers::ReceiverStream; use fleetmodv2::resources::resource::{ResourceIdentifier, ResourceMeta}; use crate::cores::state::AppState; +use crate::models::messaging::PubSubEventTopic; + +impl From for ServerError { + fn from(error: diesel::result::Error) -> Self { + Self::internal_error(error.to_string().as_str()) + } +} pub async fn create_resource( app_state: Arc, diff --git a/src/cores/models/messaging.rs b/src/cores/models/messaging.rs new file mode 100644 index 0000000..aac0fa2 --- /dev/null +++ b/src/cores/models/messaging.rs @@ -0,0 +1,136 @@ +use std::fmt::{Display, Formatter}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use strum::EnumIter; +use crate::models::ServerResult; +use crate::ServerError; +use fleetmodv2::resources::resource::{ResourceIdentifier, Resource}; + +impl P2PEventTopic { + /// 返回带默认值的 P2PEventTopic 枚举 + pub fn create_p2p_topic(cluster_id: String) -> Vec { + vec![ + P2PEventTopic::Create(cluster_id.clone()), + P2PEventTopic::Update(cluster_id.clone()), + P2PEventTopic::Patch(cluster_id.clone()), + P2PEventTopic::Delete(cluster_id.clone()), + P2PEventTopic::List(cluster_id.clone()), + P2PEventTopic::Watch(cluster_id.clone()), + ] + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub enum EventTopic { + // P2P events表示其他组件向api server发送的点对点事件 + P2P(P2PEventTopic), + // PubSub events表示api server向其他组件发送的广播事件 + PubSub(PubSubEventTopic), +} + +impl Display for EventTopic { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + EventTopic::P2P(topic) => write!(f, "P2P.{}", topic), + EventTopic::PubSub(topic) => write!(f, "PubSub.{}", topic), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, EnumIter, PartialEq)] +pub enum P2PEventTopic { + Create(String), + Update(String), + Patch(String), + Delete(String), + List(String), + Watch(String), +} + +impl Display for P2PEventTopic { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + P2PEventTopic::Create(s) => write!(f, "Create-{}", s), + P2PEventTopic::Update(s) => write!(f, "Update-{}", s), + P2PEventTopic::Patch(s) => write!(f, "Patch-{}", s), + P2PEventTopic::Delete(s) => write!(f, "Delete-{}", s), + P2PEventTopic::List(s) => write!(f, "List-{}", s), + P2PEventTopic::Watch(s) => write!(f, "Watch-{}", s), + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub enum PubSubEventTopic { + Watch(ResourceIdentifier), +} + +impl Display for PubSubEventTopic { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn resource_identifier_to_topic_str(ri: &ResourceIdentifier) -> String { + let api_version_str = format!("{}", ri.api_version); + let kind_str = format!(".{}", ri.kind); + let cluster_id = format!(".{}", ri.cluster_id); + format!("{}{}{}", api_version_str, kind_str, cluster_id) + } + match self { + PubSubEventTopic::Watch(ri) => write!(f, "Watch.{}", resource_identifier_to_topic_str(&ri)), + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct WatchEventMessage { + pub values: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum WatchEventMessageValue { + Created(Value), + Updated(Value), + Deleted(Value), +} + +#[derive(Debug, Serialize, Clone)] +pub enum WatchEventMessageResource +where + T: Resource, +{ + Created(T), + Updated(T), + Deleted(T), +} + +impl TryFrom for WatchEventMessageResource +where + T: Resource, +{ + type Error = ServerError; + + fn try_from(value: WatchEventMessageValue) -> ServerResult { + match value { + WatchEventMessageValue::Created(v) => { + let r = serde_json::from_value(v)?; + Ok(WatchEventMessageResource::Created(r)) + } + WatchEventMessageValue::Updated(v) => { + let r = serde_json::from_value(v)?; + Ok(WatchEventMessageResource::Updated(r)) + } + WatchEventMessageValue::Deleted(v) => { + let r = serde_json::from_value(v)?; + Ok(WatchEventMessageResource::Deleted(r)) + } + } + } +} + +impl WatchEventMessageValue { + pub fn get_value(&self) -> &Value { + match self { + WatchEventMessageValue::Created(v) => v, + WatchEventMessageValue::Updated(v) => v, + WatchEventMessageValue::Deleted(v) => v, + } + } +} \ No newline at end of file diff --git a/src/cores/models/mod.rs b/src/cores/models/mod.rs index f162108..486fb94 100644 --- a/src/cores/models/mod.rs +++ b/src/cores/models/mod.rs @@ -1,5 +1,6 @@ pub mod shared; pub mod api_server; +pub mod messaging; pub use api_server::*; pub use shared::*; \ No newline at end of file diff --git a/src/cores/models/shared.rs b/src/cores/models/shared.rs index 5efb313..07d9d70 100644 --- a/src/cores/models/shared.rs +++ b/src/cores/models/shared.rs @@ -203,12 +203,6 @@ impl Display for ServerError { impl Error for ServerError {} -impl From for ServerError { - fn from(error: diesel::result::Error) -> Self { - Self::internal_error(error.to_string().as_str()) - } -} - impl From for ServerError { fn from(value: serde_json::Error) -> Self { ServerError::internal_error(format!("serde error: {}", value).as_str()) diff --git a/src/cores/servers/message.rs b/src/cores/servers/message.rs index 867dfef..c3afd2d 100644 --- a/src/cores/servers/message.rs +++ b/src/cores/servers/message.rs @@ -1,4 +1,3 @@ -use crate::cores::daemons::messaging::{EventTopic, P2PEventTopic}; use crate::cores::models::{ResourcesParams, ServerRequest, ServerResponse}; use crate::cores::servers::Server; use async_trait::async_trait; @@ -11,6 +10,7 @@ use std::collections::HashMap; use std::sync::Arc; use crate::cores::router::RouterKey; use crate::cores::state::AppState; +use crate::models::messaging::{EventTopic, P2PEventTopic}; pub struct MessagingServer; diff --git a/tests/server_tests.rs b/tests/server_tests.rs index f0348e8..f0169cf 100644 --- a/tests/server_tests.rs +++ b/tests/server_tests.rs @@ -2,20 +2,17 @@ #[cfg(feature = "test")] mod tests { use std::collections::HashMap; - use super::*; - use actix_web::{test, web, App, Error, HttpResponse}; - use anyhow::Result; + use actix_web::{test, App}; use env_logger::{Builder, Target}; - use fleet_apiserver::{prepare_app_state, start_server}; - use serde_json::{json, Value}; + use fleet_apiserver::prepare_app_state; + use serde_json::Value; use serial_test::serial; - use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time; use actix_web::web::Data; use paperclip::actix::OpenApiExt; - use tokio::time::{sleep, timeout}; - use fleet_apiserver::cores::models::{ServerJsonResponse, ServerResponse, ServerStatusCode}; + use tokio::time::sleep; + use fleet_apiserver::cores::models::{ServerJsonResponse, ServerStatusCode}; use fleet_apiserver::cores::servers; use fleet_apiserver::cores::state::AppState; use fleetmodv2::resources::models::{Metadata, Pod}; -- Gitee