From 0410783ea51214153c4c76da68bee444806d7717 Mon Sep 17 00:00:00 2001
From: lxq <191991518@qq.com>
Date: Sun, 28 Sep 2025 09:20:34 +0800
Subject: [PATCH] fix: fix test bug by removing ensure_test_binaries

fix hot-update bug
fix bug where, after a hot update, a shut-down test process would never be
healed because its circuit breaker stayed open
test: add regression test for hot reload
test: add reconcile test
---
 README.md | 84 ++++
 healer-ebpf/src/main.rs | 2 +-
 healer/build.rs | 4 +-
 healer/src/config_manager.rs | 45 ++-
 .../src/coordinator/dependency_coordinator.rs | 4 +-
 healer/src/core_logic.rs | 27 +-
 healer/src/daemon_handler.rs | 6 +-
 healer/src/main.rs | 48 ++-
 healer/src/monitor/ebpf_monitor.rs | 4 +-
 healer/src/monitor_manager.rs | 76 +++-
 healer/src/service_manager.rs | 6 +-
 healer/src/subscriber/process_healer.rs | 375 +++++++++---------
 healer/tests/coordinator.rs | 2 +-
 healer/tests/ebpf_e2e.rs | 35 +-
 healer/tests/hot_reload.rs | 95 +++++
 healer/tests/monitor_reconcile.rs | 112 ++++++
 healer/tests/process_e2e.rs | 52 +--
 17 files changed, 700 insertions(+), 277 deletions(-)
 create mode 100644 healer/tests/hot_reload.rs
 create mode 100644 healer/tests/monitor_reconcile.rs

diff --git a/README.md b/README.md
index fe05d3f..5a74f5b 100644
--- a/README.md
+++ b/README.md
@@ -152,6 +152,90 @@ HEALER_TEST_INHERIT_STDIO=1 RUST_LOG=info cargo test -p healer --test process_e2
 # ebpf测试检查的命令示例(需要HEALER_EBPF_E2E=1,同时可执行文件要以sudo权限执行)
 HEALER_EBPF_E2E=1 HEALER_TEST_INHERIT_STDIO=1 RUST_LOG=info CARGO_TERM_COLOR=always cargo test -p healer --test ebpf_e2e --config 'target."cfg(all())".runner="sudo -E"' -- --ignored --nocapture --color=always
 ```
+
+### Test Cases
+
+#### Unit tests
+1. **defers_then_releases_on_timeout_skip**
+   - Uses two process configs: A depends on B (Requires, hard=true, max_wait_secs=1, on_failure=Skip); these fields appear in the config sketch below.
+   - First emit a "B is down" event, marking B as recovering.
+   - Then emit an "A is down" event; A should now be deferred (not forwarded within the short timeout).
+   - Wait a while (a window of roughly 5-8 seconds); because the policy is Skip and B has not recovered, A is released and forwarded after the timeout.
+   - Assert: A is not forwarded early but is forwarded successfully later (verifying the "defer, then release on timeout" policy).
+
+#### Integration tests
+
+1. **restart_on_pid_exit_and_circuit_breaker**
+   - Steps
+     - Start the process under test and Healer (foreground mode for easier observation).
+     - Kill the process once and verify that Healer restarts it automatically (the PID changes and the log contains "Successfully restarted process").
+     - Kill it repeatedly within the retry_window_secs window to hit the retries limit.
+     - Observe the Open (cooldown) period; killing the process again during cooldown must not trigger an immediate restart.
+     - Kill it once more after the cooldown ends; Healer should resume retrying and restart it.
+   - Expectations
+     - The first kill is followed by a successful restart (the PID changes).
+     - Repeated kills within the window trip the circuit breaker; nothing is restarted during cooldown.
+     - Restarts resume after the cooldown ends.
+   - Assertion points
+     - Read the PID file and compare the PIDs before and after.
+     - The log contains "Circuit breaker is open" and a successful-restart message at least once each.
+
+2. **network_monitor_detects_crash_and_recovers**
+   - Steps
+     - Start the service and Healer normally; the health check passes.
+     - Make the HTTP service kill itself/exit so that connections fail (for example, the listening socket is closed).
+     - NetworkMonitor sends a ProcessDisconnected event, and Healer tries to restart the target process (using command+args).
+   - Expectations
+     - After the service exits, a ProcessDisconnected event is received.
+     - Healer triggers the recovery flow and restarts the process (when the monitored service is also the recovery target).
+   - Assertion points
+     - Subscribe to coordinator_event_sender and capture ProcessDisconnected { name, url }.
+     - The restarted process's output is redirected to a log file.
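+
+The cases above are driven entirely by per-process configuration: dependency
+rules (Requires, hard, max_wait_secs, on_failure) and recovery/circuit-breaker
+settings (retries, retry_window_secs, cooldown_secs). As a minimal sketch only
+-- the YAML key names are inferred from the test fixtures in this patch
+(build_pid_only_config, hot_reload.rs, monitor_reconcile.rs), not from a
+published schema -- such a config might look like:
+
+```yaml
+processes:
+  - name: "B"
+    enabled: true
+    command: "/usr/local/bin/b-service"   # hypothetical path
+    monitor:
+      type: "pid"                         # pid | network | ebpf
+      pid_file_path: "/tmp/pids/b.pid"
+      interval_secs: 1
+    recovery:
+      retries: 3              # restarts allowed inside the window...
+      retry_window_secs: 10   # ...before the breaker trips to Open
+      cooldown_secs: 5        # Open duration before a HalfOpen probe
+  - name: "A"
+    enabled: true
+    command: "/usr/local/bin/a-service"   # hypothetical path
+    monitor:
+      type: "network"
+      target_url: "http://localhost:8080/health"
+      interval_secs: 1
+    recovery:
+      retries: 3
+      retry_window_secs: 10
+      cooldown_secs: 5
+    dependencies:             # key names in this block are assumptions
+      - target: "B"
+        kind: "Requires"
+        hard: true
+        max_wait_secs: 1
+        on_failure: "Skip"
+```
+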
+3. **ebpf_detects_exit_and_recovers**
+   - Steps
+     - Check the HEALER_EBPF_E2E=1 environment variable; the test runs only when explicitly enabled (root privileges required).
+     - Clean up any stray test processes (healer and test_process).
+     - Build an eBPF monitoring config with monitor.type: "ebpf" and a recovery policy of retries=3, retry_window_secs=10, cooldown_secs=5.
+     - Start the monitored test_process first and wait for it to create its PID file.
+     - Start Healer (foreground mode for easier observation) and wait 3 seconds for eBPF initialization and the watch to take effect.
+     - Record the baseline PID, then force-kill the monitored process with kill -9.
+     - Wait up to 20 seconds, watching the PID file for changes, to verify that Healer detected the exit via an eBPF event and started a new process.
+   - Expectations
+     - The eBPF tracepoint attaches to the kernel sched_process_exit event.
+     - After the kill, the eBPF monitor captures the exit event and sends a ProcessDown event.
+     - Healer receives the event and successfully starts a new process (the PID changes).
+     - The whole recovery completes within 20 seconds.
+   - Assertion points
+     - The baseline PID must be greater than 0.
+     - The new PID after recovery must differ from the original PID.
+     - All test processes are cleaned up correctly when the test ends.
+   - Environment requirements
+     - Runs only when the environment variable `HEALER_EBPF_E2E=1` is set; requires root privileges.
+
+4. **hot_reload_allows_new_process_recovery**
+   - Steps
+     - Start `ProcessHealer` with an initial config that contains only the process `alpha`.
+     - Hot-swap in a new config that contains only the process `beta`, whose recovery command writes a marker file into a temp directory.
+     - First trigger a recovery for the removed `alpha` to ensure its circuit-breaker state is cleaned up, then trigger a recovery for `beta`.
+   - Expectations
+     - The removed process's circuit-breaker state is dropped and no longer blocks new recovery requests.
+     - `beta`'s recovery command runs and the marker file is written correctly.
+   - Assertion points
+     - No stale marker file exists in the temp directory before `beta` is recovered.
+     - After recovery, the marker file exists and its content contains `hot`.
+
+5. **reconcile_starts_stops_pid_and_network_monitors**
+   - Steps
+     - Build a config with enabled PID- and Network-monitored processes, call `MonitorManager::reconcile`, and confirm the corresponding monitors are created.
+     - Call `reconcile` again with the PID process removed, keeping only the Network process.
+   - Expectations
+     - After the first reconcile, both the PID and network monitor tasks are running, and disabled processes are not started.
+     - After the second reconcile, the removed PID monitor task is stopped; only the network monitor task remains.
+   - Assertion points
+     - Use `running_monitor_names` to check how the set of active monitors changes.
+     - Call `shutdown` at the end of the test to release background tasks.
+
 ## 软件架构
 
 Healer 是一个面向关键进程自愈场景的轻量守护进程,当前已实现的核心要点:
 
diff --git a/healer-ebpf/src/main.rs b/healer-ebpf/src/main.rs
index be71b73..590f735 100644
--- a/healer-ebpf/src/main.rs
+++ b/healer-ebpf/src/main.rs
@@ -5,10 +5,10 @@
 #[cfg(feature = "build-ebpf")]
 mod ebpf {
     use aya_ebpf::{
-        EbpfContext,
         macros::{map, tracepoint},
         maps::{HashMap, PerfEventArray},
         programs::TracePointContext,
+        EbpfContext,
     };
     use aya_log_ebpf::info;
     use healer_common::ProcessExitEvent;
diff --git a/healer/build.rs b/healer/build.rs
index 666f802..ca62600 100644
--- a/healer/build.rs
+++ b/healer/build.rs
@@ -1,5 +1,5 @@
-use anyhow::{Context as _, anyhow};
-use aya_build::{Toolchain, cargo_metadata};
+use anyhow::{anyhow, Context as _};
+use aya_build::{cargo_metadata, Toolchain};
 
 fn main() -> anyhow::Result<()> {
    let cargo_metadata::Metadata { packages, .. } = cargo_metadata::MetadataCommand::new()
diff --git a/healer/src/config_manager.rs b/healer/src/config_manager.rs
index b67ee2f..f99bd84 100644
--- a/healer/src/config_manager.rs
+++ b/healer/src/config_manager.rs
@@ -2,7 +2,7 @@ use crate::config::AppConfig;
 use anyhow::Result;
 use std::sync::Arc;
 use tokio::sync::RwLock;
-use tracing::{error, info};
+use tracing::{debug, error, info, warn};
 
 // 配置管理器,负责配置的加载和热更新
 pub struct ConfigManager {
@@ -24,10 +24,49 @@ impl ConfigManager {
             "ConfigManager: Reloading configuration from {:?}",
             self.config_path
         );
+        // 先加载到临时变量,避免持锁期间做IO
+        let load_start = std::time::Instant::now();
+        let load_result = AppConfig::load_from_file(&self.config_path);
+        debug!(
+            elapsed_ms = load_start.elapsed().as_millis() as u64,
+            "ConfigManager: load_from_file completed"
+        );
 
-        match AppConfig::load_from_file(&self.config_path) {
+        match load_result {
             Ok(new_config) => {
-                let mut config_guard = self.config.write().await;
+                debug!("ConfigManager: Attempting to acquire write lock for config swap");
+                let lock_start = std::time::Instant::now();
+                let mut config_guard = match tokio::time::timeout(
+                    std::time::Duration::from_secs(3),
+                    self.config.write(),
+                )
+                .await
+                {
+                    Ok(g) => g,
+                    Err(_) => {
+                        warn!("ConfigManager: Timeout waiting for write lock (possible read-lock held across await). Falling back to blocking acquire with periodic logging.");
+                        let mut waited_ms = 0u64;
+                        loop {
+                            match self.config.try_write() {
+                                Ok(g) => break g,
+                                Err(_) => {
+                                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+                                    waited_ms += 50;
+                                    if waited_ms % 500 == 0 {
+                                        debug!(
+                                            waited_ms,
+                                            "ConfigManager: still waiting for write lock..."
+                                        );
+                                    }
+                                }
+                            }
+                        }
+                    }
+                };
+                debug!(
+                    wait_ms = lock_start.elapsed().as_millis() as u64,
+                    "ConfigManager: Acquired write lock, swapping config"
+                );
                 *config_guard = new_config;
                 info!("ConfigManager: Configuration reloaded successfully.");
                 Ok(())
diff --git a/healer/src/coordinator/dependency_coordinator.rs b/healer/src/coordinator/dependency_coordinator.rs
index eb8127d..d547921 100644
--- a/healer/src/coordinator/dependency_coordinator.rs
+++ b/healer/src/coordinator/dependency_coordinator.rs
@@ -8,9 +8,9 @@ use async_trait::async_trait;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
-use tokio::sync::RwLock;
 use tokio::sync::broadcast;
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
+use tokio::sync::RwLock;
 
 // no dependency graph imports needed
 
diff --git a/healer/src/core_logic.rs b/healer/src/core_logic.rs
index 97df798..fab9b94 100644
--- a/healer/src/core_logic.rs
+++ b/healer/src/core_logic.rs
@@ -57,13 +57,17 @@ async fn daemon_core_logic(config: Arc<RwLock<AppConfig>>, config_path: PathBuf)
     info!("Application Core Logic: Persistent services started.");
 
     // 4. 进行初始配置协调
+
     {
-        let config_guard = config.read().await;
-        debug!(
-            "Loaded {} process configurations:",
-            config_guard.processes.len()
-        );
-        for (i, process_config) in config_guard.processes.iter().enumerate() {
+        let processes_snapshot = {
+            let guard = config.read().await;
+            debug!(
+                "Initial load: {} process configurations",
+                guard.processes.len()
+            );
+            guard.processes.clone()
+        };
+        for (i, process_config) in processes_snapshot.iter().enumerate() {
             debug!(
                 "Process {}: name='{}', command='{}'",
                 i + 1,
@@ -79,7 +83,7 @@
                 );
             }
         }
-        monitor_manager.reconcile(&config_guard.processes).await?;
+        monitor_manager.reconcile(&processes_snapshot).await?;
     }
     info!("Application Core Logic: Initial reconciliation completed.");
 
@@ -95,9 +99,12 @@
                 continue;
             }
 
-            // 重新协调监控器
-            let config_guard = config.read().await;
-            if let Err(e) = monitor_manager.reconcile(&config_guard.processes).await {
+            // 重新协调监控器(同样避免持 read 锁跨 await)
+            let processes_snapshot = {
+                let guard = config.read().await;
+                guard.processes.clone()
+            };
+            if let Err(e) = monitor_manager.reconcile(&processes_snapshot).await {
                 error!("Core Logic: Failed to reconcile monitors: {}", e);
             }
         }
diff --git a/healer/src/daemon_handler.rs b/healer/src/daemon_handler.rs
index 4712f56..fc3a949 100644
--- a/healer/src/daemon_handler.rs
+++ b/healer/src/daemon_handler.rs
@@ -43,8 +43,10 @@ pub fn run_as_daemon(
 where
     F: FnOnce() + Send + 'static,
 {
-    let config_guard = config.blocking_read();
-    let daemon_config = config_guard.to_daemonize_config();
+    let daemon_config = {
+        let config_guard = config.blocking_read();
+        config_guard.to_daemonize_config()
+    };
 
     println!("Starting the daemon process");
     println!("PID file: {:?}", daemon_config.pid_file);
diff --git a/healer/src/main.rs b/healer/src/main.rs
index 59b49f9..57518e4 100644
--- a/healer/src/main.rs
+++ b/healer/src/main.rs
@@ -15,7 +15,7 @@ mod utils;
 use config::AppConfig;
 use daemon_handler::run_as_daemon;
 use std::env;
-use std::path::{PathBuf};
+use std::path::PathBuf;
 use tokio::sync::RwLock;
 
 use clap::Parser;
@@ -38,11 +38,15 @@ struct Cli {
 }
 
 fn candidate_config_paths(explicit: Option<PathBuf>) -> Vec<PathBuf> {
-    if let Some(p) = explicit { return vec![p]; }
+    if let Some(p) = explicit {
+        return vec![p];
+    }
     let mut cands = Vec::new();
     // 1. Environment variable
-    if let Ok(p) = env::var("HEALER_CONFIG") { cands.push(PathBuf::from(p)); }
+    if let Ok(p) = env::var("HEALER_CONFIG") {
+        cands.push(PathBuf::from(p));
+    }
     // 2. Current working directory
     cands.push(PathBuf::from("./config.yaml"));
     cands.push(PathBuf::from("./healer.yaml"));
@@ -50,16 +54,28 @@ fn candidate_config_paths(explicit: Option<PathBuf>) -> Vec<PathBuf> {
     cands.push(PathBuf::from("/etc/healer/config.yaml"));
     cands.push(PathBuf::from("/etc/healer/healer.yaml"));
     // 4. XDG config home if set
-    if let Ok(home) = env::var("XDG_CONFIG_HOME") { cands.push(PathBuf::from(home).join("healer/config.yaml")); }
+    if let Ok(home) = env::var("XDG_CONFIG_HOME") {
+        cands.push(PathBuf::from(home).join("healer/config.yaml"));
+    }
     // 5. ~/.config/healer/config.yaml
-    if let Some(home_dir) = dirs_next::home_dir() { cands.push(home_dir.join(".config/healer/config.yaml")); }
+    if let Some(home_dir) = dirs_next::home_dir() {
+        cands.push(home_dir.join(".config/healer/config.yaml"));
+    }
     cands
 }
 
 fn resolve_config_path(cli: &Cli) -> PathBuf {
-    if let Some(explicit) = &cli.config { return explicit.clone(); }
-    if let Ok(env_path) = env::var("HEALER_CONFIG") { return PathBuf::from(env_path); }
-    for cand in candidate_config_paths(None) { if cand.exists() { return cand; } }
+    if let Some(explicit) = &cli.config {
+        return explicit.clone();
+    }
+    if let Ok(env_path) = env::var("HEALER_CONFIG") {
+        return PathBuf::from(env_path);
+    }
+    for cand in candidate_config_paths(None) {
+        if cand.exists() {
+            return cand;
+        }
+    }
     // Fallback default (will likely fail later if missing)
     PathBuf::from("config.yaml")
 }
@@ -71,7 +87,10 @@ fn main() {
     let raw_config_path = resolve_config_path(&cli);
     println!("Config resolution: using {:?}", raw_config_path);
 
-    if cli.print_config_path { println!("{:?}", raw_config_path); return; }
+    if cli.print_config_path {
+        println!("{:?}", raw_config_path);
+        return;
+    }
 
     // Expand & canonicalize for safety
     let absolute_config_path = match std::fs::canonicalize(&raw_config_path) {
@@ -82,12 +101,16 @@ fn main() {
         }
     };
 
-    let initial_config = AppConfig::load_from_file(&absolute_config_path).expect("初始配置加载失败");
+    let initial_config =
+        AppConfig::load_from_file(&absolute_config_path).expect("初始配置加载失败");
     let shared_config = std::sync::Arc::new(RwLock::new(initial_config));
 
     // Detect foreground from either flag or env
     let env_foreground = matches!(
-        env::var("HEALER_NO_DAEMON").unwrap_or_else(|_| "0".into()).to_ascii_lowercase().as_str(),
+        env::var("HEALER_NO_DAEMON")
+            .unwrap_or_else(|_| "0".into())
+            .to_ascii_lowercase()
+            .as_str(),
         "1" | "true" | "yes"
     );
     let run_foreground = cli.foreground || env_foreground;
@@ -103,7 +126,8 @@ fn main() {
     let config_for_closure = std::sync::Arc::clone(&shared_config);
     let path_for_closure = absolute_config_path.clone();
 
-    let core_logic_closure = move || core_logic::async_runtime(config_for_closure, path_for_closure);
+    let core_logic_closure =
+        move || core_logic::async_runtime(config_for_closure, path_for_closure);
     match run_as_daemon(shared_config, core_logic_closure) {
         Ok(_) => println!("Main program: Core logic quit"),
         Err(e) => println!("Main program: Core logic error with {:?}", e),
diff --git a/healer/src/monitor/ebpf_monitor.rs b/healer/src/monitor/ebpf_monitor.rs
index 5fd39a5..2516df0 100644
--- a/healer/src/monitor/ebpf_monitor.rs
+++ b/healer/src/monitor/ebpf_monitor.rs
@@ -1,9 +1,9 @@
 use super::Monitor;
 use crate::{config::EbpfMonitorConfig, event_bus::ProcessEvent, publisher::Publisher, utils};
-use anyhow::Result;
 use anyhow::anyhow;
+use anyhow::Result;
 use async_trait::async_trait;
-use aya::{Ebpf, maps::PerfEventArray, programs::TracePoint, util::online_cpus};
+use aya::{maps::PerfEventArray, programs::TracePoint, util::online_cpus, Ebpf};
 use bytes::BytesMut;
 use healer_common::ProcessExitEvent;
 use std::time::Duration;
diff --git a/healer/src/monitor_manager.rs b/healer/src/monitor_manager.rs
index 926ce76..53731d6 100644
--- a/healer/src/monitor_manager.rs
+++ b/healer/src/monitor_manager.rs
@@ -2,8 +2,8 @@ use crate::{
     config::ProcessConfig,
     event_bus::ProcessEvent,
     monitor::{
-        Monitor, ebpf_monitor::EbpfMonitor, network_monitor::NetworkMonitor,
-        pid_monitor::PidMonitor,
+        ebpf_monitor::EbpfMonitor, network_monitor::NetworkMonitor, pid_monitor::PidMonitor,
+        Monitor,
     },
 };
 use anyhow::Result;
@@ -11,7 +11,7 @@ use std::collections::HashMap;
 use std::time::Duration;
 use tokio::sync::broadcast;
 use tokio::task::JoinHandle;
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, warn};
 
 // 监控器管理器,负责统一管理不同类型的监控器
 pub struct MonitorManager {
@@ -50,9 +50,41 @@ impl MonitorManager {
         })
     }
 
+    /// Construct a monitor manager without attempting to initialize the global eBPF monitor.
+    /// This is primarily intended for tests or environments where the eBPF artifacts are
+    /// unavailable (for example, non-root CI runtimes).
+    #[allow(dead_code)]
+    pub fn new_without_ebpf(event_sender: broadcast::Sender<ProcessEvent>) -> Self {
+        Self {
+            ebpf_monitor: None,
+            watched_ebpf_configs: HashMap::new(),
+            running_monitors: HashMap::new(),
+            event_sender,
+        }
+    }
+
+    /// Returns the names of non-eBPF monitors that are currently running.
+    #[allow(dead_code)]
+    pub fn running_monitor_names(&self) -> Vec<String> {
+        self.running_monitors.keys().cloned().collect()
+    }
+
+    /// Returns the names of processes currently watched by the eBPF monitor.
+    #[allow(dead_code)]
+    pub fn watched_ebpf_names(&self) -> Vec<String> {
+        self.watched_ebpf_configs.keys().cloned().collect()
+    }
+
     // 根据新的配置更新所有监控器
     pub async fn reconcile(&mut self, processes: &[ProcessConfig]) -> Result<()> {
         info!("MonitorManager: Starting reconciliation...");
+        debug!(
+            total_processes = processes.len(),
+            "Reconcile invoked with processes"
+        );
+        for (idx, p) in processes.iter().enumerate() {
+            debug!(index = idx, name = %p.name, enabled = p.enabled, monitor = ?p.monitor, "Incoming process config");
+        }
 
         // 分离不同类型的监控配置, ebpf和其他的pid network监视器都略有不同
         let (ebpf_configs, not_ebpf_configs): (Vec<_>, Vec<_>) = processes
             .iter()
             .filter(|p| p.enabled)
             .partition(|p| p.get_ebpf_monitor_config().is_some());
 
+        debug!(
+            enabled_total = ebpf_configs.len() + not_ebpf_configs.len(),
+            ebpf_count = ebpf_configs.len(),
+            other_count = not_ebpf_configs.len(),
+            "Enabled processes split into ebpf / others"
+        );
+
         // 更新 eBPF 监控器
         self.reconcile_ebpf_monitors(ebpf_configs).await?;
 
@@ -79,7 +118,7 @@ impl MonitorManager {
             warn!("MonitorManager: eBPF monitor not available, skipping eBPF reconciliation.");
             return Ok(());
         };
-
+        debug!(current_watched = %self.watched_ebpf_configs.keys().cloned().collect::<Vec<_>>().join(","), desired = %desired_configs.iter().map(|c| c.name.clone()).collect::<Vec<_>>().join(","), "Reconciling eBPF monitors");
         // 构建期望的配置映射
         let desired_configs_map: HashMap<String, &ProcessConfig> = desired_configs
            .into_iter()
            .map(|config| (config.name.clone(), config))
            .collect();
@@ -94,6 +133,9 @@ impl MonitorManager {
             .cloned()
             .collect();
 
+        if !configs_to_remove.is_empty() {
+            debug!(to_remove = %configs_to_remove.join(","), "eBPF configs scheduled for removal");
+        }
         for name in configs_to_remove {
             if let Some(config) = self.watched_ebpf_configs.remove(&name) {
                 if let Some(ebpf_config) = config.get_ebpf_monitor_config() {
@@ -111,6 +153,7 @@ impl MonitorManager {
         // 添加新的监控
         for (name, config) in desired_configs_map {
             if !self.watched_ebpf_configs.contains_key(&name) {
+                debug!(name = %name, "eBPF watch not present - will add");
                 if let Some(ebpf_config) = config.get_ebpf_monitor_config() {
                     info!("MonitorManager: Adding eBPF watch for process '{}'", name);
                     match ebpf_monitor.watch_config(ebpf_config).await {
@@ -125,6 +168,8 @@ impl MonitorManager {
                         }
                     }
                 }
+            } else {
+                debug!(name = %name, "eBPF watch already exists (no change)");
             }
         }
 
@@ -139,6 +184,8 @@ impl MonitorManager {
             .map(|config| (config.name.clone(), config))
             .collect();
 
+        debug!(current_running = %self.running_monitors.keys().cloned().collect::<Vec<_>>().join(","), desired = %desired_configs_map.keys().cloned().collect::<Vec<_>>().join(","), "Reconciling non-eBPF monitors");
+
         // 停止不再需要的监控器
         let monitors_to_stop: Vec<String> = self
             .running_monitors
             .keys()
             .filter(|name| !desired_configs_map.contains_key(*name))
             .cloned()
             .collect();
-
+        if !monitors_to_stop.is_empty() {
+            debug!(to_stop = %monitors_to_stop.join(","), "Non-eBPF monitors scheduled to stop");
+        }
         for name in monitors_to_stop {
             info!(
                 "MonitorManager: Stopping not-ebpf monitor for process '{}'",
                 name
             );
@@ -163,8 +212,19 @@ impl MonitorManager {
 
         // 启动新的监控器或重启已结束的监控器
         for (name, process_config) in desired_configs_map {
             let should_start = match self.running_monitors.get(&name) {
-                Some(handle) => handle.is_finished(),
-                None => true,
+                Some(handle) => {
+                    let finished = handle.is_finished();
+                    if finished {
+                        debug!(process = %name, "Existing monitor task finished - will restart");
+                    } else {
+                        debug!(process = %name, "Monitor already running - no restart needed");
+                    }
+                    finished
+                }
+                None => {
+                    debug!(process = %name, "No existing monitor - will start");
+                    true
+                }
             };
 
             if should_start {
@@ -184,6 +244,8 @@ impl MonitorManager {
                     let monitor = NetworkMonitor::new(network_config, self.event_sender.clone());
                     let handle = tokio::spawn(monitor.run());
                     self.running_monitors.insert(name.clone(), handle);
+                } else {
+                    debug!(process = %name, "Process has no recognized monitor config after filtering (unexpected)");
                 }
             }
         }
diff --git a/healer/src/service_manager.rs b/healer/src/service_manager.rs
index b0ab7d1..65ca9a3 100644
--- a/healer/src/service_manager.rs
+++ b/healer/src/service_manager.rs
@@ -2,13 +2,13 @@ use crate::{
     config::AppConfig,
     coordinator::dependency_coordinator::DependencyCoordinator,
     event_bus::ProcessEvent,
-    subscriber::{Subscriber, process_healer::ProcessHealer},
+    subscriber::{process_healer::ProcessHealer, Subscriber},
 };
 use nix::errno::Errno;
-use nix::sys::wait::{WaitPidFlag, WaitStatus, waitpid};
+use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
 use std::sync::Arc;
 use tokio::signal::unix::{self, SignalKind};
-use tokio::sync::{RwLock, broadcast};
+use tokio::sync::{broadcast, RwLock};
 use tracing::{debug, error, info, warn};
 
 /// 服务管理器,负责管理持久性后台任务
diff --git a/healer/src/subscriber/process_healer.rs b/healer/src/subscriber/process_healer.rs
index 0869c88..08b8d9b 100644
--- a/healer/src/subscriber/process_healer.rs
+++ b/healer/src/subscriber/process_healer.rs
@@ -7,7 +7,7 @@ use std::os::unix::process::CommandExt;
 use std::process::{Command, Stdio};
 use std::{fs, sync::Arc, time::Instant};
 use tokio::sync::RwLock;
-use tokio::sync::{Mutex, broadcast};
+use tokio::sync::{broadcast, Mutex};
 use tracing::{debug, info, warn};
 use users::get_user_by_name;
 
@@ -73,232 +73,217 @@ impl ProcessHealer {
             );
             return;
         }
-        let config_guard =
+        // 限定 read 锁作用域:只在获取并克隆需要的配置期间持有,避免后续阻塞操作(文件IO、spawn)长期占用读锁
+        let process_config_opt = {
             match tokio::time::timeout(std::time::Duration::from_secs(5), self.app_config.read())
                 .await
             {
-                Ok(guard) => guard,
-                Err(_) => {
-                    warn!(
-                        target = "healer_action",
-                        process_name = %name,
-                        "Failed to acquire config lock within timeout, skipping recovery."
- ); - return; - } - }; + Ok(guard) => guard.get_process_config_for(&name).cloned(), + Err(_) => None, + } + }; // 读锁在这里释放 - if let Some(process_config) = config_guard.get_process_config_for(&name) { - let command_to_run = &process_config.command; - let args = &process_config.args; - info!(target = "healer_event", process_name = %name, "Parsed the restart command. Conducting recovery."); + if process_config_opt.is_none() { + warn!( + target = "healer_action", + process_name = %name, + "No configuration found for process." + ); + return; + } + let process_config = process_config_opt.unwrap(); - // 创建日志目录(如果不存在) - if let Err(e) = std::fs::create_dir_all("/var/log/healer") { - warn!(target = "healer_action", process_name = %name, error = %e, "Failed to create log directory, using /tmp"); - } + let command_to_run = &process_config.command; + let args = &process_config.args; + info!(target = "healer_event", process_name = %name, "Parsed the restart command. Conducting recovery."); - let child_log_path = format!("/var/log/healer/{}.restarted.log", name); - let child_output_file = match fs::File::create(&child_log_path) { - Ok(file) => file, - Err(e) => { - warn!( - target = "healer_action", - process_name = %name, - error = %e, - "Failed to create log file, trying /tmp" - ); - // 尝试在/tmp创建日志文件 - let fallback_path = format!("/tmp/healer_{}.restarted.log", name); - match fs::File::create(&fallback_path) { - Ok(file) => file, - Err(e2) => { - tracing::error!( - target = "healer_action", - process_name = %name, - error = %e2, - "Failed to create fallback log file, aborting recovery." - ); - return; - } + // 创建日志目录(如果不存在) + if let Err(e) = std::fs::create_dir_all("/var/log/healer") { + warn!(target = "healer_action", process_name = %name, error = %e, "Failed to create log directory, using /tmp"); + } + + let child_log_path = format!("/var/log/healer/{}.restarted.log", name); + let child_output_file = match fs::File::create(&child_log_path) { + Ok(file) => file, + Err(e) => { + warn!( + target = "healer_action", + process_name = %name, + error = %e, + "Failed to create log file, trying /tmp" + ); + // 尝试在/tmp创建日志文件 + let fallback_path = format!("/tmp/healer_{}.restarted.log", name); + match fs::File::create(&fallback_path) { + Ok(file) => file, + Err(e2) => { + tracing::error!( + target = "healer_action", + process_name = %name, + error = %e2, + "Failed to create fallback log file, aborting recovery." + ); + return; } } - }; + } + }; - let mut command = Command::new(command_to_run); - command.args(args); + let mut command = Command::new(command_to_run); + command.args(args); - // 改进的权限处理 - if !process_config.run_as_root { - if let Some(username) = &process_config.run_as_user { - match get_user_by_name(username) { - Some(user) => { - command.uid(user.uid()); - command.gid(user.primary_group_id()); - info!(target: "healer_action", process_name = %name, user = %username, uid = %user.uid(), "Dropping privileges to run as specified user."); - } - None => { - warn!(target: "healer_action", process_name = %name, user = %username, "Specified user not found. Process will run as root. 
This is a security risk."); - } + // 改进的权限处理 + if !process_config.run_as_root { + if let Some(username) = &process_config.run_as_user { + match get_user_by_name(username) { + Some(user) => { + command.uid(user.uid()); + command.gid(user.primary_group_id()); + info!(target: "healer_action", process_name = %name, user = %username, uid = %user.uid(), "Dropping privileges to run as specified user."); + } + None => { + warn!(target: "healer_action", process_name = %name, user = %username, "Specified user not found. Process will run as root. This is a security risk."); } - } else { - warn!(target: "healer_action", process_name = %name, "run_as_root is false but no run_as_user specified. Process will run as root."); } + } else { + warn!(target: "healer_action", process_name = %name, "run_as_root is false but no run_as_user specified. Process will run as root."); } + } - // 被恢复的进程重定向io - command.stdout(Stdio::from(child_output_file.try_clone().unwrap())); - command.stderr(Stdio::from(child_output_file)); + // 被恢复的进程重定向io + command.stdout(Stdio::from(child_output_file.try_clone().unwrap())); + command.stderr(Stdio::from(child_output_file)); - match command.spawn() { - Ok(child) => { - info!(target = "healer_event", process_name = %name, process_pid = %child.id(), "Successfully restarted process."); - } - Err(e) => { - tracing::error!(target = "healer_action", + match command.spawn() { + Ok(child) => { + info!(target = "healer_event", process_name = %name, process_pid = %child.id(), "Successfully restarted process."); + } + Err(e) => { + tracing::error!(target = "healer_action", process_name = %name, error = %e, "Failed to restart process. This might be due to permission issues or invalid command path."); - } } - } else { - warn!( - target = "healer_action", - process_name = %name, - "No configuration found for process." 
- ); } } async fn check_circuit_breaker(&mut self, name: &String) -> bool { - match self.process_recovery_windows.lock().await.get_mut(name) { - Some(stats) => { - debug!("[{}] Checking circuit breaker state.", name); - match stats.recovery_state { - State::Closed => { - let config_guard = self.app_config.read().await; - if let Some(process_config) = config_guard.get_process_config_for(name) { - if let RecoveryConfig::Regular(regular_healer_fields) = - &process_config.recovery - { - //遍历一遍,把时间超过窗口期的恢复次数删除掉 - //Rust不用for和迭代器遍历,retain 来判断窗口期时间 - stats.recovery_session_starts.retain(|start_time| { - start_time.elapsed().as_secs() - < regular_healer_fields.retry_window_secs - }); + let process_config = { + let cfg = self.app_config.read().await; + cfg.get_process_config_for(name).cloned() + }; - if stats.recovery_session_starts.len() - == regular_healer_fields.retries as usize - { - //熔断了,切换为开路,设置好冷却时间,同时重置历史累计的回复次数 - stats.recovery_state = State::Open; - stats.in_cooldown_until = Some( - Instant::now() - + std::time::Duration::from_secs( - regular_healer_fields.cooldown_secs, - ), - ); - stats.recovery_session_starts.clear(); - return true; - } else { - stats.recovery_session_starts.push_back(Instant::now()); - debug!( - "Process {} has tried {} times", - &name, - stats.recovery_session_starts.len() - ); - return false; - } - } else if let RecoveryConfig::NotRegular(_) = &process_config.recovery { - warn!("Shouldn'be here, NotRegular is not implemented yet"); - return false; // 开还是关的返回值还未确定,健壮性有待商榷 (Todo) - } else { - warn!("No recovery configuration found for process {}", name); - return false; // 开还是关的返回值还未确定,健壮性有待商榷 - } - } else { - warn!("No configuration found for process {}", name); - return false; - } - } - State::Open => { - let now = Instant::now(); - let cooldown_secs = { - let cfg = self.app_config.read().await; - cfg.get_process_config_for(name) - .and_then(|p| match &p.recovery { - RecoveryConfig::Regular(fields) => Some(fields.cooldown_secs), - _ => None, - }) - .unwrap_or(5) - }; + let Some(process_config) = process_config else { + warn!("No configuration found for process {}", name); + self.process_recovery_windows.lock().await.remove(name); + return false; + }; - if let Some(cooldown_until) = stats.in_cooldown_until { - if now < cooldown_until { - return true; // 仍在冷却 - } - } else { - // 不应出现:Open 却无冷却时间,立即补齐一个冷却窗口 - warn!( - "Open state without cooldown for process {}. 
Reinstating cooldown.", - name - ); - stats.in_cooldown_until = - Some(now + std::time::Duration::from_secs(cooldown_secs)); - return true; - } + let mut windows = self.process_recovery_windows.lock().await; + let stats = windows + .entry(name.clone()) + .or_insert_with(ProcessRecoveryStats::default); - // 冷却结束 -> 进入半开,允许一次尝试 - stats.recovery_state = State::HalfOpen; + debug!("[{}] Checking circuit breaker state.", name); + + match stats.recovery_state { + State::Closed => { + if let RecoveryConfig::Regular(regular_healer_fields) = &process_config.recovery { + stats.recovery_session_starts.retain(|start_time| { + start_time.elapsed().as_secs() < regular_healer_fields.retry_window_secs + }); + + if stats.recovery_session_starts.len() == regular_healer_fields.retries as usize + { + stats.recovery_state = State::Open; + stats.in_cooldown_until = Some( + Instant::now() + + std::time::Duration::from_secs( + regular_healer_fields.cooldown_secs, + ), + ); stats.recovery_session_starts.clear(); - stats.half_open_safe_until = Some(now + std::time::Duration::from_secs(2)); // 可配置化:半开观察期 + return true; + } else { + stats.recovery_session_starts.push_back(Instant::now()); + debug!( + "Process {} has tried {} times", + &name, + stats.recovery_session_starts.len() + ); return false; } - State::HalfOpen => { - // 半开状态,尝试恢复 - if let Some(safe_until) = stats.half_open_safe_until { - let now = Instant::now(); - if now < safe_until { - // 半开尝试失败:在安全时间内再次触发恢复,回退到Open并重新开始冷却 - let cooldown_secs = { - let cfg = self.app_config.read().await; - cfg.get_process_config_for(name) - .and_then(|p| match &p.recovery { - RecoveryConfig::Regular(fields) => { - Some(fields.cooldown_secs) - } - _ => None, - }) - .unwrap_or(5) - }; - warn!( - "Process {} is in half-open; attempt failed within safe window. Back to open (cooldown).", - name - ); - stats.recovery_state = State::Open; - stats.in_cooldown_until = - Some(now + std::time::Duration::from_secs(cooldown_secs)); - stats.half_open_safe_until = None; - stats.recovery_session_starts.clear(); - return true; // 冷却中,阻断恢复 - } else { - // 半开成功:稳定通过安全窗口,关闭熔断 - stats.recovery_state = State::Closed; - stats.half_open_safe_until = None; - stats.recovery_session_starts.clear(); - return false; // 允许恢复 + } else if let RecoveryConfig::NotRegular(_) = &process_config.recovery { + warn!("Shouldn't be here, NotRegular is not implemented yet"); + return false; + } else { + warn!("No recovery configuration found for process {}", name); + return false; + } + } + State::Open => { + let now = Instant::now(); + let cooldown_secs = match &process_config.recovery { + RecoveryConfig::Regular(fields) => fields.cooldown_secs, + _ => { + warn!("Shouldn't be here, NotRegular is not implemented yet"); + 5 + } + }; + + if let Some(cooldown_until) = stats.in_cooldown_until { + if now < cooldown_until { + return true; + } + } else { + warn!( + "Open state without cooldown for process {}. 
Reinstating cooldown.", + name + ); + stats.in_cooldown_until = + Some(now + std::time::Duration::from_secs(cooldown_secs)); + return true; + } + + stats.recovery_state = State::HalfOpen; + stats.recovery_session_starts.clear(); + stats.half_open_safe_until = Some(now + std::time::Duration::from_secs(2)); + false + } + State::HalfOpen => { + if let Some(safe_until) = stats.half_open_safe_until { + let now = Instant::now(); + if now < safe_until { + let cooldown_secs = match &process_config.recovery { + RecoveryConfig::Regular(fields) => fields.cooldown_secs, + _ => { + warn!("Shouldn't be here, NotRegular is not implemented yet"); + 5 } - } else { - warn!("Half-open state without safe time set for process {}", name); - stats.recovery_state = State::Closed; - stats.half_open_safe_until = None; - stats.recovery_session_starts.clear(); - return false; // 可以恢复 - } + }; + warn!( + "Process {} is in half-open; attempt failed within safe window. Back to open (cooldown).", + name + ); + stats.recovery_state = State::Open; + stats.in_cooldown_until = + Some(now + std::time::Duration::from_secs(cooldown_secs)); + stats.half_open_safe_until = None; + stats.recovery_session_starts.clear(); + return true; + } else { + stats.recovery_state = State::Closed; + stats.half_open_safe_until = None; + stats.recovery_session_starts.clear(); + return false; } + } else { + warn!("Half-open state without safe time set for process {}", name); + stats.recovery_state = State::Closed; + stats.half_open_safe_until = None; + stats.recovery_session_starts.clear(); + return false; } } - None => true, } } } diff --git a/healer/tests/coordinator.rs b/healer/tests/coordinator.rs index f4b2e65..f93e1ca 100644 --- a/healer/tests/coordinator.rs +++ b/healer/tests/coordinator.rs @@ -3,7 +3,7 @@ use healer::config::{ ProcessConfig, RawDependency, RecoveryConfig, RegularHealerFields, }; use healer::coordinator::dependency_coordinator::DependencyCoordinator; -use healer::event_bus::{ProcessEvent, create_event_sender}; +use healer::event_bus::{create_event_sender, ProcessEvent}; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; diff --git a/healer/tests/ebpf_e2e.rs b/healer/tests/ebpf_e2e.rs index 550831b..6b8e8fc 100644 --- a/healer/tests/ebpf_e2e.rs +++ b/healer/tests/ebpf_e2e.rs @@ -7,10 +7,13 @@ use std::time::Duration; fn workspace_root() -> PathBuf { // CARGO_MANIFEST_DIR 指向 healer 子 crate;集成测试期望使用工作区根目录(其父目录) let crate_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - crate_dir.parent().map(|p| p.to_path_buf()).unwrap_or(crate_dir) + crate_dir + .parent() + .map(|p| p.to_path_buf()) + .unwrap_or(crate_dir) } fn cleanup_stray_processes() { - let base = workspace_root(); + let base = workspace_root(); let healer_bin = base.join("target/debug/healer"); let test_helper = base.join("target/debug/test_process"); @@ -121,7 +124,7 @@ fn ensure_test_binaries() { err ); } - + // 使用与 process_e2e.rs 相同的测试二进制代码 let helper_src = format!( r#"fn main(){{ @@ -134,19 +137,19 @@ fn ensure_test_binaries() { pids_dir.display(), pids_dir.display() ); - + let bin_dir = base.join("target").join("debug"); let test_bin_src = helper_src_dir.join("test_process_ebpf.rs"); write_file(test_bin_src.to_str().unwrap(), &helper_src); - + // 为 eBPF 测试使用独立的二进制名称 let out_bin = bin_dir.join("test_process_ebpf"); - + // 如果已存在可执行文件,跳过编译,避免在 sudo 环境下找不到 rustc if out_bin.exists() { return; } - + // 尝试编译,如果失败则提供有用的错误信息 let result = try_compile_test_binary(&test_bin_src, &out_bin); match result { @@ -154,9 +157,11 @@ fn ensure_test_binaries() { 
Err(e) => {
             eprintln!("Warning: Failed to compile eBPF test binary: {}", e);
             eprintln!("Hint: If running with sudo, try pre-compiling the binary:");
-            eprintln!("  rustc -O {} -o {}",
-                test_bin_src.display(),
-                out_bin.display());
+            eprintln!(
+                "  rustc -O {} -o {}",
+                test_bin_src.display(),
+                out_bin.display()
+            );
             eprintln!("Or set RUSTC environment variable to point to rustc executable.");
             panic!("Cannot proceed without test binary");
         }
     }
 }
 
@@ -169,7 +174,9 @@ fn try_compile_test_binary(src: &std::path::Path, out: &std::path::Path) -> Result<(), String> {
         // 首先尝试环境变量
         std::env::var("RUSTC").ok(),
         // 尝试使用 which 命令
-        Command::new("which").arg("rustc").output()
+        Command::new("which")
+            .arg("rustc")
+            .output()
             .ok()
             .map(|out| String::from_utf8_lossy(&out.stdout).trim().to_string())
             .filter(|s| !s.is_empty()),
@@ -181,11 +188,11 @@ fn try_compile_test_binary(src: &std::path::Path, out: &std::path::Path) -> Result<(), String> {
         // 最后尝试直接调用
         Some("rustc".to_string()),
     ];
-
+
     for rustc_opt in rustc_candidates.into_iter().flatten() {
         if let Ok(status) = Command::new(&rustc_opt)
             .args(["-O", src.to_str().unwrap(), "-o", out.to_str().unwrap()])
-            .status()
+            .status()
         {
             if status.success() {
                 return Ok(());
@@ -194,7 +201,7 @@ fn try_compile_test_binary(src: &std::path::Path, out: &std::path::Path) -> Result<(), String> {
             }
         }
     }
-
+
     Err("Could not find rustc executable".to_string())
 }
 
diff --git a/healer/tests/hot_reload.rs b/healer/tests/hot_reload.rs
new file mode 100644
index 0000000..ee78d38
--- /dev/null
+++ b/healer/tests/hot_reload.rs
@@ -0,0 +1,95 @@
+use healer::config::{
+    AppConfig, MonitorConfig, PidMonitorFields, ProcessConfig, RecoveryConfig, RegularHealerFields,
+};
+use healer::subscriber::process_healer::ProcessHealer;
+use std::path::Path;
+use std::sync::Arc;
+use tempfile::TempDir;
+use tokio::sync::broadcast;
+use tokio::sync::RwLock;
+use tokio::time::{sleep, Duration};
+
+fn make_process(
+    name: &str,
+    command: impl Into<String>,
+    args: Vec<String>,
+    pid_dir: &Path,
+) -> ProcessConfig {
+    ProcessConfig {
+        name: name.to_string(),
+        enabled: true,
+        command: command.into(),
+        args,
+        run_as_user: None,
+        run_as_root: true,
+        working_dir: None,
+        monitor: MonitorConfig::Pid(PidMonitorFields {
+            pid_file_path: pid_dir.join(format!("{name}.pid")),
+            interval_secs: 1,
+        }),
+        recovery: RecoveryConfig::Regular(RegularHealerFields {
+            retries: 3,
+            retry_window_secs: 60,
+            cooldown_secs: 30,
+        }),
+        dependencies: vec![],
+    }
+}
+
+fn make_config(base_dir: &Path, processes: Vec<ProcessConfig>) -> AppConfig {
+    AppConfig {
+        log_level: None,
+        log_directory: Some(base_dir.join("logs")),
+        pid_file_directory: Some(base_dir.join("pids")),
+        processes,
+        working_directory: Some(base_dir.to_path_buf()),
+    }
+}
+
+#[tokio::test]
+async fn hot_reload_allows_new_process_recovery() -> anyhow::Result<()> {
+    let temp_dir = TempDir::new()?;
+    let base_path = temp_dir.path();
+    let pid_dir = base_path.join("pids");
+    std::fs::create_dir_all(&pid_dir)?;
+
+    let initial_config = make_config(
+        base_path,
+        vec![make_process("alpha", "/bin/true", vec![], &pid_dir)],
+    );
+    let shared = Arc::new(RwLock::new(initial_config));
+    let (tx, _) = broadcast::channel(8);
+    let rx = tx.subscribe();
+
+    let mut healer = ProcessHealer::new(rx, Arc::clone(&shared)).await;
+
+    let marker_path = base_path.join("beta_marker.log");
+    let script = format!("echo hot >> {}", marker_path.display());
+    let beta_process = make_process("beta", "/bin/sh", vec!["-c".into(), script], &pid_dir);
+
+    {
+        let mut guard = shared.write().await;
+        *guard = make_config(base_path, vec![beta_process]);
+    }
+
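+    // `alpha` was removed by the hot reload above, so this call should only clear its
+    // stale circuit-breaker state; it must not spawn anything new.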
+ healer.heal_process(&"alpha".to_string()).await; + assert!( + !marker_path.exists(), + "Marker file should not exist before beta recovery is triggered" + ); + + healer.heal_process(&"beta".to_string()).await; + sleep(Duration::from_millis(300)).await; + + assert!( + marker_path.exists(), + "Expected beta recovery command to run after reload" + ); + let content = std::fs::read_to_string(&marker_path)?; + assert!( + content.contains("hot"), + "Marker file should contain recovery output" + ); + + Ok(()) +} diff --git a/healer/tests/monitor_reconcile.rs b/healer/tests/monitor_reconcile.rs new file mode 100644 index 0000000..544f41c --- /dev/null +++ b/healer/tests/monitor_reconcile.rs @@ -0,0 +1,112 @@ +use healer::config::{ + MonitorConfig, NetworkMonitorFields, PidMonitorFields, ProcessConfig, RecoveryConfig, + RegularHealerFields, +}; +use healer::event_bus::create_event_sender; +use healer::monitor_manager::MonitorManager; + +fn pid_process(name: &str, pid_path: &str) -> ProcessConfig { + ProcessConfig { + name: name.into(), + enabled: true, + command: "/bin/true".into(), + args: vec![], + run_as_user: None, + run_as_root: true, + working_dir: None, + monitor: MonitorConfig::Pid(PidMonitorFields { + pid_file_path: pid_path.into(), + interval_secs: 1, + }), + recovery: RecoveryConfig::Regular(RegularHealerFields { + retries: 3, + retry_window_secs: 30, + cooldown_secs: 10, + }), + dependencies: vec![], + } +} + +fn network_process(name: &str, url: &str) -> ProcessConfig { + ProcessConfig { + name: name.into(), + enabled: true, + command: "/bin/true".into(), + args: vec![], + run_as_user: None, + run_as_root: true, + working_dir: None, + monitor: MonitorConfig::Network(NetworkMonitorFields { + target_url: url.into(), + interval_secs: 1, + }), + recovery: RecoveryConfig::Regular(RegularHealerFields { + retries: 3, + retry_window_secs: 30, + cooldown_secs: 10, + }), + dependencies: vec![], + } +} + +fn disabled_process(name: &str) -> ProcessConfig { + ProcessConfig { + name: name.into(), + enabled: false, + command: "/bin/true".into(), + args: vec![], + run_as_user: None, + run_as_root: true, + working_dir: None, + monitor: MonitorConfig::Pid(PidMonitorFields { + pid_file_path: "/tmp/ignore.pid".into(), + interval_secs: 1, + }), + recovery: RecoveryConfig::Regular(RegularHealerFields { + retries: 3, + retry_window_secs: 30, + cooldown_secs: 10, + }), + dependencies: vec![], + } +} + +#[tokio::test] +async fn reconcile_starts_stops_pid_and_network_monitors() { + let event_tx = create_event_sender(); + let mut manager = MonitorManager::new_without_ebpf(event_tx); + + let initial = vec![ + pid_process("pid_a", "/tmp/pid_a.pid"), + network_process("net_a", "http://localhost:1234/health"), + disabled_process("pid_disabled"), + ]; + + manager + .reconcile(&initial) + .await + .expect("initial reconcile should succeed"); + + let mut running = manager.running_monitor_names(); + running.sort(); + assert_eq!( + running, + vec!["net_a".to_string(), "pid_a".to_string()], + "PID and network monitors should start for enabled processes" + ); + + let updated = vec![network_process("net_a", "http://localhost:1234/health")]; + manager + .reconcile(&updated) + .await + .expect("second reconcile should succeed"); + + let running_after = manager.running_monitor_names(); + assert_eq!( + running_after, + vec!["net_a".to_string()], + "PID monitor should stop when the process is removed" + ); + + manager.shutdown().await; +} diff --git a/healer/tests/process_e2e.rs b/healer/tests/process_e2e.rs index 
36069d5..0131c26 100644 --- a/healer/tests/process_e2e.rs +++ b/healer/tests/process_e2e.rs @@ -16,37 +16,37 @@ struct TestContext { impl TestContext { fn new() -> Self { let temp_dir = TempDir::new().expect("Failed to create temp directory"); - + // 动态分配端口 let port = find_free_port(); - + Self { temp_dir, port, children: Vec::new(), } } - + fn temp_path(&self) -> &std::path::Path { self.temp_dir.path() } - + fn logs_dir(&self) -> PathBuf { let logs = self.temp_path().join("logs"); fs::create_dir_all(&logs).expect("Failed to create logs directory"); logs } - + fn pids_dir(&self) -> PathBuf { let pids = self.temp_path().join("pids"); fs::create_dir_all(&pids).expect("Failed to create pids directory"); pids } - + fn add_child(&mut self, child: Child) { self.children.push(child); } - + fn cleanup(&mut self) { // 清理所有子进程 for mut child in self.children.drain(..) { @@ -72,7 +72,10 @@ fn find_free_port() -> u16 { fn workspace_root() -> PathBuf { // CARGO_MANIFEST_DIR 指向 healer 子 crate;集成测试期望使用工作区根目录(其父目录) let crate_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); - crate_dir.parent().map(|p| p.to_path_buf()).unwrap_or(crate_dir) + crate_dir + .parent() + .map(|p| p.to_path_buf()) + .unwrap_or(crate_dir) } fn write_file(path: &PathBuf, content: &str) { @@ -92,7 +95,7 @@ fn spawn_healer_foreground(config_path: &PathBuf) -> Child { "info,healer::monitor::pid_monitor=debug,healer_action=debug,healer_event=info,dep_coord=debug".to_string() }), ); - + // 测试时可继承 stdio(HEALER_TEST_INHERIT_STDIO=1) let inherit_stdio = std::env::var("HEALER_TEST_INHERIT_STDIO") .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes")) @@ -142,7 +145,7 @@ fn build_pid_only_config(ctx: &TestContext) -> String { let logs_dir = ctx.logs_dir(); let pids_dir = ctx.pids_dir(); let test_id = ctx.temp_path().file_name().unwrap().to_string_lossy(); - + format!( r#" log_level: "info" @@ -178,7 +181,7 @@ processes: fn build_network_only_config(ctx: &TestContext) -> String { let logs_dir = ctx.logs_dir(); let pids_dir = ctx.pids_dir(); - + format!( r#" log_level: "info" @@ -223,12 +226,12 @@ fn ensure_test_binaries(ctx: &TestContext) { pids_dir.display(), pids_dir.display() ); - + let base = workspace_root(); let bin_dir = base.join("target").join("debug"); let test_bin_src = ctx.temp_path().join("test_process.rs"); write_file(&test_bin_src, &helper_src); - + // 为每个测试创建唯一的二进制文件名 let test_id = ctx.temp_path().file_name().unwrap().to_string_lossy(); let out_bin = bin_dir.join(format!("test_process_{}", test_id)); @@ -248,7 +251,7 @@ fn ensure_test_binaries(ctx: &TestContext) { fn restart_on_pid_exit_and_circuit_breaker() { let mut ctx = TestContext::new(); ensure_test_binaries(&ctx); - + let cfg_text = build_pid_only_config(&ctx); let cfg_path = ctx.temp_path().join("it_config.yaml"); write_file(&cfg_path, &cfg_text); @@ -256,13 +259,16 @@ fn restart_on_pid_exit_and_circuit_breaker() { // 先启动 helper,保证 PID 文件存在 let base = workspace_root(); let test_id = ctx.temp_path().file_name().unwrap().to_string_lossy(); - let helper_bin = base.join("target").join("debug").join(format!("test_process_{}", test_id)); + let helper_bin = base + .join("target") + .join("debug") + .join(format!("test_process_{}", test_id)); let mut initial = Command::new(helper_bin) .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn() .expect("failed to spawn initial test_process"); - + // 直接写入 PID 文件,消除 helper 自行写入的竞态 let pid_path = ctx.pids_dir().join("counter.pid"); let initial_pid = initial.id() as i32; @@ -309,10 +315,10 @@ fn 
restart_on_pid_exit_and_circuit_breaker() { .ok() .and_then(|s| s.trim().parse().ok()) .unwrap_or(0); - + // 验证我们有有效的初始 PID assert!(first_pid > 0, "No valid initial PID found: {}", first_pid); - + if first_pid > 0 { kill_by_pid(first_pid); // 等待初始进程退出 @@ -410,8 +416,8 @@ fn restart_on_pid_exit_and_circuit_breaker() { #[test] fn network_monitor_detects_crash_and_recovers() { let mut ctx = TestContext::new(); - ensure_test_binaries(&ctx); - + // ensure_test_binaries(&ctx); + let dummy_py = format!( r#"import http.server, socketserver, sys socketserver.TCPServer.allow_reuse_address = True @@ -429,7 +435,7 @@ with socketserver.TCPServer(("", PORT), H) as srv: "#, ctx.port ); - + let py_path = ctx.temp_path().join("dummy_service.py"); write_file(&py_path, &dummy_py); @@ -452,7 +458,7 @@ with socketserver.TCPServer(("", PORT), H) as srv: stream.read_to_string(&mut buf).ok()?; Some(buf) } - + let is_healthy = |port: u16| -> bool { http_get(port, "/health") .map(|r| r.starts_with("HTTP/1.1 200") || r.contains("OK")) @@ -465,7 +471,7 @@ with socketserver.TCPServer(("", PORT), H) as srv: } wait_secs(1); } - + let _ = http_get(ctx.port, "/crash"); let mut healthy = false; for _ in 0..20 { -- Gitee