From c95dd7970c4be7a45fa7586febe97ada646dd991 Mon Sep 17 00:00:00 2001 From: yzc1114 Date: Thu, 12 Jun 2025 17:36:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0gossip=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/daemons/consensus.rs | 100 ++++++++++++++++++++++------- src/cores/handlers/consensus.rs | 7 ++ src/cores/mod.rs | 2 + src/cores/router.rs | 16 ++++- src/cores/servers/actix_web/mod.rs | 44 +++++++++---- src/utils/test.rs | 8 +-- 6 files changed, 135 insertions(+), 42 deletions(-) diff --git a/src/cores/daemons/consensus.rs b/src/cores/daemons/consensus.rs index ac00cda..1f110a1 100644 --- a/src/cores/daemons/consensus.rs +++ b/src/cores/daemons/consensus.rs @@ -6,6 +6,7 @@ use client_rust::{ config, }; use consensus_kv::{ + gossip::service::{GossipConsensusService, GossipMessage, JoinRequest, JoinResponse}, raft::{ network::{ AddLearnerRequest, AddLearnerResponse, ChangeMembershipRequest, @@ -37,13 +38,17 @@ impl ConsensusDaemon { pub async fn new_consensus_svc(&self) -> Arc { let rpc_sender: Box = Box::new(ActixWebAPICaller::new(None, None, TransportProtocol::TCP)); - let consensus_svc = consensus_kv::init_consensus_service( - &self.consensus_config, - rpc_sender, - consensus_kv::ConsensusServiceImpls::Raft, - ) - .await - .expect("Failed to init consensus service"); + let impl_type = if self.consensus_config.raft_config.is_some() { + consensus_kv::ConsensusServiceImpls::Raft + } else if self.consensus_config.gossip_config.is_some() { + consensus_kv::ConsensusServiceImpls::Gossip + } else { + panic!("No consensus service implementation provided"); + }; + let consensus_svc = + consensus_kv::init_consensus_service(&self.consensus_config, rpc_sender, impl_type) + .await + .expect("Failed to init consensus service"); consensus_svc } @@ -130,12 +135,39 @@ impl ConsensusDaemon { result } - async fn cast_to_concrete_service<'a>( + async fn cast_to_raft_service<'a>( g: &'a Arc, ) -> &'a RaftConsensusService { g.as_any().downcast_ref::().unwrap() } + async fn cast_to_gossip_service<'a>( + g: &'a Arc, + ) -> &'a GossipConsensusService { + g.as_any().downcast_ref::().unwrap() + } + + pub async fn get_peers(&self) -> BTreeMap { + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + return BTreeMap::from([( + self.consensus_config.self_peer.id, + self.consensus_config.self_peer.clone(), + )]); + } + let svc = svc.as_ref().unwrap(); + svc.get_peers() + } + + pub fn uninitialized_error() -> ConsensusResult<()> { + Err(ConsensusError::RPCError(ConsensusRPCError::Application( + "Not Initialized".to_string(), + ))) + } +} + +// raft consensus service methods +impl ConsensusDaemon { pub async fn append_entries( &self, req: AppendEntriesRequest, @@ -145,7 +177,7 @@ impl ConsensusDaemon { return Err(RaftError::Fatal(Fatal::Stopped)); } let svc = svc.as_ref().unwrap(); - let svc = Self::cast_to_concrete_service(svc).await; + let svc = Self::cast_to_raft_service(svc).await; let result = svc.append(req).await; result } @@ -156,7 +188,7 @@ impl ConsensusDaemon { return Err(RaftError::Fatal(Fatal::Stopped)); } let svc = svc.as_ref().unwrap(); - let svc = Self::cast_to_concrete_service(svc).await; + let svc = Self::cast_to_raft_service(svc).await; let result = svc.vote(req).await; result } @@ -167,7 +199,7 @@ impl ConsensusDaemon { return Err(Fatal::Stopped); } let svc = svc.as_ref().unwrap(); - let svc = Self::cast_to_concrete_service(svc).await; + let svc = Self::cast_to_raft_service(svc).await; let result = svc.snapshot(req).await; result } @@ -178,7 +210,7 @@ impl ConsensusDaemon { return Err(RaftError::Fatal(Fatal::Stopped)); } let svc = svc.as_ref().unwrap(); - let svc = Self::cast_to_concrete_service(svc).await; + let svc = Self::cast_to_raft_service(svc).await; let result = svc.add_learner(&req).await; result } @@ -192,7 +224,7 @@ impl ConsensusDaemon { return Err(RaftError::Fatal(Fatal::Stopped)); } let svc = svc.as_ref().unwrap(); - let svc = Self::cast_to_concrete_service(svc).await; + let svc = Self::cast_to_raft_service(svc).await; let result = svc.change_membership(req).await; result } @@ -203,26 +235,46 @@ impl ConsensusDaemon { return Err(RaftError::Fatal(Fatal::Stopped)); } let svc = svc.as_ref().unwrap(); - let svc = Self::cast_to_concrete_service(svc).await; + let svc = Self::cast_to_raft_service(svc).await; let result = svc.metrics().await; Ok(result) } +} - pub async fn get_peers(&self) -> BTreeMap { +// gossip consensus service methods +impl ConsensusDaemon { + pub async fn handle_gossip_message(&self, req: GossipMessage) -> Result<(), ConsensusError> { + log::debug!("ApiServer received Gossip message: {:?}", req); let svc = self.consensus_svc.lock().await; if svc.is_none() { - return BTreeMap::from([( - self.consensus_config.self_peer.id, - self.consensus_config.self_peer.clone(), - )]); + log::error!("ApiServer Gossip message: not initialized"); + return Err(ConsensusError::RPCError(ConsensusRPCError::Application( + "Not Initialized".to_string(), + ))); } let svc = svc.as_ref().unwrap(); - svc.get_peers() + let svc = Self::cast_to_gossip_service(svc).await; + let result = svc.receive_gossip(req).await; + log::debug!("ApiServer received Gossip message finished"); + Ok(()) } - pub fn uninitialized_error() -> ConsensusResult<()> { - Err(ConsensusError::RPCError(ConsensusRPCError::Application( - "Not Initialized".to_string(), - ))) + pub async fn handle_gossip_join( + &self, + req: JoinRequest, + ) -> Result { + log::debug!("ApiServer received Gossip join request: {:?}", req); + let svc = self.consensus_svc.lock().await; + if svc.is_none() { + log::error!("Gossip join: not initialized"); + return Err(ConsensusError::RPCError(ConsensusRPCError::Application( + "Not Initialized".to_string(), + ))); + } + let svc = svc.as_ref().unwrap(); + let svc = Self::cast_to_gossip_service(svc).await; + let result = svc.receive_join(req).await; + log::debug!("ApiServer Gossip join result: {:?}", result); + Ok(result) } } diff --git a/src/cores/handlers/consensus.rs b/src/cores/handlers/consensus.rs index 6d5485b..a4362ec 100644 --- a/src/cores/handlers/consensus.rs +++ b/src/cores/handlers/consensus.rs @@ -65,6 +65,13 @@ fn bad_request(msg: &str) -> ServerResponse { .into() } +pub mod gossip { + use super::*; + + define_service_fn!(handle_gossip_message, handle_gossip_message); + define_service_fn!(handle_gossip_join, handle_gossip_join); +} + pub mod raft { use fleetmodv2::api_server::{ServerRawResponse, ServerResponse}; diff --git a/src/cores/mod.rs b/src/cores/mod.rs index a02a28c..4e53b31 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -194,12 +194,14 @@ pub fn single_node_consensus_config(address: &str) -> consensus_kv::ConsensusCon let self_peer = consensus_kv::Peer { id: 1, address: address.to_string(), + last_seen: tokio::time::Instant::now().elapsed().as_secs(), }; let init_peers = BTreeMap::from([( 1, consensus_kv::Peer { id: 1, address: address.to_string(), + last_seen: tokio::time::Instant::now().elapsed().as_secs(), }, )]); let raft_config = RaftConfig { diff --git a/src/cores/router.rs b/src/cores/router.rs index c5e4075..f03718f 100644 --- a/src/cores/router.rs +++ b/src/cores/router.rs @@ -103,6 +103,8 @@ pub enum RouterKey { ConsensusRaftAddLearner, ConsensusRaftChangeMembership, ConsensusRaftMetrics, + ConsensusGossipMessage, + ConsensusGossipJoin, // Payload Mgr Router Key InsertPayload, @@ -144,7 +146,9 @@ impl RouterKey { | RouterKey::ConsensusRaftSnapshot | RouterKey::ConsensusRaftAddLearner | RouterKey::ConsensusRaftChangeMembership - | RouterKey::ConsensusRaftMetrics => true, + | RouterKey::ConsensusRaftMetrics + | RouterKey::ConsensusGossipMessage + | RouterKey::ConsensusGossipJoin => true, _ => false, } } @@ -641,6 +645,16 @@ impl Router { RouterKey::ConsensusRaftMetrics, handlers::consensus::raft::metrics ); + add_route!( + router, + RouterKey::ConsensusGossipMessage, + handlers::consensus::gossip::handle_gossip_message + ); + add_route!( + router, + RouterKey::ConsensusGossipJoin, + handlers::consensus::gossip::handle_gossip_join + ); } { diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index ef240da..39e37cf 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -1300,12 +1300,14 @@ pub mod consensus { .service(get_peers) .service(join) .service(leave) - .service(append_entries) - .service(vote) - .service(snapshot) - .service(add_learner) - .service(change_membership) - .service(metrics), + .service(raft_append_entries) + .service(raft_vote) + .service(raft_snapshot) + .service(raft_add_learner) + .service(raft_change_membership) + .service(raft_metrics) + .service(gossip_message) + .service(gossip_join), ); } @@ -1346,7 +1348,7 @@ pub mod consensus { "Consensus" ); define_raw_response_method!( - append_entries, + raft_append_entries, post, "/raft/append-entries", (), @@ -1355,7 +1357,7 @@ pub mod consensus { "Consensus" ); define_raw_response_method!( - vote, + raft_vote, post, "/raft/vote", (), @@ -1364,7 +1366,7 @@ pub mod consensus { "Consensus" ); define_raw_response_method!( - snapshot, + raft_snapshot, post, "/raft/snapshot", (), @@ -1373,7 +1375,7 @@ pub mod consensus { "Consensus" ); define_raw_response_method!( - add_learner, + raft_add_learner, post, "/raft/add-learner", (), @@ -1382,7 +1384,7 @@ pub mod consensus { "Consensus" ); define_raw_response_method!( - change_membership, + raft_change_membership, post, "/raft/change-membership", (), @@ -1391,7 +1393,7 @@ pub mod consensus { "Consensus" ); define_raw_response_method!( - metrics, + raft_metrics, get, "/raft/metrics", (), @@ -1399,6 +1401,24 @@ pub mod consensus { RouterKey::ConsensusRaftMetrics, "Consensus" ); + define_raw_response_method!( + gossip_message, + post, + "/gossip/message", + (), + (), + RouterKey::ConsensusGossipMessage, + "Consensus" + ); + define_raw_response_method!( + gossip_join, + post, + "/gossip/join", + (), + (), + RouterKey::ConsensusGossipJoin, + "Consensus" + ); } pub mod payload { diff --git a/src/utils/test.rs b/src/utils/test.rs index d9df234..2bb0bb5 100644 --- a/src/utils/test.rs +++ b/src/utils/test.rs @@ -3,7 +3,7 @@ use crate::cores::servers::{self, MessagingServer, Server}; use crate::cores::state::AppState; use crate::prepare_app_state; use bon::Builder; -use consensus_kv::raft_config::{RaftConfig, SnapshotType, StorageType}; +use consensus_kv::gossip::config::GossipConfig; use consensus_kv::{ConsensusConfig, Peer}; use feventbus::impls::messaging::messaging::Messaging; use feventbus::traits::controller::EventBus; @@ -151,14 +151,12 @@ pub fn single_node_consensus_config(address: Option) -> ConsensusConfig let self_peer = Peer { id: 1, address: address.clone().unwrap_or("0.0.0.0:39093".to_string()), + last_seen: tokio::time::Instant::now().elapsed().as_secs(), }; let consensus_config = ConsensusConfig::builder() .self_peer(self_peer.clone()) .init_peers(BTreeMap::from([(1, self_peer)])) - .raft_config(RaftConfig { - storage_type: StorageType::Mem, - snapshot_type: SnapshotType::File, - }) + .gossip_config(GossipConfig {}) .build(); consensus_config } -- Gitee