diff --git a/Cargo.toml b/Cargo.toml index d13e29f025a0edaff03cdbac47493c924d032d1c..b46515242bfef1af8475c8d8a2b0d84c66ca5fc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ required-features = ["eventbus", "servers", "test"] #required-features = [] [workspace] -members = ["crates/os_socket_comms"] +members = ["crates/os_socket_comms", "crates/network_info"] [workspace.dependencies] @@ -30,6 +30,7 @@ log = "0.4.22" tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] } serde_json = "1.0.127" chrono = { version = "0.4.38", features = ["serde"] } +serde = { version = "1.0.209", features = ["derive"] } [workspace.package] version = "0.4.0" @@ -46,8 +47,14 @@ messaging = ["eventbus"] os_socket = ["os_socket_comms"] [dependencies] +# crates os_socket_comms = { path = "crates/os_socket_comms", optional = true } +network_info = { path = "crates/network_info" } + +# dependencies feventbus = { git = "https://gitee.com/iscas-system/eventbus.git", optional = true } +pnet = "0.34.0" +pnet_datalink = "0.34.0" #fleetmodv2 = { path = "../fleetmodv2" } fleetmodv2 = { git = "https://gitee.com/iscas-system/fleetmodv2.git" } #client-rust = { path = "../client-rust" } diff --git a/crates/network_info/Cargo.toml b/crates/network_info/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..497a01425cbd4f289e156f63697a12614a31c20b --- /dev/null +++ b/crates/network_info/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "network_info" +version = "0.1.0" +edition = "2021" + +[dependencies] +pnet = "0.34.0" +pnet_datalink = "0.34.0" +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +log = { workspace = true } \ No newline at end of file diff --git a/crates/network_info/src/lib.rs b/crates/network_info/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..12eff1ae641c67443f57838ee7196aa3eff0d9bc --- /dev/null +++ b/crates/network_info/src/lib.rs @@ -0,0 +1,196 @@ +//! Network Information Gathering Library +//! +//! This library provides functionality to gather network interface information +//! and test network connectivity on Linux systems. +//! +//! # Examples +//! +//! ```rust +//! use network_info::{get_network_interfaces, test_connectivity}; +//! +//! // Get all network interfaces +//! let interfaces = get_network_interfaces(); +//! for interface in interfaces { +//! println!("Interface: {}, IP: {:?}, Speed: {:?} Mbps", +//! interface.interface_name, +//! interface.ip_address, +//! interface.speed_mbps +//! ); +//! } +//! +//! // Test connectivity to a host +//! let is_connected = test_connectivity("8.8.8.8").await; +//! println!("Is connected: {}", is_connected); +//! ``` + +use pnet_datalink; +use std::process::Command; +use serde::{Serialize, Deserialize}; +use std::fs; +use std::path::Path; + +/// Network interface information structure +/// +/// Contains details about a network interface including its name, +/// IP address, MAC address, operational status, and speed. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct NetworkInterfaceInfo { + /// Name of the network interface (e.g., "eth0", "wlan0") + pub interface_name: String, + /// IPv4 address of the interface, if available + pub ip_address: Option, + /// MAC address of the interface + pub mac_address: String, + /// Whether the interface is currently up and operational + pub is_up: bool, + /// Speed of the interface in Mbps, if available + pub speed_mbps: Option, +} + +/// Get the speed of a network interface +/// +/// Reads the speed information from `/sys/class/net//speed`. +/// Returns `None` if the speed cannot be determined. +/// +/// # Arguments +/// +/// * `interface_name` - Name of the network interface +/// +/// # Returns +/// +/// * `Option` - Speed in Mbps if available, None otherwise +fn get_interface_speed(interface_name: &str) -> Option { + let speed_path = Path::new("/sys/class/net") + .join(interface_name) + .join("speed"); + + match fs::read_to_string(speed_path) { + Ok(speed_str) => { + match speed_str.trim().parse::() { + Ok(speed) => Some(speed), + Err(e) => { + log::warn!("解析网卡 {} 速度失败: {}", interface_name, e); + None + } + } + } + Err(e) => { + // 某些网卡可能不支持速度信息,这是正常的 + log::debug!("获取网卡 {} 速度失败: {}", interface_name, e); + None + } + } +} + +/// Get information about all network interfaces +/// +/// Retrieves detailed information about all network interfaces +/// present in the system, including their IP addresses, MAC addresses, +/// operational status, and speeds. +/// +/// # Returns +/// +/// * `Vec` - Vector of network interface information +pub fn get_network_interfaces() -> Vec { + pnet_datalink::interfaces() + .into_iter() + .map(|interface| { + // 获取第一个IPv4地址 + let ip_address = interface.ips.iter() + .find(|ip| ip.is_ipv4()) + .map(|ip| ip.ip().to_string()); + + // 获取MAC地址,如果没有则使用默认值 + let mac_address = interface.mac + .map(|mac| mac.to_string()) + .unwrap_or_else(|| "00:00:00:00:00:00".to_string()); + + // 获取网卡速度 + let speed_mbps = if interface.is_up() { + get_interface_speed(&interface.name) + } else { + None + }; + + NetworkInterfaceInfo { + interface_name: interface.name.clone(), + ip_address, + mac_address, + is_up: interface.is_up(), + speed_mbps, + } + }) + .collect() +} + +/// Test network connectivity to a target IP address +/// +/// Uses the `ping` command to test connectivity to the specified IP address. +/// Sends a single ICMP echo request and waits for a response. +/// +/// # Arguments +/// +/// * `target_ip` - IP address to test connectivity to +/// +/// # Returns +/// +/// * `bool` - True if the target is reachable, false otherwise +pub async fn test_connectivity(target_ip: &str) -> bool { + let output = Command::new("ping") + .arg("-c") + .arg("1") + .arg(target_ip) + .output(); + + match output { + Ok(output) => output.status.success(), + Err(e) => { + log::error!("执行ping命令失败: {}", e); + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_network_interfaces() { + let interfaces = get_network_interfaces(); + assert!(!interfaces.is_empty(), "Should find at least one network interface"); + + // Test that loopback interface exists + let has_loopback = interfaces.iter() + .any(|iface| iface.interface_name == "lo"); + assert!(has_loopback, "Should find loopback interface"); + + // Test interface information structure + for interface in interfaces { + assert!(!interface.interface_name.is_empty(), "Interface name should not be empty"); + assert!(!interface.mac_address.is_empty(), "MAC address should not be empty"); + + // If interface is up, it should have an IP address + if interface.is_up { + assert!(interface.ip_address.is_some(), "Up interface should have an IP address"); + } + } + } + + #[tokio::test] + async fn test_connectivity_localhost() { + let is_connected = test_connectivity("127.0.0.1").await; + assert!(is_connected, "Should be able to connect to localhost"); + } + + #[test] + fn test_interface_speed() { + // Test loopback interface speed (should be None) + let speed = get_interface_speed("lo"); + assert!(speed.is_none(), "Loopback interface should not have a speed"); + + // Test a non-existent interface + let speed = get_interface_speed("nonexistent"); + assert!(speed.is_none(), "Non-existent interface should not have a speed"); + } +} \ No newline at end of file diff --git a/src/cores/mod.rs b/src/cores/mod.rs index 8cef0f9fa9d6635df64a363a946f6c4b979d878a..b8c7e7a42df2d4a3706a7fd86fa35207ce432a9a 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -3,6 +3,7 @@ use bon::Builder; use consensus_kv::raft_config::{self, RaftConfig}; use std::iter::zip; use std::{collections::BTreeMap, ffi::CString}; +pub mod services; pub mod daemons; @@ -25,7 +26,7 @@ pub async fn prepare_app_state( use crate::cores::daemons::messaging::WatchDaemon; use crate::cores::router::Router; - use crate::cores::state::AppState; + use crate::cores::state::{AppState, NetworkStatusConfig}; use crate::db::db::DbPool; use crate::utils::get_or_create_uuid; use crate::utils::token::get_or_create_secret; @@ -49,6 +50,7 @@ pub async fn prepare_app_state( consensus_daemon, cluster_id, token_secret, + network_status_config: NetworkStatusConfig::default(), }) } @@ -106,6 +108,20 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { app_state.watch_daemon.start(); app_state.consensus_daemon.start(); + // 启动网络状态聚合器 + if app_state.network_status_config.enabled { + log::info!( + "启动网络状态聚合器,更新间隔:{}秒", + app_state.network_status_config.update_interval + ); + let app_state_for_network = app_state.clone(); + tokio::spawn(async move { + services::network_status::start_aggregation(app_state_for_network.into()).await; + }); + } else { + log::info!("网络状态聚合器未启用"); + } + // 启动各个server let messaging_server: Option> = Some(Box::new(servers::MessagingServer)); diff --git a/src/cores/services/mod.rs b/src/cores/services/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..4e7feea72dab7a00918285a063e118807910c2fe --- /dev/null +++ b/src/cores/services/mod.rs @@ -0,0 +1 @@ +pub mod network_status; \ No newline at end of file diff --git a/src/cores/services/network_status.rs b/src/cores/services/network_status.rs new file mode 100644 index 0000000000000000000000000000000000000000..0ebbc1048eac021fb79b40200adbd8fb36c6e632 --- /dev/null +++ b/src/cores/services/network_status.rs @@ -0,0 +1,85 @@ +use crate::cores::state::AppState; +use crate::cores::handlers::network_status::models::{NewNetworkInterface, NewConnectivityTest}; +use crate::db::network_status_ops; +use network_info::{self}; +use std::sync::Arc; +use tokio::time::{self, Duration}; + +/// 更新网络接口信息到数据库 +async fn update_network_interfaces(app_state: &Arc) { + let interfaces = network_info::get_network_interfaces(); + let mut db_conn = match app_state.db_pool.get_connection() { + Ok(conn) => conn, + Err(e) => { + log::error!("获取数据库连接失败: {}", e); + return; + } + }; + + for interface in interfaces { + let new_interface = NewNetworkInterface { + interface_name: interface.interface_name, + ip_address: interface.ip_address, + mac_address: interface.mac_address, + is_up: interface.is_up, + speed_mbps: interface.speed_mbps, + }; + + if let Err(e) = network_status_ops::create_network_interface(&mut db_conn, &new_interface).await { + log::error!("更新网络接口失败: {}", e); + } else { + log::info!("成功更新网络接口"); + } + } +} + +/// 更新连通性测试结果到数据库 +async fn update_connectivity_tests(app_state: &Arc, target_ips: Vec) { + let mut db_conn = match app_state.db_pool.get_connection() { + Ok(conn) => conn, + Err(e) => { + log::error!("获取数据库连接失败: {}", e); + return; + } + }; + + for target_ip in target_ips { + let is_connected = network_info::test_connectivity(&target_ip).await; + let new_test = NewConnectivityTest { + target_ip: target_ip.clone(), + is_connected, + }; + + if let Err(e) = network_status_ops::create_or_update_connectivity_test(&mut db_conn, &new_test).await { + log::error!("更新连通性测试 {} 失败: {}", target_ip, e); + } else { + log::info!("成功更新连通性测试: {} -> {}", target_ip, is_connected); + } + } +} + +/// 启动网络状态聚合进程 +/// +/// 该函数会定期收集网络状态信息并存储到数据库中 +/// +/// # 参数 +/// * `app_state` - 包含数据库连接池的应用状态 +pub async fn start_aggregation(app_state: Arc) { + log::info!("网络状态聚合器已启动"); + + // 使用配置的更新间隔 + let update_interval = app_state.network_status_config.update_interval; + let mut interval = time::interval(Duration::from_secs(update_interval)); + + loop { + interval.tick().await; + + // 更新网络接口信息 + update_network_interfaces(&app_state).await; + + // 更新连通性测试结果 + update_connectivity_tests(&app_state, app_state.network_status_config.target_ips.clone()).await; + + log::info!("网络状态更新完成"); + } +} \ No newline at end of file diff --git a/src/cores/state.rs b/src/cores/state.rs index e05f3982b8a34ff86a2b71ceda09ba91a26b7868..fc72a62b57394bbbdc2d10e092902349a6b8cf38 100644 --- a/src/cores/state.rs +++ b/src/cores/state.rs @@ -6,6 +6,29 @@ use std::sync::Arc; use super::daemons::consensus::ConsensusDaemon; +/// 网络状态聚合器配置 +#[derive(Clone)] +pub struct NetworkStatusConfig { + /// 更新间隔(秒) + pub update_interval: u64, + /// 是否启用网络状态聚合器 + pub enabled: bool, + /// 要测试连通性的目标IP列表 + pub target_ips: Vec, +} + +impl Default for NetworkStatusConfig { + fn default() -> Self { + Self { + update_interval: 60, // 默认60秒更新一次 + enabled: true, // 默认启用 + target_ips: vec![ + "127.0.0.1".to_string(), // 本地回环地址 + ], + } + } +} + #[derive(Clone)] pub struct AppState { pub router: Arc, @@ -15,4 +38,5 @@ pub struct AppState { pub consensus_daemon: Arc, pub cluster_id: String, pub token_secret: Vec, + pub network_status_config: NetworkStatusConfig, }