diff --git a/healer/Cargo.toml b/healer/Cargo.toml
index 1798ead0a4fb4dc71ac3afcdee69b917bd7d7d73..3b394787dd39fb6a0208e7d00d72422920eefbb0 100644
--- a/healer/Cargo.toml
+++ b/healer/Cargo.toml
@@ -32,6 +32,10 @@
 aya-build = { git = "https://github.com/aya-rs/aya" }
 anyhow = "1"
 healer-ebpf = { path = "../healer-ebpf" }
+[lib]
+name = "healer"
+path = "src/lib.rs"
+
 [[bin]]
 name = "healer"
 path = "src/main.rs"
diff --git a/healer/src/lib.rs b/healer/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..a203f0027caed622d705cf48475a253ab0b334ce
--- /dev/null
+++ b/healer/src/lib.rs
@@ -0,0 +1,14 @@
+pub mod config;
+pub mod config_manager;
+pub mod coordinator;
+pub mod core_logic;
+pub mod daemon_handler;
+pub mod event_bus;
+pub mod logger;
+pub mod monitor;
+pub mod monitor_manager;
+pub mod publisher;
+pub mod service_manager;
+pub mod signal_handler;
+pub mod subscriber;
+pub mod utils;
diff --git a/healer/src/main.rs b/healer/src/main.rs
index abdbcd6db36a8cfdb3e4e50487a2fdcaa61be43d..134eebb17536ec79a3ca9133efbb8361ae64a281 100644
--- a/healer/src/main.rs
+++ b/healer/src/main.rs
@@ -14,13 +14,27 @@
 mod subscriber;
 mod utils;
 use config::AppConfig;
 use daemon_handler::run_as_daemon;
+use std::env;
 use tokio::sync::RwLock;
 
 fn main() {
-    println!("Attempting to load the config");
+    // Support overriding config path and running in foreground for tests/dev.
+    let config_file_path_str =
+        env::var("HEALER_CONFIG").unwrap_or_else(|_| "config.yaml".to_string());
+    let run_foreground = matches!(
+        env::var("HEALER_NO_DAEMON")
+            .unwrap_or_else(|_| "0".to_string())
+            .to_ascii_lowercase()
+            .as_str(),
+        "1" | "true" | "yes"
+    );
 
-    let config_file_path_str = "config.yaml";
-    let absolue_config_path = match std::fs::canonicalize(config_file_path_str) {
+    println!(
+        "Attempting to load the config from {}",
+        config_file_path_str
+    );
+
+    let absolue_config_path = match std::fs::canonicalize(&config_file_path_str) {
         Ok(path) => path,
         Err(e) => {
             eprintln!(
@@ -30,8 +44,23 @@ fn main() {
             std::process::exit(1);
         }
     };
+
     let initial_config = AppConfig::load_from_file(&absolue_config_path).expect("Failed to load the initial config");
     let shared_config = std::sync::Arc::new(RwLock::new(initial_config));
+
+    if run_foreground {
+        // Minimal stdout logger for foreground mode; respects RUST_LOG.
+        let _ = tracing_subscriber::fmt()
+            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+            .with_ansi(true)
+            .try_init();
+
+        // Run core logic directly without daemonizing (useful for tests).
+        core_logic::async_runtime(std::sync::Arc::clone(&shared_config), absolue_config_path);
+        return;
+    }
+
+    // Default: run as daemon.
     let config_for_closure = std::sync::Arc::clone(&shared_config);
     let path_for_closure = absolue_config_path.clone();
     let core_logic_closure =
diff --git a/healer/src/subscriber/process_healer.rs b/healer/src/subscriber/process_healer.rs
index 994ad4abf66dba27ed17114fe60950be5215cc41..066fd3de9aa5f23798bfd1d01c050e1effaec9d3 100644
--- a/healer/src/subscriber/process_healer.rs
+++ b/healer/src/subscriber/process_healer.rs
@@ -225,55 +225,71 @@ impl ProcessHealer {
                 }
             }
             State::Open => {
+                let now = Instant::now();
+                let cooldown_secs = {
+                    let cfg = self.app_config.read().await;
+                    cfg.get_process_config_for(name)
+                        .and_then(|p| match &p.recovery {
+                            RecoveryConfig::Regular(fields) => Some(fields.cooldown_secs),
+                            _ => None,
+                        })
+                        .unwrap_or(5)
+                };
+                if let Some(cooldown_until) = stats.in_cooldown_until {
-                    if Instant::now() < cooldown_until {
-                        return true; // The circuit breaker is still open
-                    } else {
-                        // If the half-open retry flag is true, a recovery attempt is allowed
-                        stats.recovery_state = State::HalfOpen;
-                        stats.recovery_session_starts.clear();
-                        stats.in_cooldown_until = None; // Clear the cooldown time
-                        stats.half_open_safe_until =
-                            Some(Instant::now() + std::time::Duration::from_secs(2)); // TODO: the 2 seconds is hard-coded; it should come from the config or a sensible default
-                        return false;
+                    if now < cooldown_until {
+                        return true; // Still cooling down
                     }
                 } else {
-                    // Treat in_cooldown_until == None as eligible for half-open, but we should never get here
-                    warn!(
-                        "Shouldn't be here, in_cooldown_until is None for process {}",
-                        name
-                    );
+                    // Should not happen: Open without a cooldown; reinstate a cooldown window immediately
                     warn!(
-                        "No cooldown time set for process {}, assuming it can be recovered.",
+                        "Open state without cooldown for process {}. Reinstating cooldown.",
                         name
                     );
-                    // If the half-open retry flag is true, that means
-                    stats.recovery_state = State::HalfOpen;
-                    stats.recovery_session_starts.clear();
-                    stats.in_cooldown_until = None; // Clear the cooldown time
-                    return false;
+                    stats.in_cooldown_until =
+                        Some(now + std::time::Duration::from_secs(cooldown_secs));
+                    return true;
                 }
+
+                // Cooldown finished -> enter half-open and allow one attempt
+                stats.recovery_state = State::HalfOpen;
+                stats.recovery_session_starts.clear();
+                stats.half_open_safe_until = Some(now + std::time::Duration::from_secs(2)); // TODO: make the half-open observation window configurable
+                return false;
             }
             State::HalfOpen => {
                 // Half-open state: attempt recovery
                 if let Some(safe_until) = stats.half_open_safe_until {
-                    if Instant::now() < safe_until {
-                        // Switch to Open
-                        // TODO: consider implementing exponential backoff later
+                    let now = Instant::now();
+                    if now < safe_until {
+                        // Half-open attempt failed: recovery was triggered again within the safe window; fall back to Open and restart the cooldown
+                        let cooldown_secs = {
+                            let cfg = self.app_config.read().await;
+                            cfg.get_process_config_for(name)
+                                .and_then(|p| match &p.recovery {
+                                    RecoveryConfig::Regular(fields) => {
+                                        Some(fields.cooldown_secs)
+                                    }
+                                    _ => None,
+                                })
+                                .unwrap_or(5)
+                        };
                         warn!(
-                            "Process {} is in half-open state, but safe time has not passed yet.",
+                            "Process {} is in half-open; attempt failed within safe window. Back to open (cooldown).",
Back to open (cooldown).", name ); stats.recovery_state = State::Open; - stats.half_open_safe_until = None; // 清除半开安全时间 + stats.in_cooldown_until = + Some(now + std::time::Duration::from_secs(cooldown_secs)); + stats.half_open_safe_until = None; stats.recovery_session_starts.clear(); - return true; // 半开状态,不能恢复 + return true; // 冷却中,阻断恢复 } else { - // 半开状态结束,重置为关闭状态 + // 半开成功:稳定通过安全窗口,关闭熔断 stats.recovery_state = State::Closed; stats.half_open_safe_until = None; stats.recovery_session_starts.clear(); - return false; // 可以恢复 + return false; // 允许恢复 } } else { warn!("Half-open state without safe time set for process {}", name); diff --git a/healer/tests/coordinator.rs b/healer/tests/coordinator.rs new file mode 100644 index 0000000000000000000000000000000000000000..f4b2e65541434bdb85c80094d07e11f4ee1b2318 --- /dev/null +++ b/healer/tests/coordinator.rs @@ -0,0 +1,109 @@ +use healer::config::{ + AppConfig, DependencyConfig, DependencyKind, MonitorConfig, NetworkMonitorFields, OnFailure, + ProcessConfig, RawDependency, RecoveryConfig, RegularHealerFields, +}; +use healer::coordinator::dependency_coordinator::DependencyCoordinator; +use healer::event_bus::{ProcessEvent, create_event_sender}; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::RwLock; + +fn mk_process(name: &str, deps: Vec) -> ProcessConfig { + ProcessConfig { + name: name.to_string(), + enabled: true, + command: "/bin/true".to_string(), + args: vec![], + run_as_user: None, + run_as_root: true, + working_dir: None, + monitor: MonitorConfig::Network(NetworkMonitorFields { + target_url: "http://127.0.0.1:1/health".to_string(), + interval_secs: 60, + }), + recovery: RecoveryConfig::Regular(RegularHealerFields { + retries: 1, + retry_window_secs: 5, + cooldown_secs: 5, + }), + dependencies: deps, + } +} + +#[tokio::test] +async fn defers_then_releases_on_timeout_skip() { + let dep = RawDependency::Detailed(DependencyConfig { + target: "B".to_string(), + kind: DependencyKind::Requires, + hard: true, + max_wait_secs: 1, + on_failure: OnFailure::Skip, + }); + let cfg = AppConfig { + log_level: None, + log_directory: None, + pid_file_directory: None, + working_directory: Some(PathBuf::from("/")), + processes: vec![mk_process("A", vec![dep]), mk_process("B", vec![])], + }; + let shared = Arc::new(RwLock::new(cfg)); + + let in_tx = create_event_sender(); + let in_rx = in_tx.subscribe(); + let out_tx = create_event_sender(); + let mut out_rx = out_tx.subscribe(); + + // 启动协调器循环 + let coordinator = DependencyCoordinator::new(in_rx, out_tx.clone(), Arc::clone(&shared)); + tokio::spawn(async move { + coordinator.run_loop().await; + }); + + // 1) B 先下线,标记为恢复中 + let _ = in_tx.send(ProcessEvent::ProcessDown { + name: "B".to_string(), + pid: 123, + }); + // 留出处理时间 + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // 2) A 下线,应被延后(不应立刻转发) + let _ = in_tx.send(ProcessEvent::ProcessDown { + name: "A".to_string(), + pid: 456, + }); + + // 短超时内不应收到 A 的转发 + let mut got_immediate = false; + if let Ok(evt) = + tokio::time::timeout(std::time::Duration::from_millis(200), out_rx.recv()).await + { + // 若收到 A,则延后逻辑失败 + if let Ok(ProcessEvent::ProcessDown { name, .. }) = evt { + if name == "A" { + got_immediate = true; + } + } + } + assert!( + !got_immediate, + "A should have been deferred due to B recovering" + ); + + // 约 5s 后重试,最多等 ~8s + let mut forwarded = false; + for _ in 0..16 { + if let Ok(Ok(ProcessEvent::ProcessDown { name, .. 
+            tokio::time::timeout(std::time::Duration::from_millis(500), out_rx.recv()).await
+        {
+            if name == "A" {
+                forwarded = true;
+                break;
+            }
+        }
+    }
+    assert!(
+        forwarded,
+        "A should be forwarded after dependency timeout with Skip policy"
+    );
+}
diff --git a/healer/tests/ebpf_e2e.rs b/healer/tests/ebpf_e2e.rs
new file mode 100644
index 0000000000000000000000000000000000000000..f447e0def89129b82577925f7a9ed70018f39957
--- /dev/null
+++ b/healer/tests/ebpf_e2e.rs
@@ -0,0 +1,216 @@
+use std::fs;
+// no extra std::io imports needed
+use std::path::PathBuf;
+use std::process::{Child, Command, Stdio};
+use std::thread;
+use std::time::Duration;
+
+fn cleanup_stray_processes() {
+    let base = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    let healer_bin = base.join("target/debug/healer");
+    let test_helper = base.join("target/debug/test_process");
+
+    let patterns = vec![
+        healer_bin.to_string_lossy().to_string(),
+        test_helper.to_string_lossy().to_string(),
+    ];
+
+    for pat in patterns {
+        let _ = Command::new("pkill").args(["-9", "-f", &pat]).status();
+    }
+}
+
+fn write_file(path: &str, content: &str) {
+    let p = PathBuf::from(path);
+    if let Some(parent) = p.parent() {
+        let _ = fs::create_dir_all(parent);
+    }
+    fs::write(p, content).expect("write file failed");
+}
+
+fn spawn_healer_foreground(config_path: &str) -> Child {
+    let mut cmd = Command::new(env!("CARGO_BIN_EXE_healer"));
+    cmd.env("HEALER_CONFIG", config_path)
+        .env("HEALER_NO_DAEMON", "1")
+        .env(
+            "RUST_LOG",
+            std::env::var("RUST_LOG").unwrap_or_else(|_| {
+                "info,healer::monitor::ebpf_monitor=debug,healer_action=debug,healer_event=info,dep_coord=debug".to_string()
+            }),
+        );
+    let inherit_stdio = std::env::var("HEALER_TEST_INHERIT_STDIO")
+        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes"))
+        .unwrap_or(false);
+    if inherit_stdio {
+        cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
+    } else {
+        cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
+    }
+
+    cmd.spawn().expect("failed to spawn healer")
+}
+
+fn wait_secs(s: u64) {
+    thread::sleep(Duration::from_secs(s));
+}
+
+fn kill_by_pid(pid: i32) {
+    #[cfg(unix)]
+    {
+        let _ = Command::new("/bin/kill")
+            .args(["-9", &pid.to_string()])
+            .status();
+    }
+}
+
+fn kill_child(mut child: Child) {
+    let _ = Command::new("/bin/kill")
+        .args(["-INT", &child.id().to_string()])
+        .status();
+    for _ in 0..10 {
+        if let Ok(Some(_)) = child.try_wait() {
+            return;
+        }
+        thread::sleep(Duration::from_millis(50));
+    }
+    let _ = Command::new("/bin/kill")
+        .args(["-9", &child.id().to_string()])
+        .status();
+    let _ = child.wait();
+}
+
+fn build_ebpf_config(base: &str) -> String {
+    format!(
+        r#"
+log_level: "info"
+log_directory: "/tmp/healer-tests/logs"
+pid_file_directory: "/tmp/healer-tests/pids"
+working_directory: "/"
+
+processes:
+  - name: "counter_ebpf"
+    enabled: true
+    command: "{base}/target/debug/test_process"
+    args: []
+    run_as_root: true
+    run_as_user: null
+    monitor:
+      type: "ebpf"
+    recovery:
+      type: "regular"
+      retries: 3
+      retry_window_secs: 10
+      cooldown_secs: 5
+"#
+    )
+}
+
+fn ensure_test_binaries() {
+    let helper_src = r#"fn main(){
+    use std::{fs,thread,time,process,io::{self,Write}};
+    let pid = process::id();
+    let _ = fs::create_dir_all("/tmp/healer-tests/pids");
+    let _ = fs::write("/tmp/healer-tests/pids/counter.pid", pid.to_string());
+    let mut n=0u64; loop{ print!("\r[PID {}] alive {}", pid,n); let _=io::stdout().flush(); thread::sleep(time::Duration::from_secs(1)); n+=1; }
+    }"#;
+    let base = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    let bin_dir = base.join("target").join("debug");
+    let test_bin_src = base.join("tests/fixtures/test_process.rs");
+    let _ = fs::create_dir_all(test_bin_src.parent().unwrap());
+    write_file(test_bin_src.to_str().unwrap(), helper_src);
+    let out_bin = bin_dir.join("test_process");
+    // If the binary already exists, skip compilation to avoid rustc being unavailable under sudo.
+    if out_bin.exists() {
+        return;
+    }
+    let status = Command::new("rustc")
+        .args([
+            "-O",
+            test_bin_src.to_str().unwrap(),
+            "-o",
+            out_bin.to_str().unwrap(),
+        ])
+        .status()
+        .expect("failed to run rustc for test helper");
+    assert!(status.success(), "failed to build test helper bin");
+}
+
+#[test]
+#[ignore]
+fn ebpf_detects_exit_and_recovers() {
+    // Only run when explicitly enabled (requires root/capabilities).
+    if std::env::var("HEALER_EBPF_E2E")
+        .ok()
+        .map(|v| v == "1")
+        .unwrap_or(false)
+        == false
+    {
+        eprintln!("skipped: set HEALER_EBPF_E2E=1 to run");
+        return;
+    }
+
+    cleanup_stray_processes();
+    ensure_test_binaries();
+    let base = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    let cfg_text = build_ebpf_config(base.to_str().unwrap());
+    let cfg_path = base.join("target/debug/ebpf_config.yaml");
+    write_file(cfg_path.to_str().unwrap(), &cfg_text);
+
+    // Start the monitored process first so the eBPF events can be observed.
+    let helper_bin = base.join("target/debug/test_process");
+    let mut child = Command::new(helper_bin)
+        .stdout(Stdio::null())
+        .stderr(Stdio::null())
+        .spawn()
+        .expect("failed to spawn test_process");
+
+    let pid_path = "/tmp/healer-tests/pids/counter.pid";
+    for _ in 0..10 {
+        if fs::read_to_string(pid_path).is_ok() {
+            break;
+        }
+        wait_secs(1);
+    }
+
+    let healer = spawn_healer_foreground(cfg_path.to_str().unwrap());
+    // Wait for eBPF initialization and for the watch to take effect.
+    wait_secs(3);
+
+    // Baseline PID.
+    let first_pid: i32 = fs::read_to_string(pid_path)
+        .ok()
+        .and_then(|s| s.trim().parse().ok())
+        .unwrap_or(0);
+    assert!(first_pid > 0, "invalid baseline pid");
+
+    // Trigger an exit.
+    kill_by_pid(first_pid);
+    let _ = child.wait();
+
+    // Expect healer to start a new process via the eBPF event (the PID changes).
+    let mut new_pid = first_pid;
+    for _ in 0..20 {
+        wait_secs(1);
+        if let Ok(s) = fs::read_to_string(pid_path) {
+            if let Ok(p) = s.trim().parse::<i32>() {
+                if p > 0 && p != first_pid {
+                    new_pid = p;
+                    break;
+                }
+            }
+        }
+    }
+    assert!(
+        new_pid > 0 && new_pid != first_pid,
+        "ebpf did not trigger restart"
+    );
+
+    // Cleanup.
+    kill_child(healer);
+    if let Ok(s) = fs::read_to_string(pid_path) {
+        if let Ok(p) = s.trim().parse::<i32>() {
+            kill_by_pid(p);
+        }
+    }
+    cleanup_stray_processes();
+}
diff --git a/healer/tests/fixtures/dummy_service.py b/healer/tests/fixtures/dummy_service.py
new file mode 100644
index 0000000000000000000000000000000000000000..9e02afd2b0bef2201aa83878e05ee71cdb8ad17d
--- /dev/null
+++ b/healer/tests/fixtures/dummy_service.py
@@ -0,0 +1,13 @@
+import http.server, socketserver, sys
+socketserver.TCPServer.allow_reuse_address = True
+PORT=8080
+class H(http.server.SimpleHTTPRequestHandler):
+    def do_GET(self):
+        if self.path == '/health':
+            self.send_response(200); self.end_headers(); self.wfile.write(b'OK')
+        elif self.path == '/crash':
+            self.send_response(200); self.end_headers(); self.wfile.write(b'DIE'); sys.exit(1)
+        else:
+            self.send_response(404); self.end_headers()
+with socketserver.TCPServer(("", PORT), H) as srv:
+    srv.serve_forever()
diff --git a/healer/tests/fixtures/test_process.rs b/healer/tests/fixtures/test_process.rs
new file mode 100644
index 0000000000000000000000000000000000000000..7db433c8bf880232b17e29021834b6963ed30a58
--- /dev/null
+++ b/healer/tests/fixtures/test_process.rs
@@ -0,0 +1,7 @@
+fn main(){
+    use std::{fs,thread,time,process,io::{self,Write}};
+    let pid = process::id();
+    let _ = fs::create_dir_all("/tmp/healer-tests/pids");
+    let _ = fs::write("/tmp/healer-tests/pids/counter.pid", pid.to_string());
+    let mut n=0u64; loop{ print!("\r[PID {}] alive {}", pid,n); let _=io::stdout().flush(); thread::sleep(time::Duration::from_secs(1)); n+=1; }
+    }
\ No newline at end of file
diff --git a/healer/tests/process_e2e.rs b/healer/tests/process_e2e.rs
new file mode 100644
index 0000000000000000000000000000000000000000..f6c1a5220607c90f5e50241c819f83db56ed0ccc
--- /dev/null
+++ b/healer/tests/process_e2e.rs
@@ -0,0 +1,364 @@
+use std::fs;
+use std::io::{Read, Write};
+use std::path::PathBuf;
+use std::process::{Child, Command, Stdio};
+use std::thread;
+use std::time::Duration;
+
+fn cleanup_stray_processes() {
+    let base = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    let healer_bin = base.join("target/debug/healer");
+    let test_helper = base.join("target/debug/test_process");
+    let dummy_py = base.join("tests/fixtures/dummy_service.py");
+
+    let patterns = vec![
+        healer_bin.to_string_lossy().to_string(),
+        test_helper.to_string_lossy().to_string(),
+        dummy_py.to_string_lossy().to_string(),
+    ];
+
+    for pat in patterns {
+        let _ = Command::new("pkill").args(["-9", "-f", &pat]).status();
+    }
+}
+
+fn write_file(path: &str, content: &str) {
+    let p = PathBuf::from(path);
+    if let Some(parent) = p.parent() {
+        let _ = fs::create_dir_all(parent);
+    }
+    fs::write(p, content).expect("write file failed");
+}
+
+fn spawn_healer_foreground(config_path: &str) -> Child {
+    let mut cmd = Command::new(env!("CARGO_BIN_EXE_healer"));
+    cmd.env("HEALER_CONFIG", config_path)
+        .env("HEALER_NO_DAEMON", "1")
+        .env(
+            "RUST_LOG",
+            std::env::var("RUST_LOG").unwrap_or_else(|_| {
+                "info,healer::monitor::pid_monitor=debug,healer_action=debug,healer_event=info,dep_coord=debug".to_string()
+            }),
+        );
+    // Tests may inherit stdio (HEALER_TEST_INHERIT_STDIO=1).
+    let inherit_stdio = std::env::var("HEALER_TEST_INHERIT_STDIO")
+        .map(|v| matches!(v.to_ascii_lowercase().as_str(), "1" | "true" | "yes"))
+        .unwrap_or(false);
+    if inherit_stdio {
+        cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
+    } else {
+        cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
+    }
+
+    cmd.spawn().expect("failed to spawn healer")
+}
+
+fn wait_secs(s: u64) {
+    thread::sleep(Duration::from_secs(s));
+}
+
+fn kill_by_pid(pid: i32) {
+    #[cfg(unix)]
+    {
+        let _ = Command::new("/bin/kill")
+            .args(["-9", &pid.to_string()])
+            .status();
+    }
+}
+
+fn kill_child(mut child: Child) {
+    // Try a graceful shutdown first (SIGINT), then force kill (SIGKILL) if needed.
+    let _ = Command::new("/bin/kill")
+        .args(["-INT", &child.id().to_string()])
+        .status();
+    for _ in 0..10 {
+        if let Ok(Some(_)) = child.try_wait() {
+            return;
+        }
+        wait_secs(0); // yield
+        thread::sleep(Duration::from_millis(50));
+    }
+    let _ = Command::new("/bin/kill")
+        .args(["-9", &child.id().to_string()])
+        .status();
+    let _ = child.wait();
+}
+
+fn build_temp_config(base: &str) -> String {
+    format!(
+        r#"
+log_level: "info"
+log_directory: "/tmp/healer-tests/logs"
+pid_file_directory: "/tmp/healer-tests/pids"
+working_directory: "/"
+
+processes:
+  - name: "counter"
+    enabled: true
+    command: "{base}/target/debug/test_process"
+    args: []
+    run_as_root: true
+    run_as_user: null
+    monitor:
+      type: "pid"
+      pid_file_path: "/tmp/healer-tests/pids/counter.pid"
+      interval_secs: 1
+    recovery:
+      type: "regular"
+      retries: 3
+      retry_window_secs: 10
+      cooldown_secs: 5
+
+  - name: "dummy_net"
+    enabled: true
command: "/usr/bin/python3" + args: ["{base}/tests/fixtures/dummy_service.py"] + run_as_root: true + run_as_user: null + monitor: + type: "network" + target_url: "http://127.0.0.1:8080/health" + interval_secs: 1 + recovery: + type: "regular" + retries: 2 + retry_window_secs: 5 + cooldown_secs: 4 +"# + ) +} + +fn ensure_test_binaries() { + let helper_src = r#"fn main(){ + use std::{fs,thread,time,process,io::{self,Write}}; + let pid = process::id(); + let _ = fs::create_dir_all("/tmp/healer-tests/pids"); + let _ = fs::write("/tmp/healer-tests/pids/counter.pid", pid.to_string()); + let mut n=0u64; loop{ print!("\\r[PID {}] alive {}", pid,n); let _=io::stdout().flush(); thread::sleep(time::Duration::from_secs(1)); n+=1; } + }"#; + let base = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let bin_dir = base.join("target").join("debug"); + let test_bin_src = base.join("tests/fixtures/test_process.rs"); + let _ = fs::create_dir_all(test_bin_src.parent().unwrap()); + write_file(test_bin_src.to_str().unwrap(), helper_src); + let out_bin = bin_dir.join("test_process"); + let status = Command::new("rustc") + .args([ + "-O", + test_bin_src.to_str().unwrap(), + "-o", + out_bin.to_str().unwrap(), + ]) + .status() + .expect("failed to run rustc for test helper"); + assert!(status.success(), "failed to build test helper bin"); +} + +#[test] +fn restart_on_pid_exit_and_circuit_breaker() { + cleanup_stray_processes(); + ensure_test_binaries(); + let base = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let cfg_text = build_temp_config(base.to_str().unwrap()); + let cfg_path = base.join("target/debug/it_config.yaml"); + write_file(cfg_path.to_str().unwrap(), &cfg_text); + + // 先启动 helper,保证 PID 文件存在 + let helper_bin = base.join("target/debug/test_process"); + let mut initial = Command::new(helper_bin) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .expect("failed to spawn initial test_process"); + // 等待 PID 文件就绪 + let pid_path = "/tmp/healer-tests/pids/counter.pid"; + for _ in 0..10 { + if fs::read_to_string(pid_path).is_ok() { + break; + } + wait_secs(1); + } + + let mut healer = spawn_healer_foreground(cfg_path.to_str().unwrap()); + wait_secs(2); + + // 检查 healer 是否仍在运行 + match healer.try_wait() { + Ok(Some(status)) => { + panic!("Healer exited early with status: {:?}", status); + } + Ok(None) => { + println!("Healer is running normally"); + } + Err(e) => { + panic!("Error checking healer status: {}", e); + } + } + + let first_pid: i32 = fs::read_to_string(pid_path) + .ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + if first_pid > 0 { + kill_by_pid(first_pid); + // 回收初始子进程,避免僵尸进程 + let _ = initial.wait(); + } + + // 等待重启 + let mut new_pid = first_pid; + println!("Waiting for restart. Original PID: {}", first_pid); + for i in 0..15 { + wait_secs(1); + if let Ok(s) = fs::read_to_string(pid_path) { + if let Ok(p) = s.trim().parse::() { + println!("Iteration {}: PID file contains: {}", i + 1, p); + if p > 0 && p != first_pid { + new_pid = p; + println!( + "Process restarted with new PID {} after {} seconds", + p, + i + 1 + ); + break; + } + } + } else { + println!("Iteration {}: Could not read PID file", i + 1); + } + if i % 5 == 4 { + let current_pid = fs::read_to_string(pid_path) + .ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0); + println!( + "Still waiting for restart... 
+                current_pid, first_pid
+            );
+        }
+    }
+    assert!(
+        new_pid > 0 && new_pid != first_pid,
+        "healer did not restart the counter process"
+    );
+
+    // Kill it 3 times in quick succession to trip the circuit breaker (matches the config).
+    for _ in 0..3 {
+        kill_by_pid(new_pid);
+        let last = new_pid;
+        for _ in 0..10 {
+            wait_secs(1);
+            if let Ok(s) = fs::read_to_string(pid_path) {
+                if let Ok(p) = s.trim().parse::<i32>() {
+                    if p != last {
+                        new_pid = p;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    // One more kill should be blocked by the cooldown; no restart within ~3s.
+    kill_by_pid(new_pid);
+    let old = new_pid;
+    wait_secs(3);
+    let after: i32 = fs::read_to_string(pid_path)
+        .ok()
+        .and_then(|s| s.trim().parse().ok())
+        .unwrap_or(0);
+    assert_eq!(after, old, "circuit breaker did not hold during cooldown");
+
+    // After the cooldown, restart attempts should resume.
+    let mut restarted = false;
+    for _ in 0..6 {
+        wait_secs(1);
+        if let Ok(s) = fs::read_to_string(pid_path) {
+            if let Ok(p) = s.trim().parse::<i32>() {
+                if p != old {
+                    restarted = true;
+                    break;
+                }
+            }
+        }
+    }
+    assert!(restarted, "healer did not attempt restart after cooldown");
+
+    // Cleanup: stop healer and the last helper.
+    kill_child(healer);
+    if let Ok(s) = fs::read_to_string(pid_path) {
+        if let Ok(p) = s.trim().parse::<i32>() {
+            kill_by_pid(p);
+        }
+    }
+    cleanup_stray_processes();
+}
+
+#[test]
+fn network_monitor_detects_crash_and_recovers() {
+    cleanup_stray_processes();
+    ensure_test_binaries();
+    let base = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    let dummy_py = r#"import http.server, socketserver, sys
+socketserver.TCPServer.allow_reuse_address = True
+PORT=8080
+class H(http.server.SimpleHTTPRequestHandler):
+    def do_GET(self):
+        if self.path == '/health':
+            self.send_response(200); self.end_headers(); self.wfile.write(b'OK')
+        elif self.path == '/crash':
+            self.send_response(200); self.end_headers(); self.wfile.write(b'DIE'); sys.exit(1)
+        else:
+            self.send_response(404); self.end_headers()
+with socketserver.TCPServer(("", PORT), H) as srv:
+    srv.serve_forever()
+"#;
+    let py_path = base.join("tests/fixtures/dummy_service.py");
+    write_file(py_path.to_str().unwrap(), dummy_py);
+
+    let cfg_text = build_temp_config(base.to_str().unwrap());
+    let cfg_path = base.join("target/debug/net_config.yaml");
+    write_file(cfg_path.to_str().unwrap(), &cfg_text);
+
+    let healer = spawn_healer_foreground(cfg_path.to_str().unwrap());
+    wait_secs(2);
+
+    fn http_get(path: &str) -> Option<String> {
+        use std::net::TcpStream;
+        let mut stream = TcpStream::connect("127.0.0.1:8080").ok()?;
+        let req = format!(
+            "GET {} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n",
+            path
+        );
+        stream.write_all(req.as_bytes()).ok()?;
+        let mut buf = String::new();
+        stream.read_to_string(&mut buf).ok()?;
+        Some(buf)
+    }
+    fn is_healthy() -> bool {
+        http_get("/health")
+            .map(|r| r.starts_with("HTTP/1.1 200") || r.contains("OK"))
+            .unwrap_or(false)
+    }
+
+    for _ in 0..10 {
+        if is_healthy() {
+            break;
+        }
+        wait_secs(1);
+    }
+    let _ = http_get("/crash");
+    let mut healthy = false;
+    for _ in 0..20 {
+        if is_healthy() {
+            healthy = true;
+            break;
+        }
+        wait_secs(1);
+    }
+    assert!(healthy, "network monitor did not recover dummy service");
+
+    let _ = http_get("/crash");
+    kill_child(healer);
+    cleanup_stray_processes();
+}
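One possible way to exercise the new foreground mode and tests; these invocations are illustrative and assume a standard cargo layout, they are not prescribed by this change:

    # Run the binary in the foreground against an explicit config (no daemonizing).
    HEALER_NO_DAEMON=1 HEALER_CONFIG=./config.yaml cargo run --bin healer

    # The PID/network e2e tests spawn the healer with HEALER_NO_DAEMON=1 themselves;
    # both use port 8080 and the same PID file, so running them single-threaded is safer.
    cargo test --test process_e2e -- --test-threads=1

    # The eBPF e2e test is #[ignore]d and additionally gated by HEALER_EBPF_E2E; it needs root.
    sudo HEALER_EBPF_E2E=1 cargo test --test ebpf_e2e -- --ignored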