diff --git a/Cargo.toml b/Cargo.toml index eda95e37d3e464d66ef0bff336395d8b7a6db0bc..dc8b7b7ff7e6c7bbf9ac58642e7be009f255fbaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,9 @@ path = "examples/custom_route_example/src/main.rs" [dependencies] #feventbus = "0.3.0" -feventbus = { git = "https://gitee.com/iscas-system/eventbus.git" } +feventbus = { git = "https://gitee.com/iscas-system/eventbus.git", branch = "messaging" } fleetmod = { git = "https://gitee.com/iscas-system/fleetmod.git" } -client-rust = { git = "https://gitee.com/iscas-system/client-rust.git"} +client-rust = { git = "https://gitee.com/iscas-system/client-rust.git", branch = "messaging" } r2d2 = "0.8.10" dotenv = "0.15.0" diesel = { version = "2.2.0", features = ["sqlite", "postgres", "r2d2"] } diff --git a/examples/event_client_example/src/main.rs b/examples/event_client_example/src/main.rs index d4bd9fe6a33dd76739386e51db9b683a7b8d10fa..b92a2235776a96c5a3ef987e26d0f6d2e25bdb53 100644 --- a/examples/event_client_example/src/main.rs +++ b/examples/event_client_example/src/main.rs @@ -4,7 +4,7 @@ use std::time; use client_rust::event_client::{EventClient, EventTopic, PubSubEventTopic, ResourceCollectionIdentifier, ResourceDefinitionBuilder, WatchEventMessageResource}; use client_rust::result::ServerResult; use env_logger::{Builder, Target}; -use feventbus::impls::nats::nats::NatsCli; +use feventbus::impls::messaging::messaging::Messaging; use feventbus::traits::controller::EventBus; use fleetmod::pod::Pod; use fleetmod::utils::APIVersion; @@ -46,7 +46,7 @@ async fn main() { #[allow(dead_code)] async fn test_p2p() { // 新建NatsClient - let nats_client = NatsCli::new().await.unwrap(); + let nats_client = Messaging::new().await.unwrap(); // 新建EventClient. 30s表示客户端设置的超时时间 let event_cli = EventClient::new(Arc::new(nats_client), Some(time::Duration::from_secs(30))); @@ -165,7 +165,7 @@ async fn test_watch() { let got_watch_msg_cnt = Arc::new(AtomicUsize::new(0)); let got_watch_msg_cnt_cloned = got_watch_msg_cnt.clone(); - let nats_cli = Arc::new(NatsCli::new().await.unwrap()); + let nats_cli = Arc::new(Messaging::new().await.unwrap()); let event_cli = EventClient::new(nats_cli.clone(), None); // 在进行watch之前先删除,保证资源不存在 diff --git a/src/cores/apiserver.rs b/src/cores/apiserver.rs index 6877eccf68dd31a7dad125f211bc8fb356debca0..7ba17e3cd635952ef94d9eb2f14b5d680ac6d43c 100644 --- a/src/cores/apiserver.rs +++ b/src/cores/apiserver.rs @@ -4,7 +4,7 @@ use crate::db::db::DbPool; use actix_web::dev::Server; use actix_web::web::route; use actix_web::{http, web, App, Error, HttpResponse, HttpServer}; -use feventbus::impls::nats::nats::NatsCli; +use feventbus::impls::messaging::messaging::Messaging; use futures::StreamExt; use http::Method; use serde::{Deserialize, Serialize}; @@ -17,7 +17,7 @@ pub struct APIServer {} #[derive(Clone)] pub struct AppState { pub db_pool: Arc, - pub nats_cli: Arc, + pub nats_cli: Arc, pub handler: Arc, pub watch_event_publisher: Arc, pub cluster_id: String diff --git a/src/cores/events.rs b/src/cores/events.rs index 306483508fbf031b2e0cdc5c8b38a55d9213ab04..7488dc537941e2f516a8974492f0e5558dbadc51 100644 --- a/src/cores/events.rs +++ b/src/cores/events.rs @@ -2,7 +2,7 @@ use crate::cores::apiserver::AppState; use crate::cores::services::{APIServerError, APIServerResult, APIServerService, APIServerServiceParams, PluralOrKind}; use anyhow::Result; use feventbus::err::Error as FEventBusError; -use feventbus::impls::nats::nats::NatsCli; +use feventbus::impls::messaging::messaging::Messaging; use feventbus::message::{Message}; use feventbus::traits::consumer::{Consumer, MessageHandler}; use serde::de::DeserializeOwned; @@ -168,11 +168,11 @@ impl P2PEventServer { Ok(serde_json::to_string(&res).unwrap()) } - pub fn get_reply_handler(&self, event_topic: P2PEventTopic) -> MessageHandler + pub fn get_reply_handler(&self, event_topic: P2PEventTopic) -> MessageHandler { let service = self.service.clone(); let app_state = self.app_state.clone(); - let reply_handler: MessageHandler = Arc::new(move |msg: Message| { + let reply_handler: MessageHandler = Arc::new(move |msg: Message| { let event_topic = event_topic.clone(); let service = service.clone(); let app_state = app_state.clone(); @@ -198,7 +198,7 @@ impl P2PEventServer { reply_handler } - pub async fn start_reply(&self, nats_cli: Arc, cluster_id: String) { + pub async fn start_reply(&self, nats_cli: Arc, cluster_id: String) { let mut topics = Vec::new(); let p2p_topics =P2PEventTopic::create_p2p_topic(cluster_id); for p2p in p2p_topics { diff --git a/src/cores/mod.rs b/src/cores/mod.rs index 3767b31bfe483e8721f90aef03534fb385e56d0d..2bce73738053c2474b20838edc82379eb7ed04bc 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -10,7 +10,7 @@ use crate::cores::handlers::DefaultHandler; use crate::cores::plugin::PluginManager; use crate::db::db::DbPool; use client_rust::event_client::WatchEventPublisher; -use feventbus::impls::nats::nats::NatsCli; +use feventbus::impls::messaging::messaging::Messaging; use feventbus::traits::controller::EventBus; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -61,8 +61,8 @@ fn get_or_create_uuid(file_path: &PathBuf) -> io::Result { pub async fn prepare_app_state(database_url: &str) -> anyhow::Result { let db_pool = Arc::new(DbPool::new(database_url)?); let handler: DefaultHandler = DefaultHandler::new(); - let nats_cli = Arc::new(NatsCli::new().await?); - let watch_event_publisher = Arc::new(WatchEventPublisher::new(nats_cli.clone())); + let messaging = Arc::new(Messaging::new().await?); + let watch_event_publisher = Arc::new(WatchEventPublisher::new(messaging.clone())); // let file_path = "/root/iscas/fleet/uuid.conf"; let config_dir = dirs::config_dir().expect("config dir not found"); let file_path = config_dir.join("iscas").join("fleet").join("uuid.conf"); @@ -71,7 +71,7 @@ pub async fn prepare_app_state(database_url: &str) -> anyhow::Result { Ok(AppState { db_pool, handler: Arc::new(handler), - nats_cli, + nats_cli: messaging, watch_event_publisher, cluster_id, })