From b501365b406a575457e9ef6728d29bac04b4921f Mon Sep 17 00:00:00 2001
From: lxq <191991518@qq.com>
Date: Mon, 18 Aug 2025 16:54:05 +0800
Subject: [PATCH] feat: implement the dependency_coordinator; it currently only
 supports processes declared in config.yaml, but is designed to be extensible
 to dependency detection

Refactor the Publisher trait out of the Monitor trait.
---
 config.yaml                                   |  34 +-
 healer-ebpf/Cargo.toml                        |   7 +-
 healer-ebpf/src/main.rs                       | 103 +++--
 healer/src/config.rs                          |  74 +++-
 healer/src/coordinator.rs                     |   1 +
 .../src/coordinator/dependency_coordinator.rs | 418 ++++++++++++++++++
 healer/src/core_logic.rs                      |  12 +-
 healer/src/event_bus.rs                       |  21 +-
 healer/src/main.rs                            |   3 +-
 healer/src/monitor.rs                         |   1 -
 healer/src/monitor/ebpf_monitor.rs            |  33 +-
 healer/src/monitor/network_monitor.rs         |  30 +-
 healer/src/monitor/pid_monitor.rs             |  15 +-
 healer/src/publisher.rs                       |   9 +
 healer/src/service_manager.rs                 |  29 +-
 15 files changed, 691 insertions(+), 99 deletions(-)
 create mode 100644 healer/src/coordinator.rs
 create mode 100644 healer/src/coordinator/dependency_coordinator.rs
 create mode 100644 healer/src/publisher.rs

diff --git a/config.yaml b/config.yaml
index f3c9398..dcefb16 100644
--- a/config.yaml
+++ b/config.yaml
@@ -29,20 +29,26 @@ processes:
   #     retry_window_secs: 60
   #     cooldown_secs: 180 # 如果发生熔断,冷却3分钟
-  # - name: "simple_test_process"
-  #   enabled: true
-  #   command: "/home/lxq/ospp/simple_test_process/target/debug/simple_test_process"
-  #   args: []
-  #   run_as_root: false
-  #   run_as_user: "lxq"
-  #   monitor:
-  #     type: "ebpf"
-  #   recovery:
-  #     type: "regular"
-  #     retries: 3 # 60秒内最多重试3次
-  #     retry_window_secs: 60
-  #     cooldown_secs: 180 # 如果发生熔断,冷却3分钟
-
+  - name: "simple_test_process"
+    enabled: true
+    command: "/home/lxq/ospp/simple_test_process/target/debug/simple_test_process"
+    # With eBPF the PID changes on every restart, so the process name (defaulting to the application name) is used as the key for matching monitored processes; to be extended later.
+    args: []
+    run_as_root: false
+    run_as_user: "lxq"
+    monitor:
+      type: "ebpf"
+    recovery:
+      type: "regular"
+      retries: 3 # at most 3 retries within 60 seconds
+      retry_window_secs: 60
+      cooldown_secs: 180 # cool down for 3 minutes if the circuit breaker trips
+    dependencies:
+      - target: "simple_net_process"
+        kind: "requires"
+        hard: true
+        max_wait_secs: 25
+        on_failure: "abort"
   - name: "simple_net_process"
     enabled: true
diff --git a/healer-ebpf/Cargo.toml b/healer-ebpf/Cargo.toml
index 498ba1c..0fd5266 100644
--- a/healer-ebpf/Cargo.toml
+++ b/healer-ebpf/Cargo.toml
@@ -14,4 +14,9 @@ which = { version = "8.0.0", default-features = false, features = ["real-sys"] }
 
 [[bin]]
 name = "healer"
-path = "src/main.rs"
\ No newline at end of file
+path = "src/main.rs"
+required-features = ["build-ebpf"]
+
+[features]
+# Do not build the eBPF binary by default; it is only built when this feature is enabled explicitly.
+build-ebpf = []
diff --git a/healer-ebpf/src/main.rs b/healer-ebpf/src/main.rs
index ef06ee7..be71b73 100644
--- a/healer-ebpf/src/main.rs
+++ b/healer-ebpf/src/main.rs
@@ -1,57 +1,68 @@
-#![no_std]
-#![no_main]
-
-use aya_ebpf::{
-    EbpfContext,
-    macros::{map, tracepoint},
-    maps::{HashMap, PerfEventArray},
-    programs::TracePointContext,
-};
-use aya_log_ebpf::info;
-use healer_common::ProcessExitEvent;
-
-// 存储要监控的进程名(截断到15个字符)
-#[map]
-static PROCESS_NAMES_TO_MONITOR: HashMap<[u8; 16], u8> = HashMap::with_max_entries(1024, 0);
-
-#[map]
-static EVENTS: PerfEventArray<ProcessExitEvent> = PerfEventArray::new(0);
-
-#[tracepoint]
-pub fn healer_exit(ctx: TracePointContext) -> u32 {
-    match try_healer_exit(ctx) {
-        Ok(ret) => ret,
-        Err(ret) => ret,
-    }
-}
+#![cfg_attr(feature = "build-ebpf", no_std)]
+#![cfg_attr(feature = "build-ebpf", no_main)]
+
+// Keep the eBPF-related code in a single cfg-gated module so #[cfg(...)] does not have to be repeated everywhere.
+#[cfg(feature = "build-ebpf")]
+mod ebpf { + use aya_ebpf::{ + EbpfContext, + macros::{map, tracepoint}, + maps::{HashMap, PerfEventArray}, + programs::TracePointContext, + }; + use aya_log_ebpf::info; + use healer_common::ProcessExitEvent; + + // 存储要监控的进程名(截断到15个字符) + #[map] + static PROCESS_NAMES_TO_MONITOR: HashMap<[u8; 16], u8> = HashMap::with_max_entries(1024, 0); -fn try_healer_exit(ctx: TracePointContext) -> Result { - let pid = ctx.pid(); - let tgid = ctx.tgid(); + #[map] + static EVENTS: PerfEventArray = PerfEventArray::new(0); - if pid != tgid { - return Ok(0); + #[tracepoint] + pub fn healer_exit(ctx: TracePointContext) -> u32 { + match try_healer_exit(ctx) { + Ok(ret) => ret, + Err(ret) => ret, + } } - let comm = match aya_ebpf::helpers::bpf_get_current_comm() { - Ok(comm_array) => comm_array, - Err(_) => return Ok(0), - }; + fn try_healer_exit(ctx: TracePointContext) -> Result { + let pid = ctx.pid(); + let tgid = ctx.tgid(); + + if pid != tgid { + return Ok(0); + } + + let comm = match aya_ebpf::helpers::bpf_get_current_comm() { + Ok(comm_array) => comm_array, + Err(_) => return Ok(0), + }; - // 检查这个进程名是否在监控列表中 - if unsafe { PROCESS_NAMES_TO_MONITOR.get(&comm) }.is_some() { - info!(&ctx, "Monitored process detected"); + // 检查这个进程名是否在监控列表中 + if unsafe { PROCESS_NAMES_TO_MONITOR.get(&comm) }.is_some() { + info!(&ctx, "Monitored process detected"); - // 发送包含进程名的事件 - let event = ProcessExitEvent { pid, comm }; - EVENTS.output(&ctx, &event, 0); + // 发送包含进程名的事件 + let event = ProcessExitEvent { pid, comm }; + EVENTS.output(&ctx, &event, 0); + } + + Ok(0) } - Ok(0) + // 提供 panic 处理,仅在 eBPF 特性开启时使用(no_std 环境) + #[cfg(not(test))] + #[panic_handler] + fn panic(_info: &core::panic::PanicInfo) -> ! { + loop {} + } } -#[cfg(not(test))] -#[panic_handler] -fn panic(_info: &core::panic::PanicInfo) -> ! 
{ - loop {} +// 未启用 eBPF 时,提供一个空的 std main,避免宿主构建报错。 +#[cfg(not(feature = "build-ebpf"))] +fn main() { + eprintln!("healer-ebpf built without 'build-ebpf' feature; skipping eBPF program"); } diff --git a/healer/src/config.rs b/healer/src/config.rs index 7aa0505..d074bc6 100644 --- a/healer/src/config.rs +++ b/healer/src/config.rs @@ -1,9 +1,6 @@ use crate::daemon_handler::DaemonConfig; -use crate::monitor::ebpf_monitor; -use nix::libc::SKF_NET_OFF; use serde::Deserialize; use std::fs; -use std::ops::Not; use std::path::{Path, PathBuf}; // 顶层配置结构体 @@ -30,6 +27,58 @@ pub struct ProcessConfig { pub monitor: MonitorConfig, #[serde(default)] pub recovery: RecoveryConfig, + #[serde(default)] + pub dependencies: Vec, +} + +// ---------------- Dependency Config ---------------- + +#[derive(Deserialize, Debug, Clone)] +#[serde(untagged)] +pub enum RawDependency { + Simple(String), + Detailed(DependencyConfig), +} + +#[derive(Deserialize, Debug, Clone)] +pub struct DependencyConfig { + pub target: String, + #[serde(default = "default_kind")] + pub kind: DependencyKind, + #[serde(default = "default_true")] + pub hard: bool, + #[serde(default = "default_max_wait_secs")] + pub max_wait_secs: u64, + #[serde(default = "default_on_failure")] + pub on_failure: OnFailure, +} + +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum DependencyKind { + Requires, + After, +} + +#[derive(Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum OnFailure { + Abort, + Skip, + Degrade, +} + +fn default_kind() -> DependencyKind { + DependencyKind::Requires +} +fn default_true() -> bool { + true +} +fn default_max_wait_secs() -> u64 { + 30 +} +fn default_on_failure() -> OnFailure { + OnFailure::Abort } #[derive(Deserialize, Debug, Clone)] @@ -92,7 +141,6 @@ pub struct EbpfMonitorConfig { #[derive(Debug, Clone)] pub struct NetworkMonitorConfig { pub name: String, - // pub pid_file_path: PathBuf, pub target_url: String, // 目标URL pub interval_secs: u64, //检查的频率间隔 } @@ -124,7 +172,7 @@ impl ProcessConfig { } pub fn get_ebpf_monitor_config(&self) -> Option { - if let MonitorConfig::Ebpf(ebpf_fields) = &self.monitor { + if let MonitorConfig::Ebpf(_ebpf_fields) = &self.monitor { Some(EbpfMonitorConfig { name: self.name.clone(), command: self.command.clone(), @@ -145,6 +193,22 @@ impl ProcessConfig { None } } + + pub fn resolved_dependencies(&self) -> Vec { + self.dependencies + .iter() + .map(|raw| match raw { + RawDependency::Simple(name) => DependencyConfig { + target: name.clone(), + kind: default_kind(), + hard: default_true(), + max_wait_secs: default_max_wait_secs(), + on_failure: default_on_failure(), + }, + RawDependency::Detailed(d) => d.clone(), + }) + .collect() + } } impl AppConfig { diff --git a/healer/src/coordinator.rs b/healer/src/coordinator.rs new file mode 100644 index 0000000..ddd1ec6 --- /dev/null +++ b/healer/src/coordinator.rs @@ -0,0 +1 @@ +pub mod dependency_coordinator; diff --git a/healer/src/coordinator/dependency_coordinator.rs b/healer/src/coordinator/dependency_coordinator.rs new file mode 100644 index 0000000..fab1e6f --- /dev/null +++ b/healer/src/coordinator/dependency_coordinator.rs @@ -0,0 +1,418 @@ +use crate::{ + config::{AppConfig, DependencyConfig, DependencyKind, OnFailure}, + event_bus::ProcessEvent, + publisher::Publisher, + subscriber::Subscriber, +}; +use async_trait::async_trait; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use 
tokio::sync::RwLock; +use tokio::sync::broadcast; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; + +// no dependency graph imports needed + +pub struct DependencyCoordinator { + /// 下游事件总线(发给 Healer 等消费者) + pub out_tx: broadcast::Sender, + /// 上游事件接收器(来自各 Monitor) + pub in_rx: broadcast::Receiver, + pub app_config: Arc>, + // 受管目标集合(来自配置 processes.name),用于区分已托管与未知目标 + managed_targets: HashSet, + deferred: HashMap, // 延迟恢复状态表 + retry_tx: UnboundedSender, + retry_rx: UnboundedReceiver, + /// 处于recovering的目标及其过期时间(用于为简单依赖提供阻塞) + recovering_until: HashMap, +} + +#[derive(Debug, Clone)] +struct DeferredState { + original_event: ProcessEvent, // 初次触发保存,用于最终放行 + deferred_count: u32, + first_deferred_at: Instant, + last_eval_at: Instant, + next_retry_at: Instant, + // 仅跟踪会阻塞的依赖(Requires + hard=true) + deps: Vec, + // 最近一次评估时仍在阻塞的依赖名(仅用于日志展示) + waiting_on: Vec, +} + +#[derive(Debug, Clone)] +struct PerDepState { + cfg: DependencyConfig, + status: DepWaitStatus, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum DepWaitStatus { + Waiting, + TimedOut, +} + +#[derive(Debug, Clone)] +enum InternalMsg { + Retry(String), +} + +impl DependencyCoordinator { + pub fn new( + in_rx: broadcast::Receiver, + out_tx: broadcast::Sender, + app_config: Arc>, + ) -> Self { + let (retry_tx, retry_rx) = unbounded_channel(); + Self { + out_tx, + in_rx, + app_config, + managed_targets: HashSet::new(), + deferred: HashMap::new(), + retry_tx, + retry_rx, + recovering_until: HashMap::new(), + } + } + + /// 用于阻塞窗口(秒):把收到 Down/Disconnected 的目标短暂标记为recovering + const RECOVERING_HOLD_SECS: u64 = 10; + + fn mark_recovering_until(&mut self, name: &str, now: Instant) { + self.recovering_until.insert( + name.to_string(), + now + Duration::from_secs(Self::RECOVERING_HOLD_SECS), + ); + } + + fn is_recovering(&self, name: &str, now: Instant) -> bool { + self.recovering_until + .get(name) + .map(|&until| now < until) + .unwrap_or(false) + } + + fn prune_recovering(&mut self) { + let now = Instant::now(); + self.recovering_until.retain(|_, &mut until| now < until); + } + + async fn refresh_snapshot(&mut self) { + // 读取配置(限制作用域,避免与后续 &mut self 冲突) + let managed: HashSet = { + let cfg = self.app_config.read().await; + cfg.processes.iter().map(|p| p.name.clone()).collect() + }; + // 同步managed集合:任何未出现在这里的进程名都视为未受管/unknown + self.managed_targets = managed; + // 清理过期的recovering标记 + self.prune_recovering(); + } + + async fn decide_and_publish(&mut self, evt: &ProcessEvent) { + match evt { + ProcessEvent::ProcessDown { name, .. } + | ProcessEvent::ProcessDisconnected { name, .. 
} => { + // 刷新一次快照 + self.refresh_snapshot().await; + + // 标记该进程进入recovering窗口,用于阻塞其依赖者(不阻塞自身) + let now = Instant::now(); + self.mark_recovering_until(name, now); + + // 已存在延迟状态则忽略重复原始事件(监控高频触发),仅记录日志 + if self.deferred.contains_key(name) { + tracing::debug!(target="dep_coord", process=%name, "event ignored (already deferred)"); + return; + } + + let all_manual = self + .manual_requires(name) + .into_iter() + .filter(|d| d.hard) + .collect::>(); + // 分离已受管与未知目标 + let (manual_deps, unknown): (Vec<_>, Vec<_>) = all_manual + .into_iter() + .partition(|d| self.managed_targets.contains(&d.target)); + if !unknown.is_empty() { + let unknown_names: Vec = + unknown.into_iter().map(|d| d.target).collect(); + tracing::warn!(target="dep_coord", process=%name, unknown_deps=?unknown_names, "dependencies refer to unmanaged/unknown targets; they will not block"); + } + let deps: Vec = manual_deps.iter().map(|d| d.target.clone()).collect(); + if deps.is_empty() { + // 没有受管依赖,直接放行 + tracing::info!(target="dep_coord", process=%name, "no managed dependencies -> forward now"); + let _ = self.publish(evt.clone()); + return; + } + + // 计算阻塞:依赖中是否有目标处于recovering窗口 + let blocking: Vec = deps + .iter() + .filter(|d| d.as_str() != name) + .filter(|d| self.is_recovering(d, now)) + .cloned() + .collect(); + + if blocking.is_empty() { + // 首次出现依赖但当前无阻塞 -> 放行并提示 + tracing::info!(target="dep_coord", process=%name, deps=?deps, "dependencies present, none blocking -> forward"); + let _ = self.publish(evt.clone()); + } else { + // 进入延迟(记录每个依赖的 max_wait_secs / on_failure) + self.defer_process(name.clone(), evt.clone(), manual_deps, blocking) + .await; + } + } + // 其它事件(例如恢复成功/失败)目前直接透传 + _ => { + let _ = self.publish(evt.clone()); + } + } + } + + fn manual_requires(&self, name: &str) -> Vec { + if let Some(proc_cfg) = self + .app_config + .try_read() + .ok() + .and_then(|cfg| cfg.processes.iter().find(|p| p.name == name).cloned()) + { + return proc_cfg + .resolved_dependencies() + .into_iter() + .filter(|d| d.kind == DependencyKind::Requires) + .collect(); + } + Vec::new() + } + + async fn defer_process( + &mut self, + name: String, + original_event: ProcessEvent, + manual_deps: Vec, + blocking: Vec, + ) { + let now = Instant::now(); + let per_deps: Vec = manual_deps + .into_iter() + .filter(|d| d.hard && d.kind == DependencyKind::Requires) + .map(|cfg| PerDepState { + cfg, + status: DepWaitStatus::Waiting, + }) + .collect(); + let deps: Vec = per_deps.iter().map(|d| d.cfg.target.clone()).collect(); + let state = DeferredState { + original_event, + deferred_count: 1, + first_deferred_at: now, + last_eval_at: now, + next_retry_at: now + Duration::from_secs(5), + deps: per_deps, + waiting_on: blocking.clone(), + }; + self.deferred.insert(name.clone(), state); + tracing::warn!(target="dep_coord", process=%name, deps=?deps, waiting_on=?blocking, backoff_s=5, "deferred recovery (blocking dependencies)" ); + // 通过内部通道安排一次定时重试(见 run_loop 的 retry 分支) + self.schedule_retry(name, Duration::from_secs(5)); + } + + fn schedule_retry(&self, name: String, delay: Duration) { + let tx = self.retry_tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(delay).await; + // 发送内部重试消息,唤醒协调器评估该进程是否可放行 + let _ = tx.send(InternalMsg::Retry(name)); + }); + } + + fn compute_backoff(prev_attempts: u32) -> Duration { + match prev_attempts { + 0 | 1 => Duration::from_secs(5), + 2 => Duration::from_secs(10), + 3 => Duration::from_secs(20), + _ => Duration::from_secs(30), + } + } + + /// 计算当前仍在阻塞的依赖集合: + /// 条件:依赖状态为 Waiting + Requires + hard;目标不是自身; 
+ /// 目标是受管(或配置热更后忽略未受管);且该目标当前也在 deferred 中。 + fn compute_currently_blocking(&self, name: &str, state: &DeferredState) -> HashSet { + let mut set = HashSet::new(); + for d in &state.deps { + if d.status == DepWaitStatus::Waiting + && d.cfg.kind == DependencyKind::Requires + && d.cfg.hard + && d.cfg.target != name + && self.managed_targets.contains(&d.cfg.target) + && self.deferred.contains_key(&d.cfg.target) + { + set.insert(d.cfg.target.clone()); + } + } + set + } + + async fn handle_retry(&mut self, name: String) { + let mut remove_and_forward = None; + let mut drop_due_to_abort = false; + // 拿到当前状态的克隆关键信息(避免持有可变引用期间再借 self) + let (first_deferred_at, orig_event_opt, prev_attempts) = + if let Some(state) = self.deferred.get(&name) { + ( + state.first_deferred_at, + Some(state.original_event.clone()), + state.deferred_count, + ) + } else { + (Instant::now(), None, 0) + }; + + if orig_event_opt.is_none() { + return; + } + + self.refresh_snapshot().await; + // 评估当前哪些依赖仍然在阻塞(仍处于 deferred 集合中) + let mut currently_blocking: HashSet = HashSet::new(); + if let Some(state) = self.deferred.get(&name) { + // 使用recovering窗口作为阻塞依据 + let now = Instant::now(); + for d in &state.deps { + if d.status == DepWaitStatus::Waiting + && d.cfg.kind == DependencyKind::Requires + && d.cfg.hard + && d.cfg.target != name + && self.managed_targets.contains(&d.cfg.target) + && self.is_recovering(&d.cfg.target, now) + { + currently_blocking.insert(d.cfg.target.clone()); + } + } + } + + // 应用超时策略:针对仍阻塞的依赖,若超过 max_wait_secs 则根据 on_failure 处理 + if let Some(state) = self.deferred.get_mut(&name) { + let now = Instant::now(); + for dep in &mut state.deps { + if dep.status == DepWaitStatus::Waiting + && currently_blocking.contains(&dep.cfg.target) + { + let deadline = first_deferred_at + Duration::from_secs(dep.cfg.max_wait_secs); + if now >= deadline { + match dep.cfg.on_failure { + OnFailure::Abort => { + drop_due_to_abort = true; + tracing::error!(target="dep_coord", process=%name, dependency=%dep.cfg.target, waited_s=%dep.cfg.max_wait_secs, attempts=prev_attempts, "dependency timeout -> abort handling this process event"); + // 不再等待其他依赖,直接退出循环 + } + OnFailure::Skip | OnFailure::Degrade => { + dep.status = DepWaitStatus::TimedOut; + let action = match dep.cfg.on_failure { + OnFailure::Skip => "skip", + OnFailure::Degrade => "degrade", + _ => "", + }; + tracing::warn!(target="dep_coord", process=%name, dependency=%dep.cfg.target, waited_s=%dep.cfg.max_wait_secs, policy=%action, "dependency timeout -> will ignore this dependency"); + } + } + } + } + if drop_due_to_abort { + break; + } + } + } + + if drop_due_to_abort { + // 丢弃事件并移除延迟状态(Abort 策略) + self.deferred.remove(&name); + return; + } + + // 重新计算是否仍有阻塞的依赖(状态仍为 Waiting 且当前在阻塞集内) + let mut still_blocking: Vec = Vec::new(); + if let Some(state) = self.deferred.get(&name) { + for d in &state.deps { + if d.status == DepWaitStatus::Waiting && currently_blocking.contains(&d.cfg.target) + { + still_blocking.push(d.cfg.target.clone()); + } + } + } + + if still_blocking.is_empty() { + if let Some(state) = self.deferred.get(&name) { + tracing::info!(target="dep_coord", process=%name, deferred_for=?state.first_deferred_at.elapsed(), "release deferred process (no more blocking or timed out per policy)"); + } + remove_and_forward = orig_event_opt; + } else { + if let Some(state) = self.deferred.get_mut(&name) { + state.deferred_count += 1; + state.waiting_on = still_blocking.clone(); + state.last_eval_at = Instant::now(); + let backoff = Self::compute_backoff(state.deferred_count); 
+ state.next_retry_at = Instant::now() + backoff; + tracing::warn!(target="dep_coord", process=%name, attempts=state.deferred_count, waiting_on=?state.waiting_on, next_retry_s=backoff.as_secs(), "still blocked, reschedule retry"); + self.schedule_retry(name.clone(), backoff); + } + } + if let Some(evt) = remove_and_forward { + self.deferred.remove(&name); + let _ = self.publish(evt); + } + } + + /// 运行主事件循环:持续从上游接收事件并决策/转发 + pub async fn run_loop(mut self) { + loop { + tokio::select! { + biased; + maybe_msg = self.retry_rx.recv() => { + if let Some(InternalMsg::Retry(name)) = maybe_msg { + // 接收内部重试消息,进入一次评估/重试周期 + self.handle_retry(name).await; + } else if maybe_msg.is_none() { + tracing::warn!(target="dep_coord", "internal retry channel closed"); + } + } + recv_res = self.in_rx.recv() => { + match recv_res { + Ok(evt) => self.decide_and_publish(&evt).await, + Err(broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(target = "dep_coord", missed = n, "lagged, missed events"); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::warn!(target = "dep_coord", "upstream channel closed, exiting run_loop"); + break; + } + } + } + } + } + } +} + +impl Publisher for DependencyCoordinator { + fn publish( + &self, + event: ProcessEvent, + ) -> Result> { + self.out_tx.send(event) + } +} + +#[async_trait] +impl Subscriber for DependencyCoordinator { + async fn handle_event(&mut self, event: ProcessEvent) { + self.decide_and_publish(&event).await; + } +} diff --git a/healer/src/core_logic.rs b/healer/src/core_logic.rs index aac45ac..97df798 100644 --- a/healer/src/core_logic.rs +++ b/healer/src/core_logic.rs @@ -39,15 +39,21 @@ async fn daemon_core_logic(config: Arc>, config_path: PathBuf) info!("Application Core Logic: Starting up and initializing components..."); // 1. 创建事件总线 - let event_sender = event_bus::create_event_sender(); + // 事件通道拆分:monitors -> coordinator_in, coordinator_out -> healer + let monitor_event_sender = event_bus::create_event_sender(); + let coordinator_event_sender = event_bus::create_event_sender(); info!("Application Core Logic: Event bus created."); // 2. 初始化各个管理器,包括配置管理器喝监视器管理器 let config_manager = ConfigManager::new(Arc::clone(&config), config_path); - let mut monitor_manager = MonitorManager::new(event_sender.clone()).await?; + let mut monitor_manager = MonitorManager::new(monitor_event_sender.clone()).await?; // 3. 启动持久性后台服务 - ServiceManager::spawn_persistent_services(&event_sender, &config); + ServiceManager::spawn_persistent_services( + &monitor_event_sender, + &coordinator_event_sender, + &config, + ); info!("Application Core Logic: Persistent services started."); // 4. 
进行初始配置协调 diff --git a/healer/src/event_bus.rs b/healer/src/event_bus.rs index f0460fa..5c9effe 100644 --- a/healer/src/event_bus.rs +++ b/healer/src/event_bus.rs @@ -1,4 +1,3 @@ -use crate::config::ProcessConfig; use std::path::PathBuf; use tokio::sync::broadcast; @@ -6,9 +5,25 @@ const CHANNEL_CAPACITY: usize = 128; #[derive(Clone, Debug)] pub enum ProcessEvent { - ProcessDown { name: String, pid: u32 }, - ProcessDisconnected { name: String, url: String }, + ProcessDown { + name: String, + pid: u32, + }, + ProcessDisconnected { + name: String, + url: String, + }, + #[allow(dead_code)] + ProcessDependencyDetected { + name: String, + dependencies: Vec, + }, + #[allow(dead_code)] + ProcessRestartSuccess {}, + #[allow(dead_code)] + ProcessRestartFailed {}, } +#[allow(dead_code)] pub struct RestartProcessConfig { pub name: String, pub command: String, diff --git a/healer/src/main.rs b/healer/src/main.rs index e47ce24..abdbcd6 100644 --- a/healer/src/main.rs +++ b/healer/src/main.rs @@ -1,11 +1,13 @@ mod config; mod config_manager; +mod coordinator; // expose dependency coordinator mod core_logic; mod daemon_handler; mod event_bus; mod logger; mod monitor; mod monitor_manager; +mod publisher; mod service_manager; mod signal_handler; mod subscriber; @@ -17,7 +19,6 @@ use tokio::sync::RwLock; fn main() { println!("Attempting to load the config"); - // let config_file_path = Path::new("config.yaml"); let config_file_path_str = "config.yaml"; let absolue_config_path = match std::fs::canonicalize(config_file_path_str) { Ok(path) => path, diff --git a/healer/src/monitor.rs b/healer/src/monitor.rs index 570ef1f..0dc0e80 100644 --- a/healer/src/monitor.rs +++ b/healer/src/monitor.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; pub mod ebpf_monitor; pub mod network_monitor; pub mod pid_monitor; -use sysinfo::{RefreshKind, System}; #[async_trait] pub trait Monitor: Send + Sync { // 启动并运行监控任务。 diff --git a/healer/src/monitor/ebpf_monitor.rs b/healer/src/monitor/ebpf_monitor.rs index 93c4efc..5fd39a5 100644 --- a/healer/src/monitor/ebpf_monitor.rs +++ b/healer/src/monitor/ebpf_monitor.rs @@ -1,5 +1,5 @@ use super::Monitor; -use crate::{config::EbpfMonitorConfig, event_bus::ProcessEvent, utils}; +use crate::{config::EbpfMonitorConfig, event_bus::ProcessEvent, publisher::Publisher, utils}; use anyhow::Result; use anyhow::anyhow; use async_trait::async_trait; @@ -23,6 +23,21 @@ pub struct EbpfMonitor { process_name_mapping: Arc>>, // truncated_name -> full_config_name task_handles: Vec>, // 保存后台任务句柄 shutdown_flag: Arc, // 关闭标志 + out_tx: broadcast::Sender, // 发布通道 +} + +#[derive(Clone)] +struct TxPublisher { + tx: broadcast::Sender, +} + +impl Publisher for TxPublisher { + fn publish( + &self, + event: ProcessEvent, + ) -> Result> { + self.tx.send(event) + } } impl EbpfMonitor { @@ -52,7 +67,9 @@ impl EbpfMonitor { let process_name_mapping = Arc::new(Mutex::new(collections::HashMap::new())); for cpu_id in online_cpus().unwrap() { let perf_buf = events.open(cpu_id, None)?; - let tx_clone = event_tx.clone(); + let publisher = TxPublisher { + tx: event_tx.clone(), + }; let fd = perf_buf.as_raw_fd(); let async_fd = AsyncFd::new(fd)?; let shutdown_flag_clone = shutdown_flag.clone(); @@ -100,7 +117,7 @@ impl EbpfMonitor { cpu_id, event.pid, comm_str ); let send_result = - tx_clone.send(ProcessEvent::ProcessDown { + publisher.publish(ProcessEvent::ProcessDown { name: process_name.clone(), pid: event.pid, }); @@ -173,6 +190,7 @@ impl EbpfMonitor { process_name_mapping, task_handles, shutdown_flag, + out_tx: event_tx, 
}) } pub async fn wait_and_publish(&mut self) {} @@ -357,3 +375,12 @@ impl Monitor for EbpfMonitor { "EbpfMonitor".to_string() } } + +impl Publisher for EbpfMonitor { + fn publish( + &self, + event: ProcessEvent, + ) -> Result> { + self.out_tx.send(event) + } +} diff --git a/healer/src/monitor/network_monitor.rs b/healer/src/monitor/network_monitor.rs index 688572a..575cf97 100644 --- a/healer/src/monitor/network_monitor.rs +++ b/healer/src/monitor/network_monitor.rs @@ -1,24 +1,15 @@ -use crate::{ - config::{self, NetworkMonitorConfig}, - event_bus::ProcessEvent, - monitor::Monitor, -}; +use crate::publisher::Publisher; +use crate::{config::NetworkMonitorConfig, event_bus::ProcessEvent, monitor::Monitor}; use async_trait::async_trait; -use reqwest::Response; -use std::{io::Read, result}; -use tokio::{io::AsyncReadExt, sync::broadcast}; -use tokio::{net::TcpStream, time}; -use tracing::{debug, error, info, warn}; +use tokio::{sync::broadcast, time}; +use tracing::{debug, info, warn}; pub struct NetworkMonitor { config: NetworkMonitorConfig, event_tx: broadcast::Sender, } impl NetworkMonitor { pub fn new(config: NetworkMonitorConfig, event_tx: broadcast::Sender) -> Self { - Self { - config, - event_tx, - } + Self { config, event_tx } } pub fn check_interval(&self) -> u64 { self.config.interval_secs @@ -75,7 +66,7 @@ impl NetworkMonitor { self.config.name, self.config.target_url ); - match self.event_tx.send(event) { + match self.publish(event) { Ok(receiver_count) => { debug!( "[{}] Sent ProcessDisconnected event for HTTP {} to {} receivers", @@ -101,3 +92,12 @@ impl Monitor for NetworkMonitor { self.config.name.clone() } } + +impl Publisher for NetworkMonitor { + fn publish( + &self, + event: ProcessEvent, + ) -> Result> { + self.event_tx.send(event) + } +} diff --git a/healer/src/monitor/pid_monitor.rs b/healer/src/monitor/pid_monitor.rs index 558ecfc..e1c5c84 100644 --- a/healer/src/monitor/pid_monitor.rs +++ b/healer/src/monitor/pid_monitor.rs @@ -1,9 +1,8 @@ // src/monitor/pid_monitor.rs use async_trait::async_trait; -use nix::Error as NixError; use nix::errno::Errno; -use nix::sys::signal::{Signal as NixSignal, kill}; +use nix::sys::signal::kill; use nix::unistd::Pid; use tokio::fs; use tokio::sync::broadcast; @@ -13,6 +12,7 @@ use tracing::{debug, warn}; use super::Monitor; use crate::config::PidMonitorConfig; use crate::event_bus::ProcessEvent; +use crate::publisher::Publisher; use tracing::info; pub struct PidMonitor { config: PidMonitorConfig, @@ -36,7 +36,7 @@ impl PidMonitor { self.config.name, pid ); - match self.event_tx.send(event) { + match self.publish(event) { Ok(receiver_count) => { debug!( "[{}] Sent ProcessDown event for PID {} to {} receivers", @@ -134,3 +134,12 @@ impl Monitor for PidMonitor { self.monitor_task_loop().await; } } + +impl Publisher for PidMonitor { + fn publish( + &self, + event: ProcessEvent, + ) -> Result> { + self.event_tx.send(event) + } +} diff --git a/healer/src/publisher.rs b/healer/src/publisher.rs new file mode 100644 index 0000000..6d5d2fe --- /dev/null +++ b/healer/src/publisher.rs @@ -0,0 +1,9 @@ +use crate::event_bus::ProcessEvent; +use tokio::sync::broadcast; + +pub trait Publisher { + fn publish( + &self, + event: ProcessEvent, + ) -> Result>; +} diff --git a/healer/src/service_manager.rs b/healer/src/service_manager.rs index fe04c79..b0ab7d1 100644 --- a/healer/src/service_manager.rs +++ b/healer/src/service_manager.rs @@ -1,5 +1,6 @@ use crate::{ config::AppConfig, + coordinator::dependency_coordinator::DependencyCoordinator, 
     event_bus::ProcessEvent,
     subscriber::{Subscriber, process_healer::ProcessHealer},
 };
@@ -16,19 +17,23 @@ pub struct ServiceManager;
 impl ServiceManager {
     /// 启动所有持久性后台服务
     pub fn spawn_persistent_services(
-        event_sender: &broadcast::Sender<ProcessEvent>,
+        monitor_event_sender: &broadcast::Sender<ProcessEvent>,
+        coordinator_event_sender: &broadcast::Sender<ProcessEvent>,
         config: &Arc<RwLock<AppConfig>>,
     ) {
-        Self::spawn_process_healer(event_sender, config);
+        // Start the coordinator first: it listens on monitor_event_sender and publishes to coordinator_event_sender.
+        Self::spawn_dependency_coordinator(monitor_event_sender, coordinator_event_sender, config);
+        // The healer listens on the coordinator's output channel.
+        Self::spawn_process_healer(coordinator_event_sender, config);
         Self::spawn_zombie_reaper();
     }
 
     /// 启动进程自愈服务
     fn spawn_process_healer(
-        event_sender: &broadcast::Sender<ProcessEvent>,
+        coordinator_event_sender: &broadcast::Sender<ProcessEvent>,
         config: &Arc<RwLock<AppConfig>>,
     ) {
-        let healer_receiver = event_sender.subscribe();
+        let healer_receiver = coordinator_event_sender.subscribe();
         let healer_config = Arc::clone(config);
 
         tokio::spawn(async move {
@@ -54,6 +59,22 @@ impl ServiceManager {
         });
     }
 
+    /// Start the dependency coordinator service.
+    fn spawn_dependency_coordinator(
+        monitor_event_sender: &broadcast::Sender<ProcessEvent>,
+        coordinator_event_sender: &broadcast::Sender<ProcessEvent>,
+        config: &Arc<RwLock<AppConfig>>,
+    ) {
+        let in_rx = monitor_event_sender.subscribe();
+        let out_tx = coordinator_event_sender.clone();
+        let cfg = Arc::clone(config);
+        tokio::spawn(async move {
+            let coordinator = DependencyCoordinator::new(in_rx, out_tx, cfg);
+            tracing::info!("ServiceManager: DependencyCoordinator service started.");
+            coordinator.run_loop().await;
+        });
+    }
+
     /// 启动僵尸进程清理服务
     fn spawn_zombie_reaper() {
         tokio::spawn(async {
-- 
Gitee
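
Note: besides the detailed mapping form used for "simple_test_process" in config.yaml, the `dependencies` field should also accept a bare process name, since `RawDependency` is an untagged serde enum with a `Simple(String)` variant and `resolved_dependencies()` fills in the defaults (kind: requires, hard: true, max_wait_secs: 30, on_failure: abort). A minimal, hypothetical config sketch of that shorthand (not part of this patch):

    processes:
      - name: "simple_test_process"
        # ... other fields as in config.yaml above ...
        dependencies:
          - "simple_net_process"  # expands to kind=requires, hard=true, max_wait_secs=30, on_failure=abort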