From 31479227e46274505723f9e14e39ff4f440abc47 Mon Sep 17 00:00:00 2001 From: yzc1114 Date: Tue, 27 May 2025 17:46:43 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E8=99=9A=E6=8B=9F=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 10 +- src/cores/daemons/consensus.rs | 206 ++++++++++++++++++----------- src/cores/daemons/messaging.rs | 2 +- src/cores/handlers/consensus.rs | 11 ++ src/cores/mod.rs | 4 +- src/cores/router.rs | 14 ++ src/cores/servers/actix_web/mod.rs | 20 +++ src/cores/servers/message.rs | 2 + 8 files changed, 182 insertions(+), 87 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e581cdb..23dae61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ default = ["eventbus", "servers", "messaging", "os_socket"] eventbus = ["feventbus"] servers = ["utils", "eventbus", "messaging", "diesel", "diesel_migrations", "actix-http", "actix-web"] utils = [] -test = ["utils", "eventbus"] +test = ["utils", "eventbus", "os_socket"] messaging = ["eventbus"] os_socket = ["os_socket_comms"] @@ -55,10 +55,10 @@ network_info = { path = "crates/network_info" } feventbus = { git = "https://gitee.com/iscas-system/eventbus.git", optional = true } pnet = "0.34.0" pnet_datalink = "0.34.0" -#fleetmodv2 = { path = "../fleetmodv2" } -fleetmodv2 = { git = "https://gitee.com/iscas-system/fleetmodv2.git" } -#client-rust = { path = "../client-rust" } -client-rust = { git = "https://gitee.com/iscas-system/client-rust" } +fleetmodv2 = { path = "../fleetmodv2" } +#fleetmodv2 = { git = "https://gitee.com/iscas-system/fleetmodv2.git" } +client-rust = { path = "../client-rust" } +#client-rust = { git = "https://gitee.com/iscas-system/client-rust" } consensus_kv = { git = "https://gitee.com/iscas-system/consensus-kv" } r2d2 = "0.8.10" diff --git a/src/cores/daemons/consensus.rs b/src/cores/daemons/consensus.rs index 3de9dd8..1c02e7a 100644 --- a/src/cores/daemons/consensus.rs +++ b/src/cores/daemons/consensus.rs @@ -5,107 +5,112 @@ use client_rust::{ actix_web_client::{ActixWebAPICaller, TransportProtocol}, config, }; -use consensus_kv::{ - raft::{ - network::{ - AddLearnerRequest, AddLearnerResponse, ChangeMembershipRequest, - ChangeMembershipResponse, GetMetricsResponse, SnapshotRequest, - }, - types::{ - AppendEntriesRequest, AppendEntriesResponse, Fatal, RaftError, SnapshotResponse, - VoteRequest, VoteResponse, - }, +use consensus_kv::{raft::{ + network::{ + AddLearnerRequest, AddLearnerResponse, ChangeMembershipRequest, + ChangeMembershipResponse, GetMetricsResponse, SnapshotRequest, }, - ConsensusConfig, ConsensusResult, ConsensusService, Peer, PeerId, RPCSender, - RaftConsensusService, -}; + types::{ + AppendEntriesRequest, AppendEntriesResponse, Fatal, RaftError, SnapshotResponse, + VoteRequest, VoteResponse, + }, +}, ConsensusConfig, ConsensusError, ConsensusRPCError, ConsensusResult, ConsensusService, Peer, PeerId, RPCSender, RaftConsensusService}; +use tokio::sync::Mutex; pub struct ConsensusDaemon { consensus_config: ConsensusConfig, - consensus_svc: Arc, + consensus_svc: Arc>>>, } impl ConsensusDaemon { pub async fn new(consensus_config: ConsensusConfig) -> Self { + Self { + consensus_config, + consensus_svc: Arc::new(Mutex::new(None)), + } + } + + pub async fn new_consensus_svc(&self) -> Arc { let rpc_sender: Box = Box::new(ActixWebAPICaller::new(None, None, TransportProtocol::TCP)); let consensus_svc = consensus_kv::init_consensus_service( - &consensus_config, + &self.consensus_config, rpc_sender, consensus_kv::ConsensusServiceImpls::Raft, ) - .await - .expect("Failed to init consensus service"); - Self { - consensus_config, - consensus_svc, - } + .await + .expect("Failed to init consensus service"); + consensus_svc } - pub fn start(&self) { - let svc_inner = self.consensus_svc.clone(); - let config_inner = self.consensus_config.clone(); - tokio::spawn(async move { - async fn sleep_interval() { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - }; - 'next: loop { - if svc_inner.is_initialized().await.is_ok_and(|i| i) { - sleep_interval().await; - continue; - } - log::info!( - "Consensus service not initialized, start init, self peer: {:?}", - config_inner.self_peer - ); - // initialize the service - let start_pristine_result = svc_inner - .start_pristine( - &config_inner - .init_peers - .clone() - .expect("init peers not provided"), - ) - .await; - if start_pristine_result.is_ok() { - log::info!("Consensus service start pritine successfully"); - continue; - } - // use join to wait for the service to be initialized - // todo! health check for current peer? - log::info!( - "Consensus service start pritine failed, use joining to initialize the consensus service" - ); - let init_peers = config_inner.init_peers.clone().unwrap_or_default(); - for (any_peer_id, any_peer) in init_peers.iter() { - let join_result = svc_inner.join(&any_peer).await; - if join_result.is_ok() { - log::info!( - "Consensus service join successfully, joined peer_id={}, joined peer_address={}", - any_peer_id, any_peer.address - ); - continue 'next; - } - } - log::info!( - "Consensus service join failed, sleep until next attemp to initialized..." - ) - } - }); + pub async fn start(&self) { + let consensus_svc = self.new_consensus_svc().await; + log::info!("Consensus service start init, self peer: {:?}", self.consensus_config.self_peer); + // initialize the service + let start_pristine_result = consensus_svc + .start_pristine( + &self.consensus_config + .init_peers + .clone() + .expect("init peers not provided"), + ) + .await; + if start_pristine_result.is_ok() { + log::info!("Consensus service start pritine successfully"); + self.consensus_svc + .lock() + .await + .replace(consensus_svc); + return; + } + log::error!("Failed to start consensus service: {:?}", start_pristine_result); } pub async fn handle_requests( &self, requests: Vec, ) -> Vec> { + let consensus_svc = self.consensus_svc.lock().await; + if consensus_svc.is_none() { + return requests.into_iter().map(|_| { + Err(ConsensusError::RPCError(ConsensusRPCError::Application("Not Initialized".to_string()))) + }).collect(); + } + let consensus_svc = consensus_svc.as_ref().unwrap(); let mut responses = Vec::new(); for request in requests { - let response = self.consensus_svc.db().handle_request(&request).await; + let response = consensus_svc.db().handle_request(&request).await; responses.push(response); } responses } + pub async fn handle_join(&self, peer: Peer) -> ConsensusResult<()> { + let mut curr_svc = self.consensus_svc.lock().await; + if curr_svc.is_some() { + // 如果当前服务已经存在,则先关闭当前服务 + log::info!("Consensus service shutdown before join, self peer: {:?}", self.consensus_config.self_peer); + curr_svc.as_ref().unwrap().shutdown().await; + } + let consensus_svc = self.new_consensus_svc().await; + curr_svc.replace(consensus_svc.clone()); + log::info!("Consensus service start join, self peer: {:?}", self.consensus_config.self_peer); + // initialize the service + let curr_svc = curr_svc.as_ref().unwrap(); + let result = curr_svc.join(&peer).await; + result + } + + pub async fn handle_leave(&self) -> ConsensusResult<()> { + let consensus_svc = self.consensus_svc.lock().await; + if consensus_svc.is_none() { + return Self::uninitialized_error(); + } + let consensus_svc = consensus_svc.as_ref().unwrap(); + let result = consensus_svc.shutdown().await; + result + } + async fn cast_to_concrete_service<'a>( g: &'a Arc, ) -> &'a RaftConsensusService { @@ -116,19 +121,34 @@ impl ConsensusDaemon { &self, req: AppendEntriesRequest, ) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.append(req).await; result } pub async fn vote(&self, req: VoteRequest) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.vote(req).await; result } pub async fn snapshot(&self, req: SnapshotRequest) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(Fatal::Stopped); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.snapshot(req).await; result } @@ -137,7 +157,12 @@ impl ConsensusDaemon { &self, req: AddLearnerRequest, ) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.add_learner(&req).await; Ok(result) } @@ -146,18 +171,41 @@ impl ConsensusDaemon { &self, req: ChangeMembershipRequest, ) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.change_membership(req).await; Ok(result) } pub async fn metrics(&self) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.metrics().await; Ok(result) } pub async fn get_peers(&self) -> BTreeMap { - self.consensus_svc.get_peers() + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return BTreeMap::from([ + (self.consensus_config.self_peer.id, self.consensus_config.self_peer.clone()) + ]); + } + let svc = svc.as_ref().unwrap(); + svc.get_peers() + } + + pub fn uninitialized_error() -> ConsensusResult<()> { + Err(ConsensusError::RPCError(ConsensusRPCError::Application( + "Not Initialized".to_string(), + ))) } } diff --git a/src/cores/daemons/messaging.rs b/src/cores/daemons/messaging.rs index 2a3f358..75c7b17 100644 --- a/src/cores/daemons/messaging.rs +++ b/src/cores/daemons/messaging.rs @@ -34,7 +34,7 @@ impl WatchDaemon { } } - pub fn start(&self) { + pub async fn start(&self) { let rx = self.receiver.clone(); let msg_cli = self.msg_cli.clone(); let topics = self.topics.clone(); diff --git a/src/cores/handlers/consensus.rs b/src/cores/handlers/consensus.rs index 4ac2411..268c86f 100644 --- a/src/cores/handlers/consensus.rs +++ b/src/cores/handlers/consensus.rs @@ -27,6 +27,8 @@ macro_rules! define_service_fn { // consensus/requests define_service_fn!(handle_requests, handle_requests); +// consensus/join +define_service_fn!(handle_join, handle_join); // consensus/peers pub async fn handle_get_peers(app_state: Arc, _: ServerRequest) -> ServerResponse { @@ -37,6 +39,15 @@ pub async fn handle_get_peers(app_state: Arc, _: ServerRequest) -> Ser ServerRawResponse::ok().body(response_vec).build().into() } +// consensus/leave +pub async fn handle_leave(app_state: Arc, _: ServerRequest) -> ServerResponse { + let response = app_state.consensus_daemon.handle_leave().await; + let Ok(response_vec) = serde_json::to_vec(&response) else { + return bad_request("invalid response body, serializing response failed"); + }; + ServerRawResponse::ok().body(response_vec).build().into() +} + fn bad_request(msg: &str) -> ServerResponse { ServerRawResponse::bad_request() .body(Vec::from(msg.as_bytes())) diff --git a/src/cores/mod.rs b/src/cores/mod.rs index b8c7e7a..a388519 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -105,8 +105,8 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { .await?; // 启动watch相关事件监听协程 // 启动consensus相关协程 - app_state.watch_daemon.start(); - app_state.consensus_daemon.start(); + app_state.watch_daemon.start().await; + app_state.consensus_daemon.start().await; // 启动网络状态聚合器 if app_state.network_status_config.enabled { diff --git a/src/cores/router.rs b/src/cores/router.rs index 3c95573..cb6164a 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -66,6 +66,8 @@ pub enum RouterKey { // Consensus Service Router Key ConsensusRequests, ConsensusPeers, + ConsensusJoin, + ConsensusLeave, ConsensusRaftAppendEntries, ConsensusRaftVote, ConsensusRaftSnapshot, @@ -106,6 +108,8 @@ impl RouterKey { match key { RouterKey::ConsensusRequests | RouterKey::ConsensusPeers + | RouterKey::ConsensusJoin + | RouterKey::ConsensusLeave | RouterKey::ConsensusRaftAppendEntries | RouterKey::ConsensusRaftVote | RouterKey::ConsensusRaftSnapshot @@ -408,6 +412,16 @@ impl Router { RouterKey::ConsensusPeers, handlers::consensus::handle_get_peers ); + add_route!( + router, + RouterKey::ConsensusJoin, + handlers::consensus::handle_join + ); + add_route!( + router, + RouterKey::ConsensusLeave, + handlers::consensus::handle_leave + ); add_route!( router, RouterKey::ConsensusRaftAppendEntries, diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 6d90ff9..d4fc3e5 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -944,6 +944,8 @@ pub mod consensus { web::scope("/consensus") .service(requests) .service(get_peers) + .service(join) + .service(leave) .service(append_entries) .service(vote) .service(snapshot) @@ -971,6 +973,24 @@ pub mod consensus { RouterKey::ConsensusPeers, "Consensus" ); + define_raw_response_method!( + join, + post, + "/join", + (), + (), + RouterKey::ConsensusJoin, + "Consensus" + ); + define_raw_response_method!( + leave, + get, + "/leave", + (), + (), + RouterKey::ConsensusLeave, + "Consensus" + ); define_raw_response_method!( append_entries, post, diff --git a/src/cores/servers/message.rs b/src/cores/servers/message.rs index 8767ceb..d6b4b94 100644 --- a/src/cores/servers/message.rs +++ b/src/cores/servers/message.rs @@ -158,6 +158,8 @@ impl MessagingServer { P2PEventTopic::Watch(_) => RouterKey::ResourceWatch, P2PEventTopic::ConsensusRequests(_) => RouterKey::ConsensusRequests, P2PEventTopic::ConsensusPeers(_) => RouterKey::ConsensusPeers, + P2PEventTopic::ConsensusJoin(_) => RouterKey::ConsensusJoin, + P2PEventTopic::ConsensusLeave(_) => RouterKey::ConsensusLeave, } } } -- Gitee From 1e83f0b27e6f6effe5f39d109c81d8598d63ddf7 Mon Sep 17 00:00:00 2001 From: yzc1114 Date: Fri, 30 May 2025 10:07:53 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E8=99=9A=E6=8B=9F=E9=9B=86=E7=BE=A4join?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 虚拟集群join --- Cargo.toml | 8 ++++---- src/cores/daemons/consensus.rs | 8 ++++---- src/utils/test.rs | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 23dae61..b47536a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,10 +55,10 @@ network_info = { path = "crates/network_info" } feventbus = { git = "https://gitee.com/iscas-system/eventbus.git", optional = true } pnet = "0.34.0" pnet_datalink = "0.34.0" -fleetmodv2 = { path = "../fleetmodv2" } -#fleetmodv2 = { git = "https://gitee.com/iscas-system/fleetmodv2.git" } -client-rust = { path = "../client-rust" } -#client-rust = { git = "https://gitee.com/iscas-system/client-rust" } +#fleetmodv2 = { path = "../fleetmodv2" } +fleetmodv2 = { git = "https://gitee.com/iscas-system/fleetmodv2.git" } +#client-rust = { path = "../client-rust" } +client-rust = { git = "https://gitee.com/iscas-system/client-rust" } consensus_kv = { git = "https://gitee.com/iscas-system/consensus-kv" } r2d2 = "0.8.10" diff --git a/src/cores/daemons/consensus.rs b/src/cores/daemons/consensus.rs index 1c02e7a..21cffa6 100644 --- a/src/cores/daemons/consensus.rs +++ b/src/cores/daemons/consensus.rs @@ -156,7 +156,7 @@ impl ConsensusDaemon { pub async fn add_learner( &self, req: AddLearnerRequest, - ) -> Result { + ) -> AddLearnerResponse { let svc = self.consensus_svc.lock().await; if svc.is_none() { return Err(RaftError::Fatal(Fatal::Stopped)); @@ -164,13 +164,13 @@ impl ConsensusDaemon { let svc = svc.as_ref().unwrap(); let svc = Self::cast_to_concrete_service(svc).await; let result = svc.add_learner(&req).await; - Ok(result) + result } pub async fn change_membership( &self, req: ChangeMembershipRequest, - ) -> Result { + ) -> ChangeMembershipResponse { let svc = self.consensus_svc.lock().await; if svc.is_none() { return Err(RaftError::Fatal(Fatal::Stopped)); @@ -178,7 +178,7 @@ impl ConsensusDaemon { let svc = svc.as_ref().unwrap(); let svc = Self::cast_to_concrete_service(svc).await; let result = svc.change_membership(req).await; - Ok(result) + result } pub async fn metrics(&self) -> Result { diff --git a/src/utils/test.rs b/src/utils/test.rs index 0cef05a..70beeb3 100644 --- a/src/utils/test.rs +++ b/src/utils/test.rs @@ -95,8 +95,8 @@ pub async fn start_test_api_server( rs422_address: Option, ) -> AppState { // 启动watch相关事件监听协程 - app_state.watch_daemon.start(); - app_state.consensus_daemon.start(); + app_state.watch_daemon.start().await; + app_state.consensus_daemon.start().await; log::info!("Starting test API server"); // 启动各个server -- Gitee