diff --git a/Cargo.toml b/Cargo.toml index e5c33754162e12837025b1a11006fafaae2b45e3..e2b04a6096d75bdddeacacab1f3dffc91074ca11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,22 +14,23 @@ readme = "README.md" name = "event-client-example" path = "examples/event_client_example/src/main.rs" -[[bin]] -name = "reflector-example" -path = "examples/reflector_example/src/main.rs" - -[[bin]] -name = "custom-route-example" -path = "examples/custom_route_example/src/main.rs" - -[[bin]] -name = "datamgr_route_example" -path = "examples/datamgr_route_example/src/main.rs" +#[[bin]] +#name = "reflector-example" +#path = "examples/reflector_example/src/main.rs" +# +#[[bin]] +#name = "custom-route-example" +#path = "examples/custom_route_example/src/main.rs" +# +#[[bin]] +#name = "datamgr_route_example" +#path = "examples/datamgr_route_example/src/main.rs" [dependencies] #feventbus = "0.3.0" feventbus = { git = "https://gitee.com/iscas-system/eventbus.git" } fleetmod = { git = "https://gitee.com/iscas-system/fleetmod.git" } +client-rust = {path = "../client-rust"} r2d2 = "0.8.10" dotenv = "0.15.0" diesel = { version = "2.2.0", features = ["sqlite", "postgres", "r2d2"] } diff --git a/examples/event_client_example/src/main.rs b/examples/event_client_example/src/main.rs index aa527ac4a855595d754aad273a5d047724b2a97b..d4bd9fe6a33dd76739386e51db9b683a7b8d10fa 100644 --- a/examples/event_client_example/src/main.rs +++ b/examples/event_client_example/src/main.rs @@ -1,22 +1,16 @@ use std::sync::Arc; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::Receiver; use std::time; -use diesel::SqliteExpressionMethods; +use client_rust::event_client::{EventClient, EventTopic, PubSubEventTopic, ResourceCollectionIdentifier, ResourceDefinitionBuilder, WatchEventMessageResource}; +use client_rust::result::ServerResult; use env_logger::{Builder, Target}; use feventbus::impls::nats::nats::NatsCli; -use feventbus::message::Message; -use feventbus::traits::consumer::Consumer; use feventbus::traits::controller::EventBus; use fleetmod::pod::Pod; use fleetmod::utils::APIVersion; -use serde_json::{json, Value}; -use tokio::sync::Mutex; +use serde_json::{Value}; use tokio::time::{sleep, timeout}; -use fleet_apiserver::APIServerEventClient; -use fleet_apiserver::cores::events::{EventTopic, PubSubEventTopic, ResourceCollectionIdentifier, WatchEventMessage, WatchEventMessageResource}; use fleet_apiserver::cores::plugin::PluginManager; -use fleet_apiserver::cores::services::{APIServerResult, APIServerServiceParamsBuilder, APIServerStatusCode}; use fleet_apiserver::custom_route::custom_example; const DATABASE_URL: &str = "sqlite://./test-database.sqlite"; @@ -45,15 +39,16 @@ async fn main() { // 等待服务器启动 todo! 未来添加阻塞等待服务启动完毕的方法 tokio::time::sleep(time::Duration::from_secs(5)).await; - test_p2p().await; - // test_watch().await; + // test_p2p().await; + test_watch().await; } +#[allow(dead_code)] async fn test_p2p() { // 新建NatsClient let nats_client = NatsCli::new().await.unwrap(); // 新建EventClient. 30s表示客户端设置的超时时间 - let event_cli = APIServerEventClient::new(Arc::new(nats_client), Some(time::Duration::from_secs(30))); + let event_cli = EventClient::new(Arc::new(nats_client), Some(time::Duration::from_secs(30))); let pod_name = "test-create-pod".to_string(); let namespace = "ns1".to_string(); @@ -64,23 +59,25 @@ async fn test_p2p() { let pod = mock_pod(pod_name.clone(), Some(namespace.clone())); // with_name_params: 相当于url: /api/v1/namespaces/ns1/pods/test-create-pod - let with_name_params = APIServerServiceParamsBuilder::new() + let with_name_params = ResourceDefinitionBuilder::new() .namespace(namespace.clone()) .version(version.clone()) .kind(kind.clone()) // .plural(plural.clone()) 可以传入plural或kind,选一个即可 + .cluster_id("1234".to_string()) .name(pod_name.clone()).build().unwrap(); // without_name_params: 相当于url: /api/v1/pods - let without_name_params = APIServerServiceParamsBuilder::new() + let without_name_params = ResourceDefinitionBuilder::new() .version(version.clone()) .plural(plural.clone()) // .kind(kind.clone()) 可以传入plural或kind,选一个即可 + .cluster_id("1234".to_string()) .build().unwrap(); // APIServerServiceParamsBuilder 必须传入的参数有version,plural或kind,其他参数name,namespace,group可选 // 在创建前,先删除可能存在的pod - let res: APIServerResult = event_cli.delete(with_name_params.clone()).await; - log::info!("delete before create result: {:?}", res); + let res: ServerResult = event_cli.delete(with_name_params.clone()).await; + log::info!("---delete before create result: {:?}---", res); // 创建pod let res = event_cli @@ -90,14 +87,14 @@ async fn test_p2p() { // let res = event_cli // .create(without_name_params, pod.clone()) // .await; - log::info!("create by resource result: {:?}", res); + log::info!("---create by resource result: {:?}---", res); assert_eq!(pod, res.ok().unwrap()); // 测试获取单个pod:因为需要返回单个列表,所以需要指定具体pod名称,因此传入with_name_params let res = event_cli .get(with_name_params.clone(), Value::Null) // 第二个参数为Value类型的query,后续可支持分页等查询条件,目前传空即可 .await; - log::info!("get one pod result: {:?}", res); + log::info!("---get one pod result: {:?}---", res); assert!(res.is_ok()); assert_eq!(pod, res.ok().unwrap()); @@ -105,7 +102,7 @@ async fn test_p2p() { let res = event_cli .list(without_name_params.clone(), Value::Null) // 第二个参数为Value类型的query,后续可支持分页等查询条件,目前传空即可 .await; - log::info!("list pods result {:?}", res); + log::info!("---list pods result {:?}---", res); assert!(res.is_ok()); let pods_from_list: Vec = res.ok().unwrap(); let found = pods_from_list.iter().any(|p| p.metadata.name == pod_name); @@ -121,7 +118,7 @@ async fn test_p2p() { let patch = serde_json::to_value(&diff).unwrap(); let res = event_cli .patch(with_name_params.clone(), patch).await; - log::info!("patch result: {:?}", res); + log::info!("---patch result: {:?}---", res); assert!(res.is_ok()); assert_eq!(pod_after_pad_should_be, res.ok().unwrap()); @@ -129,18 +126,18 @@ async fn test_p2p() { let res = event_cli .delete(with_name_params.clone()) .await; - log::info!("delete result: {:?}", res); + log::info!("---delete result: {:?}---", res); assert_eq!(pod_after_pad_should_be, res.ok().unwrap()); // 测试是否删除成功 - let res: APIServerResult = event_cli + let res: ServerResult = event_cli .get(with_name_params.clone(), Value::Null) .await; - log::info!("get result after deletion: {:?}", res); + log::info!("---get result after deletion: {:?}---", res); assert!(res.is_err()); - assert_eq!(res.err().unwrap().status_code, APIServerStatusCode::NotFound); + // assert_eq!(res.err().unwrap().status_code, APIServerStatusCode::NotFound); } - +#[allow(dead_code)] async fn test_watch() { let name = "test-create-pod".to_string(); let namespace = "ns1".to_string(); @@ -149,28 +146,31 @@ async fn test_watch() { let kind = "Pod".to_string(); let pod: Pod = mock_pod(name.clone(), Some(namespace.clone())); - let with_name_params = APIServerServiceParamsBuilder::new() + let with_name_params = ResourceDefinitionBuilder::new() .name(name.clone()) .version(version.clone()) .namespace(namespace.clone()) + .cluster_id("1234".to_string()) .plural(plural.clone()).build().unwrap(); let ri = ResourceCollectionIdentifier { api_version: APIVersion { group: None, version: version.to_string() }, kind: kind.to_string(), + cluster_id: "1234".to_string(), namespace: Some(namespace.clone()), }; + let ri_topic: EventTopic = EventTopic::PubSub(PubSubEventTopic::Watch(ri.clone())); let topic_str = ri_topic.to_string(); let got_watch_msg_cnt = Arc::new(AtomicUsize::new(0)); let got_watch_msg_cnt_cloned = got_watch_msg_cnt.clone(); let nats_cli = Arc::new(NatsCli::new().await.unwrap()); - let event_cli = APIServerEventClient::new(nats_cli.clone(), None); + let event_cli = EventClient::new(nats_cli.clone(), None); // 在进行watch之前先删除,保证资源不存在 - let res: APIServerResult = event_cli.delete(with_name_params.clone()).await; - log::info!("event cli delete res {:?}", res.is_ok()); + let res: ServerResult = event_cli.delete(with_name_params.clone()).await; + log::info!("--event cli delete res {:?}---", res.is_ok()); log::info!("watch event subscribing {}", topic_str); let mut watch_value_receiver = event_cli.watch::(ri.clone()).await.unwrap(); @@ -183,14 +183,14 @@ async fn test_watch() { while let Some(resource) = watch_value_receiver.recv().await { match resource { WatchEventMessageResource::Created(pod) => { - log::info!("watched created pod"); - }, + log::info!("watched created pod {}-{}!!!!!!!!!",pod.metadata.name.clone(),pod.metadata.namespace.clone().unwrap()); + } WatchEventMessageResource::Updated(pod) => { - log::info!("watched updated pod"); - }, + log::info!("watched updated pod {}-{}!!!!!!!!!!!",pod.metadata.name.clone(),pod.metadata.namespace.clone().unwrap()); + } WatchEventMessageResource::Deleted(pod) => { - log::info!("watched deleted pod"); - }, + log::info!("watched deleted pod {}-{}!!!!!!!!!!!",pod.metadata.name.clone(),pod.metadata.namespace.clone().unwrap()); + } } got_watch_msg_cnt_cloned.fetch_add(1, std::sync::atomic::Ordering::SeqCst); if got_watch_msg_cnt_cloned.load(std::sync::atomic::Ordering::SeqCst) >= 3 { diff --git a/src/cluster_info_route/cluster_info.rs b/src/cluster_info_route/cluster_info.rs index 6a2aeb3a4d2c6ac9a5d1cea2d6d09cacec21570f..665fca3ba211f7cb1aeeeb86394bd9b8a7dff354 100644 --- a/src/cluster_info_route/cluster_info.rs +++ b/src/cluster_info_route/cluster_info.rs @@ -1,4 +1,4 @@ -use serde_json::{json, Value}; +use serde_json::{json}; use std::sync::Arc; use crate::cores::apiserver::{CustomRouteProvider, APIServerRoute, AppState}; use crate::cores::plugin::PluginManager; diff --git a/src/cores/apiserver.rs b/src/cores/apiserver.rs index 0ef7ceb5e382a75f3787834e6ae94b9784b834f9..6877eccf68dd31a7dad125f211bc8fb356debca0 100644 --- a/src/cores/apiserver.rs +++ b/src/cores/apiserver.rs @@ -1,4 +1,3 @@ -use crate::cores::events::WatchEventPublisher; use crate::cores::handlers::{APIServerResponse, Handler}; use crate::cores::services::{APIServerError, APIServerResult}; use crate::db::db::DbPool; @@ -11,6 +10,7 @@ use http::Method; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::sync::Arc; +use client_rust::event_client::WatchEventPublisher; pub struct APIServer {} diff --git a/src/cores/events.rs b/src/cores/events.rs index a83780229d1eda1e91f17889d38e09a10da7d6ff..465fb63238bcfe4615c134ed7515838727232df2 100644 --- a/src/cores/events.rs +++ b/src/cores/events.rs @@ -3,92 +3,19 @@ use crate::cores::services::{APIServerError, APIServerResult, APIServerService, use anyhow::Result; use feventbus::err::Error as FEventBusError; use feventbus::impls::nats::nats::NatsCli; -use feventbus::message::{Message, NativeEventAction}; +use feventbus::message::{Message}; use feventbus::traits::consumer::{Consumer, MessageHandler}; -use feventbus::traits::producer::Producer; -use fleetmod::utils::{APIVersion, ResourceCommon}; use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; +use serde::{Serialize}; use serde_json::error::Error as SerdeError; use serde_json::Value; -use std::fmt::{Debug, Display, Formatter}; use std::future::Future; use std::sync::Arc; -use std::time; -use strum::{EnumIter, IntoEnumIterator}; -use tokio::sync::{mpsc, Mutex}; -use tokio::time::sleep; -use fleetmod::FleetResource; -use tokio::sync::mpsc::Receiver; - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum EventTopic { - // P2P events表示其他组件向api server发送的点对点事件 - P2P(P2PEventTopic), - // PubSub events表示api server向其他组件发送的广播事件 - PubSub(PubSubEventTopic), -} - -impl Display for EventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - EventTopic::P2P(topic) => write!(f, "P2P.{}", topic), - EventTopic::PubSub(topic) => write!(f, "PubSub.{}", topic), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, EnumIter, PartialEq)] -pub enum P2PEventTopic { - Create, - Update, - Patch, - Delete, - List, - NotifyWatch, - StopWatch, -} - -impl Display for P2PEventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - P2PEventTopic::Create => write!(f, "Create"), - P2PEventTopic::Update => write!(f, "Update"), - P2PEventTopic::Patch => write!(f, "Patch"), - P2PEventTopic::Delete => write!(f, "Delete"), - P2PEventTopic::List => write!(f, "List"), - P2PEventTopic::NotifyWatch => write!(f, "NotifyWatch"), - P2PEventTopic::StopWatch => write!(f, "StopWatch"), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] -pub enum PubSubEventTopic { - Watch(ResourceCollectionIdentifier), -} - -impl Display for PubSubEventTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - PubSubEventTopic::Watch(ri) => write!(f, "Watch.{}", ri), - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] -pub struct ResourceCollectionIdentifier { - pub api_version: APIVersion, - pub kind: String, - pub namespace: Option, - // todo! 未来支持更多字段的过滤,如label,annotation等 -} +use client_rust::event_client::{DeleteEventRequest, DeleteEventResponse, EndsWatchRequest, EndsWatchResponse, EventTopic, P2PEventTopic, PubSubEventTopic, ReadEventRequest, ReadEventResponse, ResourceCollectionIdentifier, ResourceDefinition, StartsWatchRequest, StartsWatchResponse, WriteEventRequest, WriteEventResponse}; +use client_rust::event_client::PluralOrKind as cPluralOrKind; +use client_rust::result::{ServerError, ServerResult}; impl From for APIServerServiceParams { - // 实现从ResourceCollectionIdentifier到APIServerServiceParams的转换 - // ResourceCollectionIdentifier约束了一批资源的特定信息 - // APIServerServiceParams则用于在get时根据特定信息查询这些资源 - // 目前apiserver仅支持对api_version、kind、namespace的查询条件 fn from(ri: ResourceCollectionIdentifier) -> Self { APIServerServiceParams { group: ri.api_version.group, @@ -99,153 +26,41 @@ impl From for APIServerServiceParams { } } } - -impl Display for ResourceCollectionIdentifier { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let api_version_str = if self.api_version.group.is_some() { - format!("{}.{}", self.api_version.group.as_ref().unwrap(), self.api_version.version) - } else { - format!("{}", self.api_version.version) - }; - let namespace_str = if self.namespace.is_some() { - format!(".{}", self.namespace.as_ref().unwrap()) - } else { - "".to_owned() - }; - let kind_str = format!(".{}", self.kind); - write!(f, "{}{}{}", api_version_str, namespace_str, kind_str) - } -} - -impl ResourceCollectionIdentifier { - // 如果value是一个合法的FleetResource,并且符合当前identifier的筛选规则,则返回true - pub fn fits(&self, value: Value) -> bool { - // ResourceCommon包含api_version, kind, metadata等所有资源的共有字段 - let resource_common: ResourceCommon = match serde_json::from_value(value) { - Ok(v) => v, - Err(_) => return false, - }; - // api_version, kind必须匹配,namespace为空时不做匹配 - if resource_common.api_version != self.api_version { - return false; +impl From for PluralOrKind { + fn from(client: cPluralOrKind) -> Self { + match client { + cPluralOrKind::Plural(value) => PluralOrKind::Plural(value), + cPluralOrKind::Kind(value) => PluralOrKind::Kind(value), } - if resource_common.kind != self.kind { - return false; - } - if let Some(namespace) = self.namespace.as_ref() { - return resource_common.metadata.namespace.is_some_and(|n| n.eq(namespace)); - } - true } } -// Create, Update, Patch使用WriteEventRequest -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct WriteEventRequest { - pub params: APIServerServiceParams, - pub data: Value, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct WriteEventResponse { - pub res: APIServerResult, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct DeleteEventRequest { - pub params: APIServerServiceParams, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct DeleteEventResponse { - pub res: APIServerResult, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ReadEventRequest { - pub params: APIServerServiceParams, - pub query: Value, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct ReadEventResponse { - pub res: APIServerResult, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct StartsWatchRequest { - pub ri: ResourceCollectionIdentifier, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct StartsWatchResponse {} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct EndsWatchRequest { - pub ri: ResourceCollectionIdentifier, -} -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct EndsWatchResponse {} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct WatchEventMessage { - pub values: Vec, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum WatchEventMessageValue { - Created(Value), - Updated(Value), - Deleted(Value), +fn convert_api_error_to_server_error(api_error: APIServerError) -> ServerError { + ServerError { + message: format!( + "StatusCode: {:?}, Message: {}", + api_error.status_code, api_error.message + ), + } } -#[derive(Debug, Serialize, Clone)] -pub enum WatchEventMessageResource -where - T: FleetResource, -{ - Created(T), - Updated(T), - Deleted(T), +pub fn convert_api_result_to_server_result(api_result: APIServerResult) -> ServerResult { + api_result.map_err(convert_api_error_to_server_error) } -impl TryFrom for WatchEventMessageResource -where - T: FleetResource, -{ - type Error = APIServerError; - - fn try_from(value: WatchEventMessageValue) -> APIServerResult { - let to_resource = |v| serde_json::from_value(v).map_err( - |e| APIServerError::internal_error(format!("err serde processing event msg: {}", e).as_str()) - ); - match value { - WatchEventMessageValue::Created(v) => { - let r = to_resource(v)?; - Ok(WatchEventMessageResource::Created(r)) - }, - WatchEventMessageValue::Updated(v) => { - let r = to_resource(v)?; - Ok(WatchEventMessageResource::Updated(r)) - }, - WatchEventMessageValue::Deleted(v) => { - let r = to_resource(v)?; - Ok(WatchEventMessageResource::Deleted(r)) - }, +impl From for APIServerServiceParams { + fn from(rd: ResourceDefinition) -> Self { + APIServerServiceParams { + group: rd.group, + plural_or_kind: rd.plural_or_kind.into(), + version: rd.version, + namespace: rd.namespace, + name: rd.name, } } } -impl WatchEventMessageValue { - pub fn get_value(&self) -> &Value { - match self { - WatchEventMessageValue::Created(v) => v, - WatchEventMessageValue::Updated(v) => v, - WatchEventMessageValue::Deleted(v) => v, - } - } -} pub struct P2PEventServer { app_state: Arc, @@ -262,55 +77,64 @@ impl P2PEventServer { pub async fn start(&self) { let nats_cli = self.app_state.nats_cli.clone(); - self.start_reply(nats_cli.clone()).await; + self.start_reply(nats_cli.clone(), self.app_state.cluster_id.clone()).await; } pub async fn reply_list(service: Arc, app_state: Arc, req: ReadEventRequest) -> ReadEventResponse { - let res = service.get_resource(req.params, req.query, app_state.clone()).await; - ReadEventResponse { res } + let res = service.get_resource(req.params.into(), req.query, app_state.clone()).await; + + let server_result: ServerResult = convert_api_result_to_server_result(res); + ReadEventResponse { res: server_result } } pub async fn reply_create(service: Arc, app_state: Arc, req: WriteEventRequest) -> WriteEventResponse { let _lock = crate::cores::GLOBAL_SQ_LOCK.lock().await; - let res = service.create_resource(req.params, req.data, app_state.clone()).await; + let res = service.create_resource(req.params.into(), req.data, app_state.clone()).await; match res.clone() { - Ok(val) => {log::info!("--------create resource {} succeed--------",val)} - Err(e) => {log::info!("!!!!!!!!!create res failed {}!!!!!!!!!",e)} + Ok(val) => { log::info!("--------create resource {} succeed--------",val) } + Err(e) => { log::info!("!!!!!!!!!create res failed {}!!!!!!!!!",e) } } - WriteEventResponse { res } + let server_result: ServerResult = convert_api_result_to_server_result(res); + + WriteEventResponse { res: server_result } } pub async fn reply_update(service: Arc, app_state: Arc, req: WriteEventRequest) -> WriteEventResponse { let _lock = crate::cores::GLOBAL_SQ_LOCK.lock().await; - let res = service.update_resource(req.params, req.data, app_state.clone()).await; + let res = service.update_resource(req.params.into(), req.data, app_state.clone()).await; match res.clone() { - Ok(val) => {log::info!("--------update resource {} succeed--------",val)} - Err(e) => {log::info!("!!!!!!!!!update resource failed {}!!!!!!!!!",e)} + Ok(val) => { log::info!("--------update resource {} succeed--------",val) } + Err(e) => { log::info!("!!!!!!!!!update resource failed {}!!!!!!!!!",e) } } - WriteEventResponse { res } + let server_result: ServerResult = convert_api_result_to_server_result(res); + + WriteEventResponse { res: server_result } } pub async fn reply_patch(service: Arc, app_state: Arc, req: WriteEventRequest) -> WriteEventResponse { let _lock = crate::cores::GLOBAL_SQ_LOCK.lock().await; - let res = service.patch_resource(req.params, req.data, app_state).await; + let res = service.patch_resource(req.params.into(), req.data, app_state).await; match res.clone() { - Ok(val) => {log::info!("--------patch resource {} succeed--------",val)} - Err(e) => {log::info!("!!!!!!!!!patch resource failed {}!!!!!!!!!",e)} + Ok(val) => { log::info!("--------patch resource {} succeed--------",val) } + Err(e) => { log::info!("!!!!!!!!!patch resource failed {}!!!!!!!!!",e) } } - WriteEventResponse { res } + let server_result: ServerResult = convert_api_result_to_server_result(res); + + WriteEventResponse { res: server_result } } pub async fn reply_delete(service: Arc, app_state: Arc, req: DeleteEventRequest) -> DeleteEventResponse { let _lock = crate::cores::GLOBAL_SQ_LOCK.lock().await; - - let res = service.delete_resource(req.params, app_state).await; + let res = service.delete_resource(req.params.into(), app_state).await; match res.clone() { - Ok(val) => {log::info!("--------delete resource {} succeed--------",val)} - Err(e) => {log::info!("!!!!!!!!!delete resource failed {}!!!!!!!!!",e)} + Ok(val) => { log::info!("--------delete resource {} succeed--------",val) } + Err(e) => { log::info!("!!!!!!!!!delete resource failed {}!!!!!!!!!",e) } } - DeleteEventResponse { res } + let server_result: ServerResult = convert_api_result_to_server_result(res); + + DeleteEventResponse { res: server_result } } pub async fn reply_starts_watch(_: Arc, app_state: Arc, req: StartsWatchRequest) -> StartsWatchResponse { @@ -352,22 +176,23 @@ impl P2PEventServer { } let body = msg.body.unwrap(); match event_topic { - P2PEventTopic::List => Self::do_reply(service, app_state, body, P2PEventServer::reply_list).await, - P2PEventTopic::Create => Self::do_reply(service, app_state, body, P2PEventServer::reply_create).await, - P2PEventTopic::Update => Self::do_reply(service, app_state, body, P2PEventServer::reply_update).await, - P2PEventTopic::Patch => Self::do_reply(service, app_state, body, P2PEventServer::reply_patch).await, - P2PEventTopic::Delete => Self::do_reply(service, app_state, body, P2PEventServer::reply_delete).await, - P2PEventTopic::NotifyWatch => Self::do_reply(service, app_state, body, P2PEventServer::reply_starts_watch).await, - P2PEventTopic::StopWatch => Self::do_reply(service, app_state, body, P2PEventServer::reply_ends_watch).await, + P2PEventTopic::List(_s) => Self::do_reply(service, app_state, body, P2PEventServer::reply_list).await, + P2PEventTopic::Create(_s) => Self::do_reply(service, app_state, body, P2PEventServer::reply_create).await, + P2PEventTopic::Update(_s) => Self::do_reply(service, app_state, body, P2PEventServer::reply_update).await, + P2PEventTopic::Patch(_s) => Self::do_reply(service, app_state, body, P2PEventServer::reply_patch).await, + P2PEventTopic::Delete(_s) => Self::do_reply(service, app_state, body, P2PEventServer::reply_delete).await, + P2PEventTopic::NotifyWatch(_s) => Self::do_reply(service, app_state, body, P2PEventServer::reply_starts_watch).await, + P2PEventTopic::StopWatch(_s) => Self::do_reply(service, app_state, body, P2PEventServer::reply_ends_watch).await, } }) }); reply_handler } - pub async fn start_reply(&self, nats_cli: Arc) { + pub async fn start_reply(&self, nats_cli: Arc, cluster_id: String) { let mut topics = Vec::new(); - for p2p in P2PEventTopic::iter() { + let p2p_topics =P2PEventTopic::create_p2p_topic(cluster_id); + for p2p in p2p_topics { topics.push(EventTopic::P2P(p2p.clone())); } for t in topics { @@ -390,325 +215,4 @@ impl P2PEventServer { } } -pub struct WatchEventPublisher { - nats_cli: Arc, - pub_sub_event_topics: Arc>>, - sender: mpsc::Sender, - receiver: Arc>>, -} - -impl WatchEventPublisher { - pub fn new(nats_cli: Arc) -> Self { - let (sx, rx) = mpsc::channel(100); - - Self { - nats_cli, - // todo! pub_sub_event_topics在重启后会清空,需要持久化 - pub_sub_event_topics: Arc::new(Mutex::new(vec![])), - sender: sx, - receiver: Arc::new(Mutex::new(rx)), - } - } - pub async fn add_pub_sub_event_topic(&self, topic: PubSubEventTopic) { - match topic { - PubSubEventTopic::Watch(_) => {} - _ => panic!("Unsupported PubSubEventTopic: {:?}", topic), - }; - let mut topics = self.pub_sub_event_topics.lock().await; - if topics.contains(&topic) { - return; - } - topics.push(topic.clone()); - log::info!("WatchEventPublisher Added PubSubEventTopic: {:?}", EventTopic::PubSub(topic)); - // todo! 持久化 - } - - pub async fn remove_pub_sub_event_topic(&self, topic: PubSubEventTopic) { - let mut topics = self.pub_sub_event_topics.lock().await; - if let Some(index) = topics.iter().position(|x| *x == topic) { - topics.remove(index); - } - } - - pub fn start(&self) { - let rx = self.receiver.clone(); - let nats_cli = self.nats_cli.clone(); - let pub_sub_event_topics = self.pub_sub_event_topics.clone(); - tokio::spawn(async move { - log::info!("WatchEventPublisher started receiving messages"); - let mut rx = rx.lock().await; // 锁定接收器 - let buffer_limit = 32; - let mut buffer = Vec::with_capacity(buffer_limit); - loop { - sleep(time::Duration::from_millis(500)).await; - buffer.clear(); - let received_cnt = rx.recv_many(&mut buffer, buffer_limit).await; - if received_cnt == 0 { - continue; - } - log::info!("WatchEventPublisher Received {} messages", received_cnt); - Self::publish_events_to_topics(nats_cli.clone(), pub_sub_event_topics.clone(), &buffer).await; - } - }); - } - - pub async fn publish_events_to_topics(nats_cli: Arc, pub_sub_event_topics: Arc>>, msg_values: &Vec) { - let pub_sub_event_topics = pub_sub_event_topics.lock().await.clone(); - for topic in pub_sub_event_topics.iter() { - let mut identified_values = Vec::new(); - for msg_value in msg_values.iter() { - let identifier = match topic { - PubSubEventTopic::Watch(identifier) => { - identifier.clone() - } - }; - let v = msg_value.get_value(); - if identifier.fits(v.clone()) { - identified_values.push(msg_value.clone()); - } - } - let cnt = identified_values.len(); - let content = WatchEventMessage { - values: identified_values, - }; - if cnt > 0 { - let topic = EventTopic::PubSub(topic.clone()); - let to_publish = Message::new( - topic.to_string(), - NativeEventAction::Other, // 该字段暂时无用 - None, // metadata - Some(content), // body - None, // created_at - ); - let nats_cli = nats_cli.clone(); - // 另起一个协程来发布消息 - tokio::spawn(async move { - log::debug!("WatchEventPublisher Publishing {} message(s) to topic {}", cnt, topic); - if let Err(e) = nats_cli.publish(to_publish).await { - log::error!("WatchEventPublisher Failed to publish event: {}", e); - } - log::debug!("WatchEventPublisher Published {} message(s) to topic {}", cnt, topic); - }); - } - } - } - - pub async fn publish_create_event(&self, data: Value) { - if let Err(e) = self.sender.send(WatchEventMessageValue::Created(data)).await { - log::error!("Failed to publish create event: {}", e); - } - } - - pub async fn publish_update_event(&self, data: Value) { - if let Err(e) = self.sender.send(WatchEventMessageValue::Updated(data)).await { - log::error!("Failed to publish update event: {}", e); - } - } - - pub async fn publish_delete_event(&self, data: Value) { - if let Err(e) = self.sender.send(WatchEventMessageValue::Deleted(data)).await { - log::error!("Failed to publish delete event: {}", e); - } - } -} - -pub struct APIServerEventClient { - nats_cli: Arc, - timeout: time::Duration, -} - -impl APIServerEventClient { - pub fn new(nats_cli: Arc, timeout: Option) -> Self { - Self { - nats_cli, - timeout: timeout.unwrap_or(time::Duration::from_secs(60)), - } - } - - fn new_message(topic: P2PEventTopic, req: R) -> Message - where - R: Clone + Serialize + DeserializeOwned, - { - let msg = Message::new( - EventTopic::P2P(topic).to_string(), - NativeEventAction::Other, - None, - Some(req), - None, - ); - msg - } - - fn map_serde_err(e: SerdeError) -> APIServerError { - APIServerError::internal_error(format!("err serde processing event msg: {}", e).as_str()) - } - - fn data_to_value(data: T) -> APIServerResult - where - T: Serialize, - { - Ok(serde_json::to_value(data).map_err(Self::map_serde_err)?) - } - - fn value_to_data(value: Value) -> APIServerResult - where - T: DeserializeOwned, - { - Ok(serde_json::from_value(value).map_err(Self::map_serde_err)?) - } - - fn parse_msg_raw_string(res: String) -> APIServerResult - where - T: DeserializeOwned, - { - let res: T = serde_json::from_str(&res).map_err(Self::map_serde_err)?; - Ok(res) - } - - async fn write_event(&self, topic: P2PEventTopic, params: APIServerServiceParams, data: Value) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone, - { - let msg = Self::new_message(topic, WriteEventRequest { params, data }); - let resp_str = self.nats_cli.request(msg, self.timeout).await?; - log::debug!("write event resp_str: {}", resp_str); - let resp: WriteEventResponse = Self::parse_msg_raw_string(resp_str)?; - Self::value_to_data(resp.res?) - } - - pub async fn create(&self, params: APIServerServiceParams, data: T) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone, - { - let mut params = params.clone(); - params.name = None; // 创建资源时在路径参数中不要包含name - self.write_event(P2PEventTopic::Create, params, Self::data_to_value(data)?).await - } - - pub async fn create_by_resource(&self, data: T) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone + FleetResource, - { - let mut params = APIServerServiceParams::from_resource(&data); - params.name = None; // 创建资源时在路径参数中不要包含name - self.create(params, data).await - } - - pub async fn delete(&self, params: APIServerServiceParams) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone, - { - let msg = Self::new_message(P2PEventTopic::Delete, DeleteEventRequest { params }); - let resp_str = self.nats_cli.request(msg, self.timeout).await?; - let resp: WriteEventResponse = Self::parse_msg_raw_string(resp_str)?; - Self::value_to_data(resp.res?) - } - - pub async fn delete_by_resource(&self, data: T) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone + FleetResource, - { - let params = APIServerServiceParams::from_resource(&data); - self.delete(params).await - } - - pub async fn update(&self, params: APIServerServiceParams, data: T) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone + FleetResource, - { - self.write_event(P2PEventTopic::Update, params, Self::data_to_value(data)?).await - } - - pub async fn update_by_resource(&self, data: T) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone + FleetResource, - { - let params = APIServerServiceParams::from_resource(&data); - self.write_event(P2PEventTopic::Update, params, Self::data_to_value(data)?).await - } - - pub async fn patch(&self, params: APIServerServiceParams, data: Value) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone, - { - self.write_event(P2PEventTopic::Patch, params, data).await - } - - async fn read_event(&self, topic: P2PEventTopic, params: APIServerServiceParams, query: Value) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone, - { - let msg = Self::new_message(topic, ReadEventRequest { params, query }); - let resp_str = self.nats_cli.request(msg, self.timeout).await?; - let resp: ReadEventResponse = Self::parse_msg_raw_string(resp_str)?; - Self::value_to_data(resp.res?) - } - - pub async fn get(&self, params: APIServerServiceParams, query: Value) -> APIServerResult - where - T: Serialize + DeserializeOwned + Clone, - { - self.read_event(P2PEventTopic::List, params, query).await - } - - pub async fn list(&self, params: APIServerServiceParams, query: Value) -> APIServerResult> - where - T: Serialize + DeserializeOwned + Clone, - { - self.read_event(P2PEventTopic::List, params, query).await - } - - // notify_watch负责通知apiserver开始监听某个资源的变化 - pub async fn notify_watch(&self, ri: ResourceCollectionIdentifier) -> APIServerResult<()> { - let msg = Self::new_message(P2PEventTopic::NotifyWatch, StartsWatchRequest { ri }); - let resp_str = self.nats_cli.request(msg, self.timeout).await?; - let _resp: StartsWatchResponse = Self::parse_msg_raw_string(resp_str)?; - Ok(()) - } - - pub async fn watch(&self, ri: ResourceCollectionIdentifier) -> APIServerResult>> - where - T: Serialize + DeserializeOwned + Clone + FleetResource + 'static, - { - self.notify_watch(ri.clone()).await?; - let nats_cli = self.nats_cli.clone(); - let topic = EventTopic::PubSub(PubSubEventTopic::Watch(ri)); - let topic_str = topic.to_string(); - let topic_str_clone = topic_str.clone(); - let (sx, rx) = mpsc::channel(32); - tokio::spawn(async move { - let subscribe_result = nats_cli.subscribe(topic_str.as_str(), Arc::new(move |message: Message| { - let sx = sx.clone(); - let topic_str = topic_str_clone.clone(); - Box::pin(async move { - if message.body.is_none() { - log::warn!("Watcher on {} Received message with no body", topic_str.as_str()); - return Ok("".to_string()); - } - let body = message.body.unwrap(); - for value in body.values.into_iter() { - let resource = WatchEventMessageResource::try_from(value); - if let Err(e) = resource { - log::error!("Watcher on {} Failed to parse WatchEventMessage: {}", topic_str.as_str(), e); - continue; - } - let resource = resource.unwrap(); - if let Err(e) = sx.send(resource).await { - log::warn!("Watcher on {} Failed to send message to channel: {}", topic_str.as_str(), e); - } - } - Ok("".to_string()) - }) - })).await; - if let Err(e) = subscribe_result { - log::error!("Watcher on {} Failed to subscribe: {}.", topic_str.as_str(), e); - } - }); - Ok(rx) - } - - pub async fn stops_watch(&self, _ri: ResourceCollectionIdentifier) -> APIServerResult<()> { - todo!() - } -} \ No newline at end of file diff --git a/src/cores/mod.rs b/src/cores/mod.rs index 609b1b8029ac4a3179d488f36380f4f3dfa782d7..1690bea0ec71c9d51cfb6dad62aaeb7dc77b3747 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -6,13 +6,14 @@ **/ use crate::cores::apiserver::{APIServer, AppState}; -use crate::cores::events::{P2PEventServer, WatchEventPublisher}; +use crate::cores::events::{P2PEventServer}; use crate::cores::handlers::DefaultHandler; use crate::db::db::DbPool; use feventbus::impls::nats::nats::NatsCli; use feventbus::traits::controller::EventBus; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use client_rust::event_client::WatchEventPublisher; use crate::cores::plugin::PluginManager; pub mod apiserver; pub mod handlers; @@ -20,7 +21,6 @@ pub mod checker; pub mod events; pub mod services; pub mod plugin; -pub mod reflector; use tokio::sync::Mutex; use uuid::Uuid; use std::fs; diff --git a/src/cores/reflector.rs b/src/cores/reflector.rs deleted file mode 100644 index c20310caef973ab9e8164409356371c5026cf5e8..0000000000000000000000000000000000000000 --- a/src/cores/reflector.rs +++ /dev/null @@ -1,321 +0,0 @@ -use async_trait::async_trait; -use fleetmod::utils::APIVersion; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; -use std::fmt::Debug; -use std::sync::Arc; -use std::{fmt, time}; -use std::str::FromStr; -use tokio::sync::mpsc::Receiver; -use serde_json::Value; -use tokio::sync::Mutex; -use crate::APIServerEventClient; -use crate::cores::events::{ResourceCollectionIdentifier, WatchEventMessageResource}; -use crate::cores::services::{APIServerError, APIServerResult, APIServerServiceParams, PluralOrKind}; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct ResourceKey { - pub api_version: APIVersion, - pub kind: String, - pub name: String, - pub namespace: Option, - pub uid: Option, -} - -impl From for APIServerServiceParams { - fn from(key: ResourceKey) -> Self { - APIServerServiceParams { - group: key.api_version.group, - plural_or_kind: PluralOrKind::Kind(key.kind), - version: key.api_version.version, - namespace: key.namespace, - name: Some(key.name), - } - } -} - -impl fmt::Display for ResourceKey { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = serde_json::to_string(self).map_err(|_| fmt::Error)?; - write!(f, "{}", s) - } -} - -impl ResourceKey { - pub fn new(api_version: &APIVersion, kind: &str, name: &str, namespace: Option<&str>, uid: Option<&str>) -> Self { - ResourceKey { - api_version: api_version.clone(), - kind: kind.to_string(), - name: name.to_string(), - namespace: namespace.map(|s| s.to_string()), - uid: uid.map(|s| s.to_string()), - } - } - - pub fn from(t: &T) -> Self - where - T: fleetmod::FleetResource, - { - let metadata = t.get_metadata(); - ResourceKey { - api_version: t.get_api_version().clone(), - kind: t.get_kind().to_string(), - name: metadata.name.clone(), - namespace: metadata.namespace.clone(), - uid: metadata.uid.clone(), - } - } -} - -impl FromStr for ResourceKey { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - let key: ResourceKey = serde_json::from_str(s)?; - Ok(key) - } -} - -#[async_trait] -pub trait ResourceReflector -where - T: Clone + DeserializeOwned + Serialize + fleetmod::FleetResource + 'static, -{ - async fn get(&self) -> BTreeMap; - - async fn get_by_key(&self, key: &ResourceKey) -> Option; - - async fn patch(&self, patched: &T) -> APIServerResult<()>; - - async fn update(&self, obj: &T) -> APIServerResult<()>; - - async fn create(&self, obj: &T) -> APIServerResult<()>; - - async fn delete(&self, obj: &T) -> APIServerResult<()>; - - async fn delete_by_key(&self, key: &ResourceKey) -> APIServerResult<()>; -} - - -pub struct Reflector -where - T: Clone + DeserializeOwned + Serialize + fleetmod::FleetResource + Send + Sync + 'static, -{ - reflector_runner: Arc>, -} - -impl Reflector -where - T: Clone + DeserializeOwned + Serialize + fleetmod::FleetResource + Send + Sync + 'static, -{ - pub fn new(event_client: Arc, resource_collection_identifier: ResourceCollectionIdentifier, resync_period: Option) -> Self { - Reflector { - reflector_runner: Arc::new(ReflectorRunner::init(event_client, resource_collection_identifier, resync_period)), - } - } - - pub async fn run(&self) { - let watch_runner = self.reflector_runner.clone(); - tokio::spawn(async move { - let resync_period = watch_runner.resync_period.unwrap_or(time::Duration::from_secs(5)); - loop { - let _ = watch_runner.watch().await; - // watch again if runner returns - log::warn!("Reflector on: {} watch runner returned. Restarting the watch", watch_runner.resource_collection_identifier); - tokio::time::sleep(resync_period).await; - } - }); - - let list_runner = self.reflector_runner.clone(); - tokio::spawn(async move { - let resync_period = list_runner.resync_period.unwrap_or(time::Duration::from_secs(5)); - loop { - list_runner.resync().await; - tokio::time::sleep(resync_period).await; - } - }); - } -} - -#[async_trait] -impl ResourceReflector for Reflector -where - T: Clone + DeserializeOwned + Serialize + fleetmod::FleetResource + Send + Sync + 'static, -{ - async fn get(&self) -> BTreeMap { - self.reflector_runner.get().await - } - - async fn get_by_key(&self, key: &ResourceKey) -> Option { - self.reflector_runner.get_by_key(key).await - } - - async fn patch(&self, patched: &T) -> APIServerResult<()> { - self.reflector_runner.patch(patched).await - } - - async fn update(&self, obj: &T) -> APIServerResult<()> { - self.reflector_runner.update(obj).await - } - - async fn create(&self, obj: &T) -> APIServerResult<()> { - self.reflector_runner.create(obj).await - } - - async fn delete(&self, obj: &T) -> APIServerResult<()> { - self.reflector_runner.delete(obj).await - } - - async fn delete_by_key(&self, key: &ResourceKey) -> APIServerResult<()> { - self.reflector_runner.delete_by_key(key).await - } -} - -#[derive(Clone)] -pub struct ReflectorRunner -where - T: Clone + DeserializeOwned + Serialize + fleetmod::FleetResource + Send + Sync + 'static, -{ - resource_collection_identifier: ResourceCollectionIdentifier, - event_client: Arc, - resources: Arc>>, - resync_period: Option, -} - -impl ReflectorRunner -where - T: Clone + DeserializeOwned + Serialize + fleetmod::FleetResource + Send + Sync + 'static, -{ - pub fn init(event_client: Arc, resource_collection_identifier: ResourceCollectionIdentifier, resync_period: Option) -> Self { - ReflectorRunner { - resource_collection_identifier, - event_client, - resources: Arc::new(Mutex::new(BTreeMap::new())), - resync_period, - } - } - - async fn watch(&self) -> APIServerResult<()> { - let watch_res = self.event_client.watch(self.resource_collection_identifier.clone()).await; - if watch_res.is_err() { - log::error!("Reflector on: {:?} Failed to watch the resources: {:?}", self.resource_collection_identifier, watch_res); - return Err(watch_res.err().unwrap()); - } - let mut watch_chan: Receiver> = watch_res?; - while let Some(res) = watch_chan.recv().await { - match res { - WatchEventMessageResource::Created(resource) => self.handle_upserted(resource).await, - WatchEventMessageResource::Updated(resource) => self.handle_upserted(resource).await, - WatchEventMessageResource::Deleted(resource) => self.handle_deleted(resource).await, - } - } - if watch_chan.is_closed() { - return Err(APIServerError::internal_error("watch channel is closed")); - } - Ok(()) - } - - async fn handle_upserted(&self, resource: T) { - let mut guard = self.resources.lock().await; - guard.insert(ResourceKey::from(&resource), resource); - } - - async fn handle_deleted(&self, resource: T) { - let mut guard = self.resources.lock().await; - guard.remove(&ResourceKey::from(&resource)); - } - - async fn handle_deleted_by_key(&self, key: &ResourceKey) { - let mut guard = self.resources.lock().await; - guard.remove(key); - } - - // todo: handle the atomicity of the resync. Is there another way? - async fn resync(&self) { - // at the start of resync, lock the objects - let mut guard = self.resources.lock().await; - // get the resources from the api server - let res: APIServerResult> = self.event_client.list(APIServerServiceParams::from(self.resource_collection_identifier.clone()), Value::Null).await; - if res.is_err() { - log::error!("Failed to get resources from the api server: {:?}", res.err()); - return; - } - let resources = res.unwrap(); - // overwrite the objects - guard.clear(); - for resource in resources.into_iter() { - guard.insert(ResourceKey::from(&resource), resource); - } - drop(guard); - } - - fn restricted_params(&self, obj: &T) -> APIServerServiceParams { - let mut params = APIServerServiceParams::from(self.resource_collection_identifier.clone()); - params.name = Some(obj.get_metadata().name.clone()); - params.namespace = obj.get_metadata().namespace.clone(); - params - } -} - -// important! 在写入数据时对本地缓存采取不更新策略 -// 利用watch自动更新存在延迟,会导致本地缓存和实际数据在一段时间内不一致。 -// 但是手动更新可能会导致如下情况: -// 本地连续创建、删除资源A,此时缓存中不存在A。但是watch在收到created事件时,会将A加入缓存 -// 在watch收到deleted事件之前,本地缓存中存在A,这违反了顺序一致性。 -// 因此,我们选择在写入数据时,不更新本地缓存,而是等待watch事件更新。 -#[async_trait] -impl ResourceReflector for ReflectorRunner -where - T: Clone + DeserializeOwned + Serialize + fleetmod::FleetResource + Send + Sync + 'static, -{ - async fn get(&self) -> BTreeMap { - let guard = self.resources.lock().await; - guard.clone() - } - - async fn get_by_key(&self, key: &ResourceKey) -> Option { - let guard = self.resources.lock().await; - guard.get(key).cloned() - } - - async fn patch(&self, patched: &T) -> APIServerResult<()> { - let original = self.get_by_key(&ResourceKey::from(patched)).await; - if original.is_none() { - return Err(APIServerError::not_found("要patch的资源不存在")); - } - let original = original.unwrap(); - let original_json = serde_json::to_value(original)?; - let patched_json = serde_json::to_value(patched)?; - let diff = json_patch::diff(&original_json, &patched_json); - let diff_json = serde_json::to_value(diff)?; - let restricted_params = self.restricted_params(patched); - self.event_client.patch::(restricted_params, diff_json).await?; - // self.handle_upserted(updated.clone()).await; - Ok(()) - } - - async fn update(&self, obj: &T) -> APIServerResult<()> { - let named_params = self.restricted_params(obj); - let _: T = self.event_client.update(named_params, obj.clone()).await?; - Ok(()) - } - - async fn create(&self, obj: &T) -> APIServerResult<()> { - let named_params = self.restricted_params(obj); - let _: T = self.event_client.create(named_params, obj.clone()).await?; - Ok(()) - } - - async fn delete(&self, obj: &T) -> APIServerResult<()> { - let named_params = self.restricted_params(obj); - let _: T = self.event_client.delete(named_params).await?; - Ok(()) - } - - async fn delete_by_key(&self, key: &ResourceKey) -> APIServerResult<()> { - let named_params = APIServerServiceParams::from(key.clone()); - let _: T = self.event_client.delete(named_params).await?; - Ok(()) - } -} diff --git a/src/cores/services.rs b/src/cores/services.rs index f3c85dab4974aacb0508848911669ef7346d9645..ed4e16182d29c81be9b8816b2942da82ac85f8d5 100644 --- a/src/cores/services.rs +++ b/src/cores/services.rs @@ -1,5 +1,4 @@ use crate::cores::apiserver::{AppState, K8sStylePathParams}; -use crate::cores::events::WatchEventPublisher; use crate::db::check_exist::{check_kine, check_metadata}; use crate::db::db::DbConnection; use crate::db::delete::delete_from_kine; @@ -15,6 +14,7 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::{Display, Formatter}; use std::sync::{Arc, LazyLock, Mutex}; +use client_rust::event_client::WatchEventPublisher; use fleetmod::FleetResource; use json_patch::Patch; @@ -39,6 +39,9 @@ pub struct APIServerError { pub message: String, } + + + impl From for APIServerError { fn from(err: FEventBusError) -> Self { match err { diff --git a/src/lib.rs b/src/lib.rs index db2264e9208bdbb4721a45a9149ced551f912011..6eb2a4dec3bdb4a98eb87fcef313186fc768495d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,4 +13,3 @@ pub mod datamgr_route; pub mod cluster_info_route; pub use cores::{prepare_app_state, start_server}; -pub use cores::events::APIServerEventClient;