diff --git a/Cargo.toml b/Cargo.toml index e581cdbfb62ea0e1d1f853fc4f0eb006c0efa7f7..b47536a2aeab06ba2eca4dfda96996a68847545a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ default = ["eventbus", "servers", "messaging", "os_socket"] eventbus = ["feventbus"] servers = ["utils", "eventbus", "messaging", "diesel", "diesel_migrations", "actix-http", "actix-web"] utils = [] -test = ["utils", "eventbus"] +test = ["utils", "eventbus", "os_socket"] messaging = ["eventbus"] os_socket = ["os_socket_comms"] diff --git a/src/cores/daemons/consensus.rs b/src/cores/daemons/consensus.rs index 3de9dd8d38cd64da5621fd5bc2f96752733e503e..21cffa643f85aeb1564856397c386d92f8aaec5f 100644 --- a/src/cores/daemons/consensus.rs +++ b/src/cores/daemons/consensus.rs @@ -5,107 +5,112 @@ use client_rust::{ actix_web_client::{ActixWebAPICaller, TransportProtocol}, config, }; -use consensus_kv::{ - raft::{ - network::{ - AddLearnerRequest, AddLearnerResponse, ChangeMembershipRequest, - ChangeMembershipResponse, GetMetricsResponse, SnapshotRequest, - }, - types::{ - AppendEntriesRequest, AppendEntriesResponse, Fatal, RaftError, SnapshotResponse, - VoteRequest, VoteResponse, - }, +use consensus_kv::{raft::{ + network::{ + AddLearnerRequest, AddLearnerResponse, ChangeMembershipRequest, + ChangeMembershipResponse, GetMetricsResponse, SnapshotRequest, }, - ConsensusConfig, ConsensusResult, ConsensusService, Peer, PeerId, RPCSender, - RaftConsensusService, -}; + types::{ + AppendEntriesRequest, AppendEntriesResponse, Fatal, RaftError, SnapshotResponse, + VoteRequest, VoteResponse, + }, +}, ConsensusConfig, ConsensusError, ConsensusRPCError, ConsensusResult, ConsensusService, Peer, PeerId, RPCSender, RaftConsensusService}; +use tokio::sync::Mutex; pub struct ConsensusDaemon { consensus_config: ConsensusConfig, - consensus_svc: Arc, + consensus_svc: Arc>>>, } impl ConsensusDaemon { pub async fn new(consensus_config: ConsensusConfig) -> Self { + Self { + consensus_config, + consensus_svc: Arc::new(Mutex::new(None)), + } + } + + pub async fn new_consensus_svc(&self) -> Arc { let rpc_sender: Box = Box::new(ActixWebAPICaller::new(None, None, TransportProtocol::TCP)); let consensus_svc = consensus_kv::init_consensus_service( - &consensus_config, + &self.consensus_config, rpc_sender, consensus_kv::ConsensusServiceImpls::Raft, ) - .await - .expect("Failed to init consensus service"); - Self { - consensus_config, - consensus_svc, - } + .await + .expect("Failed to init consensus service"); + consensus_svc } - pub fn start(&self) { - let svc_inner = self.consensus_svc.clone(); - let config_inner = self.consensus_config.clone(); - tokio::spawn(async move { - async fn sleep_interval() { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - }; - 'next: loop { - if svc_inner.is_initialized().await.is_ok_and(|i| i) { - sleep_interval().await; - continue; - } - log::info!( - "Consensus service not initialized, start init, self peer: {:?}", - config_inner.self_peer - ); - // initialize the service - let start_pristine_result = svc_inner - .start_pristine( - &config_inner - .init_peers - .clone() - .expect("init peers not provided"), - ) - .await; - if start_pristine_result.is_ok() { - log::info!("Consensus service start pritine successfully"); - continue; - } - // use join to wait for the service to be initialized - // todo! health check for current peer? - log::info!( - "Consensus service start pritine failed, use joining to initialize the consensus service" - ); - let init_peers = config_inner.init_peers.clone().unwrap_or_default(); - for (any_peer_id, any_peer) in init_peers.iter() { - let join_result = svc_inner.join(&any_peer).await; - if join_result.is_ok() { - log::info!( - "Consensus service join successfully, joined peer_id={}, joined peer_address={}", - any_peer_id, any_peer.address - ); - continue 'next; - } - } - log::info!( - "Consensus service join failed, sleep until next attemp to initialized..." - ) - } - }); + pub async fn start(&self) { + let consensus_svc = self.new_consensus_svc().await; + log::info!("Consensus service start init, self peer: {:?}", self.consensus_config.self_peer); + // initialize the service + let start_pristine_result = consensus_svc + .start_pristine( + &self.consensus_config + .init_peers + .clone() + .expect("init peers not provided"), + ) + .await; + if start_pristine_result.is_ok() { + log::info!("Consensus service start pritine successfully"); + self.consensus_svc + .lock() + .await + .replace(consensus_svc); + return; + } + log::error!("Failed to start consensus service: {:?}", start_pristine_result); } pub async fn handle_requests( &self, requests: Vec, ) -> Vec> { + let consensus_svc = self.consensus_svc.lock().await; + if consensus_svc.is_none() { + return requests.into_iter().map(|_| { + Err(ConsensusError::RPCError(ConsensusRPCError::Application("Not Initialized".to_string()))) + }).collect(); + } + let consensus_svc = consensus_svc.as_ref().unwrap(); let mut responses = Vec::new(); for request in requests { - let response = self.consensus_svc.db().handle_request(&request).await; + let response = consensus_svc.db().handle_request(&request).await; responses.push(response); } responses } + pub async fn handle_join(&self, peer: Peer) -> ConsensusResult<()> { + let mut curr_svc = self.consensus_svc.lock().await; + if curr_svc.is_some() { + // 如果当前服务已经存在,则先关闭当前服务 + log::info!("Consensus service shutdown before join, self peer: {:?}", self.consensus_config.self_peer); + curr_svc.as_ref().unwrap().shutdown().await; + } + let consensus_svc = self.new_consensus_svc().await; + curr_svc.replace(consensus_svc.clone()); + log::info!("Consensus service start join, self peer: {:?}", self.consensus_config.self_peer); + // initialize the service + let curr_svc = curr_svc.as_ref().unwrap(); + let result = curr_svc.join(&peer).await; + result + } + + pub async fn handle_leave(&self) -> ConsensusResult<()> { + let consensus_svc = self.consensus_svc.lock().await; + if consensus_svc.is_none() { + return Self::uninitialized_error(); + } + let consensus_svc = consensus_svc.as_ref().unwrap(); + let result = consensus_svc.shutdown().await; + result + } + async fn cast_to_concrete_service<'a>( g: &'a Arc, ) -> &'a RaftConsensusService { @@ -116,19 +121,34 @@ impl ConsensusDaemon { &self, req: AppendEntriesRequest, ) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.append(req).await; result } pub async fn vote(&self, req: VoteRequest) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.vote(req).await; result } pub async fn snapshot(&self, req: SnapshotRequest) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(Fatal::Stopped); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.snapshot(req).await; result } @@ -136,28 +156,56 @@ impl ConsensusDaemon { pub async fn add_learner( &self, req: AddLearnerRequest, - ) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + ) -> AddLearnerResponse { + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.add_learner(&req).await; - Ok(result) + result } pub async fn change_membership( &self, req: ChangeMembershipRequest, - ) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + ) -> ChangeMembershipResponse { + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.change_membership(req).await; - Ok(result) + result } pub async fn metrics(&self) -> Result { - let svc = Self::cast_to_concrete_service(&self.consensus_svc).await; + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return Err(RaftError::Fatal(Fatal::Stopped)); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_concrete_service(svc).await; let result = svc.metrics().await; Ok(result) } pub async fn get_peers(&self) -> BTreeMap { - self.consensus_svc.get_peers() + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return BTreeMap::from([ + (self.consensus_config.self_peer.id, self.consensus_config.self_peer.clone()) + ]); + } + let svc = svc.as_ref().unwrap(); + svc.get_peers() + } + + pub fn uninitialized_error() -> ConsensusResult<()> { + Err(ConsensusError::RPCError(ConsensusRPCError::Application( + "Not Initialized".to_string(), + ))) } } diff --git a/src/cores/daemons/messaging.rs b/src/cores/daemons/messaging.rs index 2a3f3581928a3df1d146337965300fbd0e6c490a..75c7b17e770b4eecdc49037de5cdc2bcc92c7f8c 100644 --- a/src/cores/daemons/messaging.rs +++ b/src/cores/daemons/messaging.rs @@ -34,7 +34,7 @@ impl WatchDaemon { } } - pub fn start(&self) { + pub async fn start(&self) { let rx = self.receiver.clone(); let msg_cli = self.msg_cli.clone(); let topics = self.topics.clone(); diff --git a/src/cores/handlers/consensus.rs b/src/cores/handlers/consensus.rs index 4ac2411dda714a078de78bf8d1077d4dadbc02b1..268c86f3d5c4a4dff76aed6f74e2e28dfd35cf1c 100644 --- a/src/cores/handlers/consensus.rs +++ b/src/cores/handlers/consensus.rs @@ -27,6 +27,8 @@ macro_rules! define_service_fn { // consensus/requests define_service_fn!(handle_requests, handle_requests); +// consensus/join +define_service_fn!(handle_join, handle_join); // consensus/peers pub async fn handle_get_peers(app_state: Arc, _: ServerRequest) -> ServerResponse { @@ -37,6 +39,15 @@ pub async fn handle_get_peers(app_state: Arc, _: ServerRequest) -> Ser ServerRawResponse::ok().body(response_vec).build().into() } +// consensus/leave +pub async fn handle_leave(app_state: Arc, _: ServerRequest) -> ServerResponse { + let response = app_state.consensus_daemon.handle_leave().await; + let Ok(response_vec) = serde_json::to_vec(&response) else { + return bad_request("invalid response body, serializing response failed"); + }; + ServerRawResponse::ok().body(response_vec).build().into() +} + fn bad_request(msg: &str) -> ServerResponse { ServerRawResponse::bad_request() .body(Vec::from(msg.as_bytes())) diff --git a/src/cores/mod.rs b/src/cores/mod.rs index b8c7e7a42df2d4a3706a7fd86fa35207ce432a9a..a388519dafe2059ee1dd39b2fde9a56bbcb12477 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -105,8 +105,8 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { .await?; // 启动watch相关事件监听协程 // 启动consensus相关协程 - app_state.watch_daemon.start(); - app_state.consensus_daemon.start(); + app_state.watch_daemon.start().await; + app_state.consensus_daemon.start().await; // 启动网络状态聚合器 if app_state.network_status_config.enabled { diff --git a/src/cores/router.rs b/src/cores/router.rs index 3c95573a566289a330b9f1bafe0ecfa4895edbf5..cb6164a4c2830de59749a9bc70f897ebae73bd4e 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -66,6 +66,8 @@ pub enum RouterKey { // Consensus Service Router Key ConsensusRequests, ConsensusPeers, + ConsensusJoin, + ConsensusLeave, ConsensusRaftAppendEntries, ConsensusRaftVote, ConsensusRaftSnapshot, @@ -106,6 +108,8 @@ impl RouterKey { match key { RouterKey::ConsensusRequests | RouterKey::ConsensusPeers + | RouterKey::ConsensusJoin + | RouterKey::ConsensusLeave | RouterKey::ConsensusRaftAppendEntries | RouterKey::ConsensusRaftVote | RouterKey::ConsensusRaftSnapshot @@ -408,6 +412,16 @@ impl Router { RouterKey::ConsensusPeers, handlers::consensus::handle_get_peers ); + add_route!( + router, + RouterKey::ConsensusJoin, + handlers::consensus::handle_join + ); + add_route!( + router, + RouterKey::ConsensusLeave, + handlers::consensus::handle_leave + ); add_route!( router, RouterKey::ConsensusRaftAppendEntries, diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 6d90ff9f62daabb5bb440914043d081f85047efd..d4fc3e56d744e74ce240cbdbbd59c3f8e9ca83b9 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -944,6 +944,8 @@ pub mod consensus { web::scope("/consensus") .service(requests) .service(get_peers) + .service(join) + .service(leave) .service(append_entries) .service(vote) .service(snapshot) @@ -971,6 +973,24 @@ pub mod consensus { RouterKey::ConsensusPeers, "Consensus" ); + define_raw_response_method!( + join, + post, + "/join", + (), + (), + RouterKey::ConsensusJoin, + "Consensus" + ); + define_raw_response_method!( + leave, + get, + "/leave", + (), + (), + RouterKey::ConsensusLeave, + "Consensus" + ); define_raw_response_method!( append_entries, post, diff --git a/src/cores/servers/message.rs b/src/cores/servers/message.rs index 8767ceb3c94246edce2f5955731036eebe778192..d6b4b94fdfef24b4ff90d933035bbd78217673d4 100644 --- a/src/cores/servers/message.rs +++ b/src/cores/servers/message.rs @@ -158,6 +158,8 @@ impl MessagingServer { P2PEventTopic::Watch(_) => RouterKey::ResourceWatch, P2PEventTopic::ConsensusRequests(_) => RouterKey::ConsensusRequests, P2PEventTopic::ConsensusPeers(_) => RouterKey::ConsensusPeers, + P2PEventTopic::ConsensusJoin(_) => RouterKey::ConsensusJoin, + P2PEventTopic::ConsensusLeave(_) => RouterKey::ConsensusLeave, } } } diff --git a/src/utils/test.rs b/src/utils/test.rs index 0cef05ab9d954036b77451e293e91a46b41c068b..70beeb3cb7389cb91cf8e24ad30b6ecfe099128d 100644 --- a/src/utils/test.rs +++ b/src/utils/test.rs @@ -95,8 +95,8 @@ pub async fn start_test_api_server( rs422_address: Option, ) -> AppState { // 启动watch相关事件监听协程 - app_state.watch_daemon.start(); - app_state.consensus_daemon.start(); + app_state.watch_daemon.start().await; + app_state.consensus_daemon.start().await; log::info!("Starting test API server"); // 启动各个server