diff --git a/Cargo.toml b/Cargo.toml index b46515242bfef1af8475c8d8a2b0d84c66ca5fc0..e581cdbfb62ea0e1d1f853fc4f0eb006c0efa7f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,3 +98,5 @@ jsonwebtoken = "9.2" rand = "0.8" futures-util = "0.3.31" serialport = "=4.6.1" + +reqwest = { version = "0.11", features = ["json", "blocking"] } diff --git a/crates/os_socket_comms/Cargo.toml b/crates/os_socket_comms/Cargo.toml index 5ee2319e19fc7b1bf8d13ce00dd9fe7fb4170ccd..82b8c05516730cc7fe1cc0bb587331ed17c6da32 100644 --- a/crates/os_socket_comms/Cargo.toml +++ b/crates/os_socket_comms/Cargo.toml @@ -10,4 +10,7 @@ async-trait = { workspace = true } log = { workspace = true } tokio = { workspace = true } serde_json = { workspace = true } -chrono = { workspace = true } \ No newline at end of file +chrono = { workspace = true } + +[dev-dependencies] +bytes = "1.0" \ No newline at end of file diff --git a/crates/os_socket_comms/README_zh.md b/crates/os_socket_comms/README_zh.md index aed7abef72b1f942724bda8bf8f501b0ac2bd823..4cf02d2e28a164eee90f9e8d878db627732c1668 100644 --- a/crates/os_socket_comms/README_zh.md +++ b/crates/os_socket_comms/README_zh.md @@ -9,7 +9,7 @@ * 向操作系统组件发送 TLV 格式的消息。 * 从操作系统组件接收和解析 TLV 格式的消息。 * 处理接收到的消息并将其转发到消息总线系统。 -* 定期向操作系统组件发送遥测数据。 +* 从消息总线的特定主题(例如 `telemetry_task_planning`, `telemetry_processing`)订阅遥测数据,将其缓存到磁盘,并定期从磁盘读取并发送到操作系统组件。 ## 关键组件 @@ -19,12 +19,17 @@ * 提供 `read_data()` 方法以异步从套接字读取原始字节数据。 * 提供 `send_tlv_message(tag: u32, value: &[u8])` 来构造和发送 TLV 消息。 -* **`OSSocketHandler`**: +* **`OSSocketHandler`**: * 一个使用 `OSSocketServer` 的高级处理器。 - * 它泛型于 `P`,`P` 必须实现 `feventbus::traits::producer::Producer`。 + * 它泛型于 `P`,`P` 必须同时实现 `feventbus::traits::producer::Producer` 和 `feventbus::traits::consumer::Consumer`。 * 持续从套接字读取数据,使用 `tlv::parse_tlv` 将其解析为 `TLVMessage`。 - * 根据消息标签将其转发到消息总线上的不同主题。 - * 启动一个单独的异步任务以定期发送遥测数据(使用 `fetch_local_telemetry_data` 作为数据源,`TAG_USER_CLOUD_PLATFORM_OS_TO_CLOUD` 作为标签)。 + * 根据消息标签将其转发到消息总线上的不同主题 (例如 `TOPIC_TELECOMMAND_TASK_PLANNING`)。 + * 启动异步任务以订阅特定主题 (例如 `TOPIC_TELEMETRY_TASK_PLANNING`, `TOPIC_TELEMETRY_PROCESSING`) 的遥测数据。 + * 从 `TOPIC_TELEMETRY_TASK_PLANNING` 收到的遥测数据将被写入磁盘缓存文件 (`/var/tmp/apiserver_telemetry_task_planning_cache.dat`)。 + * 从 `TOPIC_TELEMETRY_PROCESSING` 收到的遥测数据将被写入磁盘缓存文件 (`/var/tmp/apiserver_telemetry_processing_cache.dat`)。 + * 启动一个单独的异步任务,定期从这两个磁盘缓存文件中读取遥测数据。 + * 来自任务规划缓存的数据使用 `TAG_USER_TASK_PLANNING_CLOUD_TO_OS` 标签发送。 + * 来自处理缓存的数据使用 `TAG_USER_PROCESSING_CLOUD_TO_OS` 标签发送。 * **`TLVMessage`**: * 一个表示已解析 TLV 消息的结构体: @@ -41,7 +46,10 @@ * **常量**: * `OS_SOCKET_PATH`:定义 Unix 域套接字的路径。 - * 各种 `TAG_*` 常量定义了 TLV 协议中使用的 `u32` 标签(例如, `TAG_TASK_PLANNING`, `TAG_USER_CLOUD_PLATFORM_OS_TO_CLOUD`)。 + * 各种 `TAG_*` 常量定义了 TLV 协议中使用的 `u32` 标签 (包括 `TAG_USER_TASK_PLANNING_CLOUD_TO_OS` 和 `TAG_USER_PROCESSING_CLOUD_TO_OS` 用于特定遥测数据源)。 + * 各种 `TOPIC_*` 常量定义了消息总线的主题名称 (例如 `TOPIC_TELEMETRY_TASK_PLANNING`, `TOPIC_TELECOMMAND_PROCESSING`)。 + * `TELEMETRY_TASK_PLANNING_CACHE_FILE_PATH`: 定义任务规划遥测数据磁盘缓存文件的路径 (`/var/tmp/apiserver_telemetry_task_planning_cache.dat`)。 + * `TELEMETRY_PROCESSING_CACHE_FILE_PATH`: 定义处理遥测数据磁盘缓存文件的路径 (`/var/tmp/apiserver_telemetry_processing_cache.dat`)。 ## TLV 协议详情 @@ -57,16 +65,17 @@ ```rust use os_socket_comms::{OSSocketHandler, OSSocketServer}; -use feventbus::Arc; // 假设 feventbus::Arc 用于生产者 -use feventbus::mem::MemoryProducer; // 示例生产者 +use feventbus::Arc; // 假设 feventbus 用于生产者和消费者 +use feventbus::impls::mem::MemoryEventBus; // 示例:使用内存事件总线,它同时实现 Producer 和 Consumer #[tokio::main] async fn main() -> anyhow::Result<()> { // 初始化日志 (例如,使用 env_logger 或 tracing_subscriber) env_logger::init(); - // 1. 创建一个消息生产者 (示例使用内存生产者) - let message_producer = Arc::new(MemoryProducer::new(100)); + // 1. 创建一个消息总线实例 (示例使用内存事件总线) + // 确保所选的 P 类型同时实现了 Producer 和 Consumer 特性。 + let message_bus = Arc::new(MemoryEventBus::new(100)); // MemoryEventBus 通常需要特定的初始化 // 2. 尝试创建一个 OSSocketServer 实例 // OSSocketServer::new() 将尝试连接到 OS_SOCKET_PATH。 @@ -75,26 +84,26 @@ async fn main() -> anyhow::Result<()> { Some(socket_server) => { // 3. 创建 OSSocketHandler // OSSocketHandler::new 也返回 Result> - match OSSocketHandler::new(message_producer.clone(), Some(socket_server))? { + match OSSocketHandler::new(message_bus.clone(), Some(socket_server))? { Some(handler) => { // 4. 启动处理器 - // 这将为读取消息和发送遥测数据启动任务。 - handler.start().await; - log::info!(\"OSSocketHandler 已启动.\"); + // 这将为读取消息、订阅遥测主题和发送遥测数据启动任务。 + Arc::new(handler).start().await; // 注意:start() 消费 Arc + log::info!("OSSocketHandler 已启动."); // 处理器现在将在后台运行。 // 在此处添加应用程序逻辑,或保持主线程活动。 // 例如,无限期等待或等待特定信号。 tokio::signal::ctrl_c().await?; // 等待 Ctrl+C - log::info!(\"正在关闭.\"); + log::info!("正在关闭."); } None => { - log::error!(\"无法创建 OSSocketHandler (如果 server 是 Some,则不应发生这种情况).\"); + log::error!("无法创建 OSSocketHandler (如果 server 是 Some,则不应发生这种情况)."); } } } None => { - log::error!(\"无法连接到 OS 套接字。OSSocketServer 无法初始化.\"); + log::error!("无法连接到 OS 套接字。OSSocketServer 无法初始化."); // 处理 OS 套接字不可用的情况。 // 应用程序可能会以功能受限的方式继续运行或退出。 } @@ -103,9 +112,18 @@ async fn main() -> anyhow::Result<()> { } ``` -### 关于遥测数据的占位符 +### 关于遥测数据的处理 -函数 `fetch_local_telemetry_data()` 当前是一个占位符。在实际应用中,应实现此函数以从系统收集实际的遥测数据。 +遥测数据现在从通过 `feventbus` 配置的消息总线上的两个特定主题获取:`TOPIC_TELEMETRY_TASK_PLANNING` 和 `TOPIC_TELEMETRY_PROCESSING`。 +当 `OSSocketHandler` 从这些主题收到消息时,它会将消息体(假定为字节)写入对应的磁盘缓存文件: +* 来自 `TOPIC_TELEMETRY_TASK_PLANNING` 的数据写入 `/var/tmp/apiserver_telemetry_task_planning_cache.dat`。 +* 来自 `TOPIC_TELEMETRY_PROCESSING` 的数据写入 `/var/tmp/apiserver_telemetry_processing_cache.dat`。 + +一个独立的异步任务会定期: +1. 尝试从 `/var/tmp/apiserver_telemetry_task_planning_cache.dat` 读取数据。如果成功且文件非空,则使用 `TAG_USER_TASK_PLANNING_CLOUD_TO_OS` 标签将内容打包成 TLV 格式并发送到 OS 套接字。 +2. 尝试从 `/var/tmp/apiserver_telemetry_processing_cache.dat` 读取数据。如果成功且文件非空,则使用 `TAG_USER_PROCESSING_CLOUD_TO_OS` 标签将内容打包成 TLV 格式并发送到 OS 套接字。 + +如果某个缓存文件不存在或读取失败,则跳过该特定源在该周期的遥测发送。 ## 构建和测试 @@ -126,10 +144,10 @@ cargo test * `tokio`: 用于异步运行时。 * `anyhow`: 用于灵活的错误处理。 * `log`: 用于日志记录。 -* `feventbus`: 用于消息总线集成 (特别是 `Producer` 特性)。 -* `chrono`: 用于占位符遥测函数中的时间戳。 -* `serde_json`: `OSSocketHandler` 用它来序列化事件总线的消息负载。 +* `feventbus`: 用于消息总线集成 (特别是 `Producer` 和 `Consumer` 特性)。 +* `serde_json`: `OSSocketHandler` 用它来序列化/反序列化事件总线的消息负载。 +* `std::fs`: 用于将遥测数据缓存到磁盘的文件操作。 ## 许可证 -此包根据 Mulan PSL v2 许可证授权。有关更多详细信息,请参阅源文件中的版权声明。 \ No newline at end of file +此包 根据 Mulan PSL v2 许可证授权。有关更多详细信息,请参阅源文件中的版权声明。 \ No newline at end of file diff --git a/crates/os_socket_comms/examples/simple_sender.rs b/crates/os_socket_comms/examples/simple_sender.rs new file mode 100644 index 0000000000000000000000000000000000000000..73d0327b20a9a226d73c6c1bc92fd46b16d61687 --- /dev/null +++ b/crates/os_socket_comms/examples/simple_sender.rs @@ -0,0 +1,282 @@ +use feventbus::impls::messaging::datamgr_api; +use feventbus::traits::controller::EventBus; +use tokio::net::{UnixListener}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use std::error::Error; +use std::ffi::CString; +use std::time::Duration; +use bytes::Buf; +use std::fs; +use std::sync::Arc; +use feventbus::impls::messaging::messaging::Messaging; +use feventbus::traits::producer::Producer; +use feventbus::message::{Message, NativeEventAction}; +use serde_json; +use serde_json::json; +use std::os::raw::c_void; + +// This should match the definition in your os_socket_comms::lib.rs +const OS_SOCKET_PATH: &str = "/var/run/EulixOnBoardGuardian/os_fleet.sock"; +const TELEMETRY_PROCESSING_TOPIC: &str = "telemetry_processing"; + +/// Represents a parsed TLV message (simplified for client/server) +#[derive(Debug)] +struct ClientTLVMessage { + tag: u32, + length: u32, // This is the *decoded* actual length of the value + value: Vec, +} + +/// Encodes the length according to the specific TLV protocol. +/// The actual payload length (e.g., 10) is converted to its decimal string ("10"), +/// and then this string is parsed as a hexadecimal number (0x10, which is decimal 16). +fn encode_tlv_length(actual_payload_len: usize) -> Result { + let actual_payload_len_str = actual_payload_len.to_string(); + u32::from_str_radix(&actual_payload_len_str, 16).map_err(|e| { + format!( + "Failed to parse actual payload length string '{}' as hex: {}", + actual_payload_len_str, e + ) + }) +} + +/// Decodes the TLV length field to get the actual payload length. +fn decode_tlv_length(encoded_len_val: u32) -> Result { + let encoded_len_hex_str = format!("{:x}", encoded_len_val); + u32::from_str_radix(&encoded_len_hex_str, 10).map_err(|e| { + format!( + "Failed to parse encoded length hex string '{}' as decimal: {}", + encoded_len_hex_str, e + ) + }) +} + +/// Parses a TLV message from a byte slice (simplified for client/server) +/// Assumes the slice starts with a TLV message. +fn parse_tlv_from_slice(data: &mut &[u8]) -> Result { + if data.len() < 8 { // Minimum length for tag + length field + return Err(format!("Data too short for TLV header: {} bytes", data.len())); + } + + let tag = data.get_u32_le(); + let encoded_length_val = data.get_u32_le(); + + let actual_value_length = decode_tlv_length(encoded_length_val)? as usize; + + if data.len() < actual_value_length { + return Err(format!( + "Data too short for TLV value. Expected {} bytes, got {} bytes remaining. Tag: 0x{:08X}, EncodedLen: 0x{:08X}", + actual_value_length, data.len(), tag, encoded_length_val + )); + } + + let value = data[..actual_value_length].to_vec(); + data.advance(actual_value_length); + + Ok(ClientTLVMessage { + tag, + length: actual_value_length as u32, + value, + }) +} + + +unsafe fn init_mq_plugins( + plugin_to_load_key: &CString, + plugin_to_load_value: &CString, +) -> *mut c_void { + let master_plugin_manager = datamgr_api::NewPluginManager(); + datamgr_api::SetParameter( + master_plugin_manager, + plugin_to_load_key.as_ptr(), + plugin_to_load_value.as_ptr(), + ); + + datamgr_api::LoadPlugins(master_plugin_manager); + master_plugin_manager +} + + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Setup feventbus producer + let mut message_bus_instance = Messaging::new().await?; + + let plugin_to_load_key = CString::new("tests.pluginsToLoad").unwrap(); + let plugin_to_load_value = CString::new("test tlv Storage").unwrap(); + + let plugin_manager_ptr: *mut std::os::raw::c_void = unsafe { + init_mq_plugins(&plugin_to_load_key, &plugin_to_load_value) + }; + + message_bus_instance.set_plugin_manager(plugin_manager_ptr); + + let message_bus = Arc::new(message_bus_instance); + println!( + "Messaging instance created and plugin manager (placeholder) set. Will publish to topic: '{}'", + TELEMETRY_PROCESSING_TOPIC + ); + + // Clone message_bus for the periodic sender task + let bus_for_periodic_sender = message_bus.clone(); + tokio::spawn(async move { + let mut counter: u64 = 0; + loop { + counter += 1; + let periodic_payload = json!({ + "source": "periodic_sender", + "count": counter, + "timestamp": chrono::Utc::now().to_rfc3339() + }); + + let event_bus_message = Message::new( + TELEMETRY_PROCESSING_TOPIC.to_string(), + NativeEventAction::Other, + None, // metadata + Some(periodic_payload.clone()), // body + None, // created_at + ); + + println!( + "Periodic sender: Attempting to send message to topic '{}': {:?}", + TELEMETRY_PROCESSING_TOPIC, + periodic_payload + ); + + match bus_for_periodic_sender.publish(event_bus_message).await { + Ok(_) => { + println!( + "Periodic sender: Successfully published message to topic '{}'.", + TELEMETRY_PROCESSING_TOPIC + ); + } + Err(e) => { + eprintln!( + "Periodic sender: Failed to publish message to topic '{}': {}", + TELEMETRY_PROCESSING_TOPIC, e + ); + } + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + // Attempt to remove the socket file if it already exists + // This is important for idempotency, as bind will fail if the file exists. + match fs::remove_file(OS_SOCKET_PATH) { + Ok(_) => println!("Successfully removed existing socket file: {}", OS_SOCKET_PATH), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // This is fine, socket didn't exist + println!("Socket file not found, no need to remove: {}", OS_SOCKET_PATH); + } + Err(e) => { + eprintln!("Error removing existing socket file {}: {}. Please check permissions or remove manually.", OS_SOCKET_PATH, e); + return Err(e.into()); + } + } + + println!("Attempting to bind to OS socket: {}", OS_SOCKET_PATH); + let listener = UnixListener::bind(OS_SOCKET_PATH)?; + println!("Successfully bound to socket: {}. Waiting for a client...", OS_SOCKET_PATH); + + loop { // Start of the loop to handle multiple clients + match listener.accept().await { + Ok((mut stream, client_addr)) => { + println!("Accepted connection from: {:?}", client_addr); + let bus_clone_for_handler = message_bus.clone(); // Clone Arc for use in this handler scope + + // Attempt to read a message + println!("Attempting to read message from client..."); + let mut read_buffer = vec![0; 4096]; // Read buffer + + // Using a timeout for reading can be good practice + match tokio::time::timeout(Duration::from_secs(10), stream.read(&mut read_buffer)).await { + Ok(Ok(bytes_read)) if bytes_read > 0 => { + println!("Received {} bytes from client.", bytes_read); + let mut received_data_slice = &read_buffer[..bytes_read]; + match parse_tlv_from_slice(&mut received_data_slice) { + Ok(tlv_message) => { + println!("Parsed TLV message: {:?}", tlv_message); + if let Ok(value_str) = String::from_utf8(tlv_message.value.clone()) { + println!("Message value as string: '{}'", value_str); + } + + // Send payload to feventbus topic + println!( + "Attempting to send {} bytes from received TLV (tag: 0x{:08X}) to topic '{}'...", + tlv_message.value.len(), + tlv_message.tag, + TELEMETRY_PROCESSING_TOPIC + ); + + // Construct the feventbus::message::Message + let json_payload = serde_json::to_value(tlv_message.value.clone())?; + let event_bus_message = Message::new( + TELEMETRY_PROCESSING_TOPIC.to_string(), + NativeEventAction::Other, // Using Other as a default + None, // metadata + Some(json_payload), // body + None, // created_at or other optional fields + ); + + match bus_clone_for_handler.publish(event_bus_message).await { + Ok(_) => { + println!( + "Successfully published payload to topic '{}'.", + TELEMETRY_PROCESSING_TOPIC + ); + } + Err(e) => { + eprintln!( + "Failed to publish payload to topic '{}': {}", + TELEMETRY_PROCESSING_TOPIC, e + ); + } + } + + // Optionally, send a response back to the client here + // For example: + // let response_value = "Acknowledged by server"; + // let response_tag: u32 = 0x00000001; // Example response tag + // let encoded_len_resp = encode_tlv_length(response_value.as_bytes().len())?; + // let mut message_to_send = Vec::new(); + // message_to_send.extend_from_slice(&response_tag.to_le_bytes()); + // message_to_send.extend_from_slice(&encoded_len_resp.to_le_bytes()); + // message_to_send.extend_from_slice(response_value.as_bytes()); + // if let Err(e) = stream.write_all(&message_to_send).await { + // eprintln!("Failed to send response: {}", e); + // } else { + // println!("Sent acknowledgment to client."); + // } + } + Err(e) => { + eprintln!("Failed to parse TLV message from client: {}", e); + eprintln!("Raw received data (hex): {:x?}", &read_buffer[..bytes_read]); + } + } + } + Ok(Ok(_)) => { // 0 bytes read, connection closed by peer + println!("Client closed the connection without sending data."); + } + Ok(Err(e)) => { + eprintln!("Error reading from client socket: {}", e); + } + Err(_) => { // Timeout + println!("Timeout waiting for message from client."); + } + } + if let Err(e) = stream.shutdown().await { // Close the connection + eprintln!("Error shutting down client stream: {}", e); + } + println!("Connection with client {:?} shut down.", client_addr); + } + Err(e) => { + eprintln!("Failed to accept connection: {}. Server continues to listen.", e); + // Optionally, add a small delay here or more robust error handling if accept fails repeatedly + } + } + } // End of the loop, will never be reached in this simple form + // To make it exit gracefully, you'd need a signal handler (e.g., Ctrl+C) + // println!("Server shutting down."); // This line is now unreachable + // Ok(()) +} \ No newline at end of file diff --git a/crates/os_socket_comms/src/handler.rs b/crates/os_socket_comms/src/handler.rs index f8e1bfb1ff608ab1223f79646e539fe2a99cf0d5..240663aa2d84a60e9368cff97ed5e854c934302c 100644 --- a/crates/os_socket_comms/src/handler.rs +++ b/crates/os_socket_comms/src/handler.rs @@ -11,51 +11,51 @@ // Affiliation: Institute of Software, Chinese Academy of Sciences use crate::{ - OSSocketServer, TLVMessage, TAG_CLOUD_PLATFORM, TAG_PROCESSING, TAG_TASK_PARAMS, - TAG_TASK_PLANNING, TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS + OSSocketServer, + TLVMessage, + TAG_CLOUD_PLATFORM, + TAG_PROCESSING, + TAG_TASK_PARAMS, + TAG_TASK_PLANNING, + TAG_USER_PROCESSING_CLOUD_TO_OS, // Specific tags for telemetry sources + TAG_USER_TASK_PLANNING_CLOUD_TO_OS, }; -// The AppState dependency will be removed/refactored in the next step. -// For now, this line would cause a compile error if this crate is built alone. -// use crate::cores::state::AppState; // This needs to be changed use anyhow::Result; -use chrono::Utc; +use feventbus::err::Error as FEventBusError; use feventbus::message::{Message, NativeEventAction}; +use feventbus::traits::consumer::Consumer as EventBusConsumer; +use feventbus::traits::consumer::MessageHandler; use feventbus::traits::producer::Producer; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; +use std::fs; +use std::future::Future; +use std::path::Path; +use std::pin::Pin; use std::sync::Arc; -use tokio::time::{sleep, Duration}; // For placeholder telemetry data +use tokio::time::{sleep, Duration}; // Added for Path manipulation -/// Handles messages received from the OS socket and sends telemetry data. -/// Generic over `P` which must implement the `Producer` trait for message bus integration. -pub struct OSSocketHandler { - // Made Producer generic - socket_server: OSSocketServer, // The server instance for OS socket communication. - // app_state: Arc, // To be replaced - message_producer: Arc

, // New field for message bus producer. Used to send received messages to the bus. - // Other specific dependencies can be added here if needed -} +// Define separate cache file paths for each telemetry source +const TELEMETRY_TASK_PLANNING_CACHE_FILE_PATH: &str = + "/tmp/apiserver_telemetry_task_planning_cache.dat"; +const TELEMETRY_PROCESSING_CACHE_FILE_PATH: &str = "/tmp/apiserver_telemetry_processing_cache.dat"; -/// Fetches local telemetry data. -/// This is a placeholder function and should be replaced with actual telemetry data gathering logic. -/// Currently, it simulates fetching data by creating a timestamped payload. -pub async fn fetch_local_telemetry_data() -> Result> { - let timestamp = Utc::now().to_rfc3339(); - let payload = format!("telemetry_update_{}", timestamp); - info!("Simulated fetching local telemetry data: {}", payload); - Ok(payload.into_bytes()) +/// 处理从 OS 套接字接收到的消息并发送遥测数据。 +/// 泛型 `P` 必须实现 `Producer` 和 `EventBusConsumer` (例如 `Messaging`). +pub struct OSSocketHandler { + socket_server: OSSocketServer, // OS 套接字通信的服务器实例 + message_producer: Arc

, // 用于生产和订阅消息 (如果 P 是 Messaging) } -impl OSSocketHandler

{ - /// 创建新的 OS Socket 处理器实例 - /// Creates a new instance of `OSSocketHandler`. +impl OSSocketHandler

{ + /// 创建一个新的 `OSSocketHandler` 实例。 /// - /// # Arguments - /// * `message_producer`: An `Arc

` where `P` is a message bus producer. - /// * `socket_server_opt`: An `Option`. If `None`, the handler won't be created. + /// # 参数 + /// * `message_producer`: 一个 `Arc

`,其中 `P` 是消息总线生产者和消费者。 + /// * `socket_server_opt`: 一个 `Option`。如果为 `None`,则不会创建处理器。 /// - /// # Returns - /// `Result>`: `Ok(Some(Self))` if successful, `Ok(None)` if `socket_server_opt` is `None`. + /// # 返回 + /// `Result>`: 如果成功则为 `Ok(Some(Self))`,如果 `socket_server_opt` 为 `None` 则为 `Ok(None)`。 pub fn new( message_producer: Arc

, socket_server_opt: Option, @@ -69,14 +69,14 @@ impl OSSocketHandler

{ } } - /// Creates a new `OSSocketHandler` with a default `OSSocketServer` instance. + /// 创建一个新的 `OSSocketHandler`,并使用默认的 `OSSocketServer` 实例。 /// - /// This constructor attempts to create an `OSSocketServer` internally. - /// # Arguments - /// * `message_producer`: An `Arc

` where `P` is a message bus producer. + /// 此构造函数尝试在内部创建 `OSSocketServer`。 + /// # 参数 + /// * `message_producer`: 一个 `Arc

`,其中 `P` 是消息总线生产者和消费者。 /// - /// # Returns - /// `Result>`: `Ok(Some(Self))` if the server is successfully created, `Ok(None)` otherwise. + /// # 返回 + /// `Result>`: 如果服务器成功创建则为 `Ok(Some(Self))`,否则为 `Ok(None)`。 pub fn new_with_default_server(message_producer: Arc

) -> Result> { match OSSocketServer::new()? { Some(server) => Ok(Some(Self { @@ -87,177 +87,325 @@ impl OSSocketHandler

{ } } - /// Starts the handler, which involves two main tasks: - /// 1. Spawning a task to continuously read and process messages from the OS socket. - /// 2. Spawning a task to periodically send telemetry data to the OS socket. - pub async fn start(&self) { + /// 启动处理器,主要包括三个任务: + /// 1. 启动一个任务以持续读取和处理来自 OS 套接字的消息。 + /// 2. 启动任务以订阅遥测数据主题并将数据写入磁盘缓存。 + /// 3. 启动一个任务以定期从磁盘读取遥测数据并将其发送到 OS 套接字。 + pub async fn start(self: Arc) { let server_for_reader = self.socket_server.clone(); - // let app_state_for_reader = self.app_state.clone(); // Removed let message_producer_for_handler = self.message_producer.clone(); - // Spawn a task to handle incoming messages from the OS socket. + // 任务 1: 处理来自 OS 套接字的传入消息 tokio::spawn(async move { - log::info!("OSSocketHandler: Read loop started for OS socket."); + log::info!("OSSocketHandler: OS 套接字读取循环已启动。"); loop { match server_for_reader.read_data().await { Ok(data) => { log::debug!( - "OSSocketHandler: server.read_data() returned {} bytes.", + "OSSocketHandler: server.read_data() 返回 {} 字节。", data.len() ); if !data.is_empty() { - // Attempt to parse the received data as a TLV message. match crate::tlv::parse_tlv(&data) { Ok(tlv) => { - log::debug!("OSSocketHandler: Successfully parsed TLV message with tag 0x{:08x}, length {}", tlv.tag, tlv.length); - // Pass message_producer instead of full app_state - // Handle the parsed TLV message. + log::debug!("OSSocketHandler: 成功解析 TLV 消息,标签 0x{:08x},长度 {}", tlv.tag, tlv.length); if let Err(e) = Self::handle_tlv_message( message_producer_for_handler.clone(), tlv, ) .await { - error!( - "OSSocketHandler: Error handling TLV message: {}", - e - ); + error!("OSSocketHandler: 处理 TLV 消息时出错: {}", e); } } Err(parse_err) => { - // Log an error if TLV parsing fails. let display_data_len = std::cmp::min(100, data.len()); - error!("OSSocketHandler: Failed to parse TLV message: {}. Raw data (first {} bytes): {:x?}", parse_err, display_data_len, &data[..display_data_len]); + error!("OSSocketHandler: 解析 TLV 消息失败: {}. 原始数据 (前 {} 字节): {:x?}", parse_err, display_data_len, &data[..display_data_len]); } } } else { - // If no data is read, it might indicate the peer closed the connection. - log::info!("OSSocketHandler: server.read_data() returned 0 bytes. Peer has likely closed the connection. Stopping read loop."); - break; // Exit the loop. + log::info!("OSSocketHandler: server.read_data() 返回 0 字节。对方可能已关闭连接。正在停止读取循环。"); + break; } } Err(e) => { - // Log an error if reading from the socket fails, then retry after a delay. - error!("OSSocketHandler: Error in server.read_data(): {}. Will retry after delay.", e); + error!( + "OSSocketHandler: server.read_data() 出错: {}. 将在延迟后重试。", + e + ); sleep(Duration::from_secs(1)).await; } } } - log::info!("OSSocketHandler: Read loop terminated."); + log::info!("OSSocketHandler: 读取循环已终止。"); + }); + + // 任务 2.1: 订阅 "telemetry_task_planning" 主题以获取遥测数据 + let self_for_task_planning_subscriber = self.clone(); + tokio::spawn(async move { + self_for_task_planning_subscriber + .start_telemetry_topic_subscriber( + crate::TOPIC_TELEMETRY_TASK_PLANNING, + TELEMETRY_TASK_PLANNING_CACHE_FILE_PATH, + ) + .await; }); - // Spawn a task to periodically send telemetry data. + // 任务 2.2: 订阅 "telemetry_processing" 主题以获取遥测数据 + let self_for_processing_subscriber = self.clone(); + tokio::spawn(async move { + self_for_processing_subscriber + .start_telemetry_topic_subscriber( + crate::TOPIC_TELEMETRY_PROCESSING, + TELEMETRY_PROCESSING_CACHE_FILE_PATH, + ) + .await; + }); + + // 任务 3: 定期发送遥测数据 let server_for_sender = self.socket_server.clone(); tokio::spawn(async move { Self::start_telemetry_sender(server_for_sender).await; }); } - /// Starts the telemetry data sender loop. - /// - /// This function periodically fetches local telemetry data and sends it - /// to the OS socket using a specific TLV tag. - async fn start_telemetry_sender(socket_server: OSSocketServer) { - info!("OSSocketHandler: Telemetry sender loop started."); - let telemetry_interval = Duration::from_secs(10); // Interval for sending telemetry. + /// 订阅指定主题并将接收到的数据写入指定的遥测缓存文件。 + async fn start_telemetry_topic_subscriber( + self: Arc, + topic_name: &'static str, + cache_file_path: &'static str, + ) { + info!( + "OSSocketHandler: 启动遥测数据订阅任务,主题: {}, 缓存文件: {}", + topic_name, cache_file_path + ); - loop { - sleep(telemetry_interval).await; - info!("OSSocketHandler: Time to send telemetry data."); + let message_handler: MessageHandler = Arc::new(move |message: Message| { + let topic_name_for_handler = topic_name; + let cache_file_path_for_handler = cache_file_path; - // Fetch the local telemetry data. - match fetch_local_telemetry_data().await { - Ok(telemetry_payload) => { - if telemetry_payload.is_empty() { - debug!("OSSocketHandler: No telemetry data to send."); - continue; // Skip if no data. - } - info!( - "OSSocketHandler: Sending telemetry data ({} bytes) with tag 0x{:08X}", - telemetry_payload.len(), - TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS // Use the designated tag for telemetry. - ); - // Send the telemetry data as a TLV message. - match socket_server - .send_tlv_message(TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS, &telemetry_payload) - .await - { - Ok(()) => { - info!( - "OSSocketHandler: Successfully sent telemetry data to OS socket." - ); + let fut = async move { + debug!( + "遥测订阅者收到主题 '{}' 的消息: {:?}", + topic_name_for_handler, message + ); + if let Some(body_value) = message.body { + match serde_json::to_vec(&body_value) { + Ok(bytes) => { + // Ensure the cache directory exists + if let Some(parent_dir) = + Path::new(cache_file_path_for_handler).parent() + { + if !parent_dir.exists() { + match fs::create_dir_all(parent_dir) { + Ok(_) => { + info!( + "OSSocketHandler: 成功创建缓存目录: {:?}", + parent_dir + ); + } + Err(e) => { + error!("OSSocketHandler: 创建缓存目录 {:?} 失败: {}. 缓存将失败。", parent_dir, e); + // Proceeding to fs::write will likely fail, but let it try and log that specific error. + } + } + } + } else { + // This case should ideally not happen with valid cache_file_path strings like "/var/tmp/..." + warn!( + "OSSocketHandler: 无法确定缓存文件 '{}' 的父目录。", + cache_file_path_for_handler + ); + } + + match fs::write(cache_file_path_for_handler, &bytes) { + Ok(_) => { + info!("OSSocketHandler: 成功将来自主题 '{}' 的遥测数据写入缓存文件: {}. 字节数: {}", topic_name_for_handler, cache_file_path_for_handler, bytes.len()); + } + Err(e) => { + error!("OSSocketHandler: 从主题 '{}' 写入遥测数据到缓存文件 '{}' 失败: {}", topic_name_for_handler, cache_file_path_for_handler, e); + } + } } Err(e) => { - error!("OSSocketHandler: Failed to send telemetry data: {}", e); + error!( + "OSSocketHandler: 从主题 '{}' 序列化消息体为字节时出错: {}", + topic_name_for_handler, e + ); } } + } else { + warn!( + "OSSocketHandler: 收到主题 '{}' 的消息但消息体为空。", + topic_name_for_handler + ); + } + Ok(String::new()) as Result + }; + let pinned_fut: Pin> + Send>> = + Box::pin(fut); + pinned_fut + }); + + // Loop to continuously attempt subscription + loop { + match self + .message_producer + .as_ref() + .subscribe(topic_name, message_handler.clone()) + .await + { + // Cloned message_handler for retries + Ok(_) => { + info!("OSSocketHandler: 成功订阅主题 '{}' (缓存到 {}) 以获取遥测数据。该订阅将保持活动状态。", topic_name, cache_file_path); + // The subscription is now active. feventbus will call the handler for new messages. + // This task can now block indefinitely, or if subscribe() internally manages the lifetime, + // it could even complete. For robust explicit blocking: + std::future::pending::<()>().await; // Keeps this spawned task alive indefinitely after successful subscription. + // This break will not be reached if std::future::pending is used. + break; } Err(e) => { - error!( - "OSSocketHandler: Failed to fetch local telemetry data: {}", + error!("OSSocketHandler: 订阅主题 '{}' (缓存到 {}) 以获取遥测数据失败: {}. 将在 5 秒后重试。", topic_name, cache_file_path, e); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } + } + + /// 定期从缓存文件读取遥测数据并使用特定标签发送。 + async fn send_telemetry_from_cache( + socket_server: &OSSocketServer, + cache_path: &str, + tag: u32, + source_name: &str, + ) -> Result<()> { + match fs::read(cache_path) { + Ok(telemetry_payload) => { + if telemetry_payload.is_empty() { + debug!( + "OSSocketHandler: {} 的遥测缓存文件为空 ({}),不发送。", + source_name, cache_path + ); + return Ok(()); + } + info!( + "OSSocketHandler: 从 {} 的缓存文件 ({}) 发送遥测数据 ({} 字节),标签 0x{:08X}", + source_name, + cache_path, + telemetry_payload.len(), + tag + ); + socket_server + .send_tlv_message(tag, &telemetry_payload) + .await + .map_err(|e| { + error!( + "OSSocketHandler: 从 {} 的缓存 ({}) 发送遥测数据失败: {}", + source_name, cache_path, e + ); e + })?; + info!( + "OSSocketHandler: 成功将来自 {} (缓存 {}) 的遥测数据发送到 OS 套接字。", + source_name, cache_path + ); + } + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + debug!( + "OSSocketHandler: {} 的遥测缓存文件未找到: '{}'. 跳过发送。", + source_name, cache_path + ); + } else { + debug!( + "OSSocketHandler: 读取 {} 的遥测缓存文件 '{}' 失败: {}. 跳过发送。", + source_name, cache_path, e ); } } } + Ok(()) + } + + /// 定期从各源的缓存文件读取遥测数据并发送。 + async fn start_telemetry_sender(socket_server: OSSocketServer) { + info!("OSSocketHandler: 遥测发送循环已启动。"); + let telemetry_interval = Duration::from_secs(10); + + loop { + sleep(telemetry_interval).await; + info!("OSSocketHandler: 检查并发送各源的遥测数据。"); + + // 发送任务规划遥测数据 + if let Err(e) = Self::send_telemetry_from_cache( + &socket_server, + TELEMETRY_TASK_PLANNING_CACHE_FILE_PATH, + TAG_USER_TASK_PLANNING_CLOUD_TO_OS, + "任务规划", + ) + .await + { + warn!("发送任务规划遥测数据时遇到问题: {}", e); // 警告而不是终止循环 + } + + // 发送处理遥测数据 + if let Err(e) = Self::send_telemetry_from_cache( + &socket_server, + TELEMETRY_PROCESSING_CACHE_FILE_PATH, + TAG_USER_PROCESSING_CLOUD_TO_OS, + "数据处理", + ) + .await + { + warn!("发送数据处理遥测数据时遇到问题: {}", e); + } + } } - /// Handles a received TLV message based on its tag. + /// 根据标签处理接收到的 TLV 消息。 /// - /// Depending on the tag, the message value is published to a specific topic on the message bus. - /// # Arguments - /// * `message_producer`: The message bus producer. - /// * `tlv`: The `TLVMessage` to handle. + /// 根据标签,消息值将被发布到消息总线上的特定主题。 + /// # 参数 + /// * `message_producer`: 消息总线生产者。 + /// * `tlv`: 要处理的 `TLVMessage`。 async fn handle_tlv_message(message_producer: Arc

, tlv: TLVMessage) -> Result<()> { - // let message_client = app_state.message_cli.clone(); // Changed debug!( - "Received TLV message with tag: 0x{:08x}, length: {}", + "收到 TLV 消息,标签: 0x{:08x},长度: {}", tlv.tag, tlv.length ); - // Match the TLV tag to determine how to process the message. + // 根据 TLV 标签匹配以确定如何处理消息。 match tlv.tag { TAG_TASK_PLANNING => { - debug!("Processing task planning message"); - let topic = "task_planning"; + debug!("处理任务规划消息"); + let topic = crate::TOPIC_TELECOMMAND_TASK_PLANNING; Self::publish_message(&message_producer, topic, tlv.value).await?; - debug!( - "Successfully forwarded task planning message to topic: {}", - topic - ); + debug!("成功将任务规划消息转发到主题: {}", topic); } TAG_CLOUD_PLATFORM => { - debug!("Processing cloud platform message"); - let topic = "cloud_platform"; + debug!("处理云平台消息"); + let topic = "cloud_platform"; // 假设这个主题常量也将在 crate 级别定义 Self::publish_message(&message_producer, topic, tlv.value).await?; - debug!( - "Successfully forwarded cloud platform message to topic: {}", - topic - ); + debug!("成功将云平台消息转发到主题: {}", topic); } TAG_PROCESSING => { - debug!("Processing processing message"); - let topic = "processing"; + debug!("处理数据处理消息"); + let topic = crate::TOPIC_TELECOMMAND_PROCESSING; Self::publish_message(&message_producer, topic, tlv.value).await?; - debug!( - "Successfully forwarded processing message to topic: {}", - topic - ); + debug!("成功将数据处理消息转发到主题: {}", topic); } TAG_TASK_PARAMS => { - debug!("Processing task parameters message"); - let topic = "task_parameters"; + debug!("处理任务参数消息"); + let topic = crate::TOPIC_TELECOMMAND_TASK_PARAMS; Self::publish_message(&message_producer, topic, tlv.value).await?; - debug!( - "Successfully forwarded task parameters message to topic: {}", - topic - ); + debug!("成功将任务参数消息转发到主题: {}", topic); } - // TODO: Add cases for new TAG_USER_*_CLOUD_TO_OS tags if needed - // Handle unknown tags. + // TODO: 如果需要,为新的 TAG_USER_*_CLOUD_TO_OS 标签添加处理分支 + // 处理未知标签。 _ => { debug!( - "Received unknown TLV message with tag: 0x{:08x}, length: {}", + "收到未知 TLV 消息,标签: 0x{:08x},长度: {}", tlv.tag, tlv.length ); } @@ -265,43 +413,31 @@ impl OSSocketHandler

{ Ok(()) } - /// Publishes a message to the message bus. + /// 将消息发布到消息总线。 /// - /// # Arguments - /// * `message_producer`: The message bus producer. - /// * `topic`: The topic to publish the message to. - /// * `value`: The payload of the message (as raw bytes). + /// # 参数 + /// * `message_producer`: 消息总线生产者。 + /// * `topic`: 要发布消息的主题。 + /// * `value`: 消息的有效负载 (原始字节)。 async fn publish_message(message_producer: &Arc

, topic: &str, value: Vec) -> Result<()> { - debug!( - "Preparing to publish message to topic: {}, value length: {}", - topic, - value.len() - ); - // Create a new message for the event bus. - // The value is serialized to JSON. + debug!("准备向主题发布消息: {}, 值长度: {}", topic, value.len()); + // 为事件总线创建新消息。 + // 值被序列化为 JSON。 let message = Message::new( topic.to_string(), - NativeEventAction::Other, // Default action type. - None, // No specific source ID. - Some(serde_json::to_value(value)?), // Serialize the byte vector to a JSON value. - None, // No specific destination ID. + NativeEventAction::Other, // 默认操作类型。 + None, // 无特定源 ID。 + Some(serde_json::to_value(value)?), // 将字节向量序列化为 JSON 值。 + None, // 无特定目标 ID。 ); - log::debug!("OSSocketHandler Publishing message to topic: {}", topic); - // Publish the message using the provided producer. + log::debug!("OSSocketHandler 正在向主题发布消息: {}", topic); + // 使用提供的生产者发布消息。 if let Err(e) = message_producer.as_ref().publish(message).await { - log::error!( - "OSSocketHandler Failed to publish message to topic {}: {}", - topic, - e - ); - return Err(anyhow::anyhow!( - "Failed to publish message to topic {}: {}", - topic, - e - )); + log::error!("OSSocketHandler 向主题 {} 发布消息失败: {}", topic, e); + return Err(anyhow::anyhow!("向主题 {} 发布消息失败: {}", topic, e)); } - log::debug!("OSSocketHandler Successfully published message to topic: {}", topic); + log::debug!("OSSocketHandler 成功向主题 {} 发布消息。", topic); Ok(()) } } diff --git a/crates/os_socket_comms/src/lib.rs b/crates/os_socket_comms/src/lib.rs index 17cf7ea2c7ec275d734439a4483b961d2d906eae..a6dd60db197d6ed12f605b14b5926a91c6b42266 100644 --- a/crates/os_socket_comms/src/lib.rs +++ b/crates/os_socket_comms/src/lib.rs @@ -11,11 +11,10 @@ // Author: songpenglei, songpenglei@otcaix.iscas.ac.cn // Affiliation: Institute of Software, Chinese Academy of Sciences -// Modules for handling OS socket communication and TLV messaging. +// 用于处理 OS 套接字通信和 TLV 消息传递的模块。 pub mod handler; pub mod tlv; -pub use handler::fetch_local_telemetry_data; pub use handler::OSSocketHandler; pub use tlv::TLVMessage; @@ -26,81 +25,91 @@ use std::os::unix::net::UnixStream; use std::sync::Arc; use tokio::sync::Mutex; -/// Path to the Unix domain socket for communication with the OS. +/// 用于与 OS 通信的 Unix 域套接字路径。 pub const OS_SOCKET_PATH: &str = "/var/run/EulixOnBoardGuardian/os_fleet.sock"; -// Original TLV message tag definitions. -/// 0x03030100: Task Planning (OS -> Cloud) +// 原始 TLV 消息标签定义。 +/// 0x03030100: 任务规划 (OS -> Cloud) pub const TAG_TASK_PLANNING: u32 = 0x03030100; -/// 0x03040100: Cloud Platform (OS -> Cloud) +/// 0x03040100: 云平台 (OS -> Cloud) pub const TAG_CLOUD_PLATFORM: u32 = 0x03040100; -/// 0x03050100: Processing (OS -> Cloud) +/// 0x03050100: 处理 (OS -> Cloud) pub const TAG_PROCESSING: u32 = 0x03050100; -/// 0x05010100: Task Planning Parameters (OS -> Cloud) +/// 0x05010100: 任务规划参数 (OS -> Cloud) pub const TAG_TASK_PARAMS: u32 = 0x05010100; -// New Tags based on user request -// New TLV message tags based on user requirements. -/// 0x01030100: Task Planning (OS -> Cloud) +// 根据用户请求新增的标签 +// 根据用户需求新增的 TLV 消息标签。 +/// 0x01030100: 任务规划 (OS -> Cloud) pub const TAG_USER_TASK_PLANNING_OS_TO_CLOUD: u32 = 0x01030100; -/// 0x01030200: Task Planning (Cloud -> OS) +/// 0x01030200: 任务规划 (Cloud -> OS) pub const TAG_USER_TASK_PLANNING_CLOUD_TO_OS: u32 = 0x01030200; -/// 0x01040100: Cloud Platform (OS -> Cloud) +/// 0x01040100: 云平台 (OS -> Cloud) pub const TAG_USER_CLOUD_PLATFORM_OS_TO_CLOUD: u32 = 0x01040100; -/// 0x01040200: Cloud Platform (Cloud -> OS) +/// 0x01040200: 云平台 (Cloud -> OS) pub const TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS: u32 = 0x01040200; -/// 0x01050100: Processing (OS -> Cloud) +/// 0x01050100: 处理 (OS -> Cloud) pub const TAG_USER_PROCESSING_OS_TO_CLOUD: u32 = 0x01050100; -/// 0x01050200: Processing (Cloud -> OS) +/// 0x01050200: 处理 (Cloud -> OS) pub const TAG_USER_PROCESSING_CLOUD_TO_OS: u32 = 0x01050200; -/// OS Socket 服务器 -/// 负责与 OS 进行 Unix Domain Socket 通信 -/// Represents the OS Socket server, responsible for Unix Domain Socket communication with the OS. +// 主题列表 +/// 遥测数据主题 (telemetry topics) +pub const TOPIC_TELEMETRY_TASK_PLANNING: &str = "telemetry_task_planning"; +pub const TOPIC_TELEMETRY_PROCESSING: &str = "telemetry_processing"; + +/// 遥控指令主题 (telecommand topics) +pub const TOPIC_TELECOMMAND_TASK_PLANNING: &str = "telecommand_task_planning"; +pub const TOPIC_TELECOMMAND_PROCESSING: &str = "telecommand_processing"; +pub const TOPIC_TELECOMMAND_TASK_PARAMS: &str = "telecommand_task_params"; + +/// OS 套接字服务器 +/// 负责与 OS 进行 Unix 域套接字通信 +/// 代表 OS 套接字服务器,负责与 OS 的 Unix 域套接字通信。 #[derive(Clone)] pub struct OSSocketServer { socket_connection: Arc>, } impl OSSocketServer { - /// Creates a new instance of `OSSocketServer`. + /// 创建一个新的 `OSSocketServer` 实例。 /// - /// Attempts to connect to the OS socket with a specified number of retries. - /// Returns `Ok(Some(Self))` on successful connection, `Ok(None)` if connection fails after all retries, - /// or an `Err` if an unexpected error occurs. + /// 尝试以指定的重试次数连接到 OS 套接字。 + /// 成功连接则返回 `Ok(Some(Self))`,所有重试均失败则返回 `Ok(None)`, + /// 或在发生意外错误时返回 `Err`。 pub fn new() -> Result> { const MAX_RETRIES: u32 = 5; const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1); log::info!( - "Attempting to connect to OS socket at {} with up to {} retries.", + "尝试连接到 OS 套接字 {},最多重试 {} 次。", OS_SOCKET_PATH, MAX_RETRIES ); for attempt in 0..MAX_RETRIES { log::info!( - "OS socket connection attempt {}/{} to {}.", + "OS 套接字连接尝试 {}/{} 到 {}。", attempt + 1, MAX_RETRIES, OS_SOCKET_PATH ); match UnixStream::connect(OS_SOCKET_PATH) { Ok(socket) => { - log::info!("Successfully connected to OS socket: {}", OS_SOCKET_PATH); + log::info!("成功连接到 OS 套接字: {}", OS_SOCKET_PATH); return Ok(Some(Self { socket_connection: Arc::new(Mutex::new(socket)), })); } Err(e) => { log::warn!( - "Failed to connect to OS socket on attempt {}/{}: {}.", + "在第 {}/{} 次尝试连接 OS 套接字失败: {}。", attempt + 1, MAX_RETRIES, e ); if attempt < MAX_RETRIES - 1 { - log::info!("Retrying in {}s...", RETRY_DELAY.as_secs()); + log::info!("将在 {} 秒后重试...", RETRY_DELAY.as_secs()); std::thread::sleep(RETRY_DELAY); } } @@ -108,71 +117,74 @@ impl OSSocketServer { } log::error!( - "Failed to connect to OS socket {} after {} attempts. OS socket server will not start.", + "在 {} 次尝试后未能连接到 OS 套接字 {}。OS 套接字服务器将不会启动。", OS_SOCKET_PATH, MAX_RETRIES ); Ok(None) } - /// Reads data from the OS socket. + /// 从 OS 套接字读取数据。 /// - /// Returns a `Result` containing a `Vec` with the read data, - /// or an empty `Vec` if no data was read (e.g., connection closed by peer). - /// Returns an `Err` if a read error occurs. + /// 返回一个 `Result`,其中包含带有读取数据的 `Vec`, + /// 如果没有读取到数据(例如,连接被对方关闭),则返回一个空的 `Vec`。 + /// 如果发生读取错误,则返回 `Err`。 pub async fn read_data(&self) -> Result> { - let mut buffer = vec![0u8; 4096]; // Buffer to store read data. + let mut buffer = vec![0u8; 4096]; // 用于存储读取数据的缓冲区。 let mut socket = self.socket_connection.lock().await; match socket.read(&mut buffer) { - Ok(size) if size > 0 => Ok(buffer[..size].to_vec()), // Return the portion of the buffer with data. - Ok(_) => Ok(Vec::new()), // No data read, or 0 bytes read (connection closed). + Ok(size) if size > 0 => Ok(buffer[..size].to_vec()), // 返回包含数据的缓冲区部分。 + Ok(_) => Ok(Vec::new()), // 没有读取到数据,或读取到 0 字节 (连接关闭)。 Err(e) => { - error!("Error reading from OS socket: {}", e); - Err(anyhow::anyhow!("Failed to read from socket: {}", e)) + error!("从 OS 套接字读取数据时出错: {}", e); + Err(anyhow::anyhow!("从套接字读取失败: {}", e)) } } } - /// Sends a TLV (Tag-Length-Value) formatted message to the OS socket. + /// 向 OS 套接字发送 TLV (Tag-Length-Value) 格式的消息。 /// - /// # Arguments + /// # 参数 /// - /// * `tag`: The `u32` tag for the TLV message. - /// * `value`: A byte slice `&[u8]` representing the value of the TLV message. + /// * `tag`: TLV 消息的 `u32` 标签。 + /// * `value`: 表示 TLV 消息值的字节切片 `&[u8]`。 /// - /// # Returns + /// # 返回 /// - /// A `Result<()>` indicating success or failure. + /// 一个 `Result<()>`,指示成功或失败。 /// - /// # TLV Structure - /// The message is constructed as: - /// - Tag (4 bytes, Little Endian) - /// - Length (4 bytes, Little Endian, representing the length of the *value*). - /// The length field itself is calculated by taking the decimal string representation - /// of the actual payload length, and then parsing that string as a hexadecimal number. - /// - Value (variable length) + /// # TLV 结构 + /// 消息构造如下: + /// - 标签 (4 字节,小端序) + /// - 长度 (4 字节,小端序,表示 *值* 的长度)。 + /// 长度字段本身是通过获取实际有效负载长度的十进制字符串表示, + /// 然后将该字符串解析为十六进制数来计算的。 + /// - 值 (可变长度) pub async fn send_tlv_message(&self, tag: u32, value: &[u8]) -> Result<()> { let mut socket = self.socket_connection.lock().await; - // Convert tag to Little Endian bytes. + // 将标签转换为小端序字节。 let tag_bytes = tag.to_le_bytes(); - // Get the actual length of the payload. + // 获取有效负载的实际长度。 let actual_payload_len = value.len() as u32; - // Convert the actual payload length to its decimal string representation. + // 将实际有效负载长度转换为其十进制字符串表示。 let actual_payload_len_str = actual_payload_len.to_string(); - // Parse this decimal string representation as a hexadecimal number - // to get the intermediate value that needs to be sent as the length field. - // This is a specific requirement for the TLV protocol being used. + // 将此十进制字符串表示解析为十六进制数 + // 以获取需要作为长度字段发送的中间值。 + // 这是所使用的 TLV 协议的特定要求。 let intermediate_length_val_to_send = u32::from_str_radix(&actual_payload_len_str, 16) - .map_err(|e| anyhow::anyhow!( - "Failed to parse actual payload length string '{}' as a hex number for TLV length field: {}", - actual_payload_len_str, e - ))?; + .map_err(|e| { + anyhow::anyhow!( + "未能将实际有效负载长度字符串 '{}' 解析为 TLV 长度字段的十六进制数: {}", + actual_payload_len_str, + e + ) + })?; - // Convert the calculated length to Little Endian bytes. + // 将计算出的长度转换为小端序字节。 let length_bytes = intermediate_length_val_to_send.to_le_bytes(); debug!( @@ -180,13 +192,13 @@ impl OSSocketServer { tag, actual_payload_len, actual_payload_len_str, intermediate_length_val_to_send, length_bytes, value.len() ); - // Construct the message: Tag + Length + Value. - let mut message = Vec::with_capacity(8 + value.len()); // Pre-allocate capacity. - message.extend_from_slice(&tag_bytes); // Tag in Little Endian - message.extend_from_slice(&length_bytes); // Calculated length field in Little Endian - message.extend_from_slice(value); // The actual payload. + // 构造消息: 标签 + 长度 + 值。 + let mut message = Vec::with_capacity(8 + value.len()); // 预分配容量。 + message.extend_from_slice(&tag_bytes); // 小端序标签 + message.extend_from_slice(&length_bytes); // 小端序计算长度字段 + message.extend_from_slice(value); // 实际有效负载。 - // Send the entire message. + // 发送整个消息。 socket.write_all(&message)?; Ok(()) } @@ -196,53 +208,58 @@ impl OSSocketServer { mod tests { use super::*; - // Helper function to encapsulate the core TLV encoding logic for testing + // 用于测试的核心 TLV 编码逻辑的辅助函数 fn encode_tlv_parts(tag: u32, value: &[u8]) -> Result<(Vec, Vec, Vec), String> { let tag_bytes = tag.to_le_bytes().to_vec(); let actual_payload_len = value.len() as u32; let actual_payload_len_str = actual_payload_len.to_string(); let intermediate_length_val_to_send = u32::from_str_radix(&actual_payload_len_str, 16) - .map_err(|e| format!("Failed to parse actual payload length string '{}' as a hex number: {}", actual_payload_len_str, e))?; + .map_err(|e| { + format!( + "未能将实际有效负载长度字符串 '{}' 解析为十六进制数: {}", + actual_payload_len_str, e + ) + })?; let length_bytes = intermediate_length_val_to_send.to_le_bytes().to_vec(); - + Ok((tag_bytes, length_bytes, value.to_vec())) } #[test] fn test_send_tlv_message_encoding_logic() { let tag = 0x01020304; - let value = b"hello"; // 5 bytes + let value = b"hello"; // 5 字节 - // Expected encoding based on the logic in send_tlv_message: + // 基于 send_tlv_message 中逻辑的预期编码: // actual_payload_len = 5 // actual_payload_len_str = "5" // intermediate_length_val_to_send = u32::from_str_radix("5", 16).unwrap() = 0x05 - + let (tag_bytes, length_bytes, value_bytes) = encode_tlv_parts(tag, value).unwrap(); assert_eq!(tag_bytes, tag.to_le_bytes()); - + let expected_intermediate_len: u32 = 0x05; assert_eq!(length_bytes, expected_intermediate_len.to_le_bytes()); assert_eq!(value_bytes, value); - // Test with a different length - let value2 = b"0123456789ABCDEF"; // 16 bytes - // actual_payload_len = 16 - // actual_payload_len_str = "16" - // intermediate_length_val_to_send = u32::from_str_radix("16", 16).unwrap() = 0x16 + // 测试不同长度 + let value2 = b"0123456789ABCDEF"; // 16 字节 + // actual_payload_len = 16 + // actual_payload_len_str = "16" + // intermediate_length_val_to_send = u32::from_str_radix("16", 16).unwrap() = 0x16 let (tag_bytes2, length_bytes2, value_bytes2) = encode_tlv_parts(tag, value2).unwrap(); let expected_intermediate_len2: u32 = 0x16; assert_eq!(length_bytes2, expected_intermediate_len2.to_le_bytes()); assert_eq!(value_bytes2, value2); assert_eq!(tag_bytes2, tag.to_le_bytes()); - // Test with zero length value - let value3 = b""; // 0 bytes - // actual_payload_len = 0 - // actual_payload_len_str = "0" - // intermediate_length_val_to_send = u32::from_str_radix("0", 16).unwrap() = 0x00 + // 测试零长度值 + let value3 = b""; // 0 字节 + // actual_payload_len = 0 + // actual_payload_len_str = "0" + // intermediate_length_val_to_send = u32::from_str_radix("0", 16).unwrap() = 0x00 let (tag_bytes3, length_bytes3, value_bytes3) = encode_tlv_parts(tag, value3).unwrap(); let expected_intermediate_len3: u32 = 0x00; assert_eq!(length_bytes3, expected_intermediate_len3.to_le_bytes()); @@ -250,4 +267,3 @@ mod tests { assert_eq!(tag_bytes3, tag.to_le_bytes()); } } - diff --git a/crates/os_socket_comms/src/tlv.rs b/crates/os_socket_comms/src/tlv.rs index 02603aeed89b43291d9b266870093ee4a19b776f..011fa12c2b3da4bf599380c1b2fbf50ee4e1f860 100644 --- a/crates/os_socket_comms/src/tlv.rs +++ b/crates/os_socket_comms/src/tlv.rs @@ -12,57 +12,58 @@ // Affiliation: Institute of Software, Chinese Academy of Sciences use anyhow::Result; -use log::debug; // Corrected from log::debug to use the debug macro +use log::debug; -/// Represents a TLV (Tag-Length-Value) message. +/// 表示一个 TLV (Tag-Length-Value) 消息。 #[derive(Debug, Clone)] pub struct TLVMessage { - pub tag: u32, // The tag identifying the message type. - pub length: u32, // The length of the value part. - pub value: Vec, // The actual value (payload) of the message. + pub tag: u32, // 标识消息类型的标签。 + pub length: u32, // 值部分的长度。 + pub value: Vec, // 消息的实际值 (有效负载)。 } -/// Parses a byte slice into a `TLVMessage`. +/// 将字节切片解析为 `TLVMessage`。 /// -/// The TLV structure is assumed to be: -/// - Tag (4 bytes, Little Endian) -/// - Length (4 bytes, Little Endian). This is an *intermediate* length value. -/// - Value (variable length, determined by a transformation of the intermediate length) +/// TLV 结构假定为: +/// - 标签 (4 字节,小端序) +/// - 长度 (4 字节,小端序)。这是一个 *中间* 长度值。 +/// - 值 (可变长度,由中间长度的转换确定) /// -/// The actual length of the `value` is determined by a specific transformation: -/// 1. The 4-byte intermediate length is read as a Little Endian `u32`. -/// 2. This `u32` is formatted as a hexadecimal string (e.g., `0x1A` becomes `"1a"`). -/// 3. This hexadecimal string is then parsed as a *decimal* number to get the final actual length of the value. +/// `value` 的实际长度由特定的转换确定: +/// 1. 4 字节的中间长度作为小端序 `u32` 读取。 +/// 2. 此 `u32` 被格式化为十六进制字符串 (例如, `0x1A` 变为 `"1a"`)。 +/// 3. 然后此十六进制字符串被解析为 *十进制* 数以获得值的最终实际长度。 /// -/// # Arguments -/// * `data`: A byte slice `&[u8]` containing the raw TLV data. +/// # 参数 +/// * `data`: 包含原始 TLV 数据的字节切片 `&[u8]`。 /// -/// # Returns -/// A `Result` which is `Ok(TLVMessage)` on successful parsing, -/// or an `Err` if the data is malformed or too short. +/// # 返回 +/// 一个 `Result`,成功解析时为 `Ok(TLVMessage)`, +/// 如果数据格式错误或太短,则为 `Err`。 pub fn parse_tlv(data: &[u8]) -> Result { - // Check if the data is long enough to contain the tag and length fields (8 bytes). + // 检查数据是否足够长以包含标签和长度字段 (8 字节)。 if data.len() < 8 { - return Err(anyhow::anyhow!("Invalid TLV message: too short")); + return Err(anyhow::anyhow!("无效的 TLV 消息:太短")); } - // Parse the tag (first 4 bytes, Little Endian). + // 解析标签 (前 4 字节,小端序)。 let tag = u32::from_le_bytes(data[0..4].try_into()?); - // Parse the intermediate length value (next 4 bytes, Little Endian). - let intermediate_length_val = - u32::from_le_bytes(data[4..8].try_into().map_err(|e| { - anyhow::anyhow!("Failed to parse intermediate length bytes: {}", e) - })?); + // 解析中间长度值 (接下来的 4 字节,小端序)。 + let intermediate_length_val = u32::from_le_bytes( + data[4..8] + .try_into() + .map_err(|e| anyhow::anyhow!("解析中间长度字节失败: {}", e))?, + ); - // Convert the intermediate length value to its hexadecimal string representation. + // 将中间长度值转换为其十六进制字符串表示。 let hex_string_repr = format!("{:x}", intermediate_length_val); - // Parse the hexadecimal string representation as a decimal number to get the final actual length. - // This is a specific requirement of the TLV protocol being implemented. + // 将十六进制字符串表示解析为十进制数以获得最终的实际长度。 + // 这是正在实现的 TLV 协议的特定要求。 let final_actual_length = u32::from_str_radix(&hex_string_repr, 10).map_err(|e| { anyhow::anyhow!( - "Failed to parse hex string representation '{}' (from intermediate value 0x{:08x}) as a decimal number: {}", + "未能将十六进制字符串表示 '{}' (来自中间值 0x{:08x}) 解析为十进制数: {}", hex_string_repr, intermediate_length_val, e @@ -70,27 +71,27 @@ pub fn parse_tlv(data: &[u8]) -> Result { })?; debug!( - "parse_tlv: Intermediate length val 0x{:08x} -> hex string '{}' -> final actual length {}.", + "parse_tlv: 中间长度值 0x{:08x} -> 十六进制字符串 '{}' -> 最终实际长度 {}.", intermediate_length_val, hex_string_repr, final_actual_length ); - // Check if the data is long enough to contain the value based on the calculated final_actual_length. + // 根据计算出的 final_actual_length 检查数据是否足够长以包含值。 if data.len() < (8 + final_actual_length as usize) { return Err(anyhow::anyhow!( - "Invalid TLV message: incomplete value. Data len {} < 8 + final_actual_length ({}) = {}.", + "无效的 TLV 消息:值不完整。数据长度 {} < 8 + 最终实际长度 ({}) = {}.", data.len(), final_actual_length, 8 + final_actual_length as usize )); } - // Extract the value part of the message. + // 提取消息的值部分。 let value = data[8..(8 + final_actual_length as usize)].to_vec(); - // Construct and return the TLVMessage. + // 构造并返回 TLVMessage。 Ok(TLVMessage { tag, - length: final_actual_length, // Store the final, actual length. + length: final_actual_length, // 存储最终的实际长度。 value, }) } @@ -101,14 +102,14 @@ mod tests { #[test] fn test_parse_tlv_valid() { - // Simulates sending a value of decimal length 10. + // 模拟发送十进制长度为 10 的值。 // actual_payload_len = 10 // actual_payload_len_str = "10" - // intermediate_length_val_to_send = u32::from_str_radix("10", 16).unwrap() = 0x10 = 16 (decimal) + // intermediate_length_val_to_send = u32::from_str_radix("10", 16).unwrap() = 0x10 = 16 (十进制) // intermediate_length_val_bytes = 0x10000000 (LE) let tag_val: u32 = 0x01020304; - let value_payload = b"0123456789".to_vec(); // 10 bytes - let intermediate_length: u32 = 0x10; // This is what should be in the length field + let value_payload = b"0123456789".to_vec(); // 10 字节 + let intermediate_length: u32 = 0x10; // 这应该是长度字段中的内容 let mut data = Vec::new(); data.extend_from_slice(&tag_val.to_le_bytes()); @@ -130,7 +131,7 @@ mod tests { // intermediate_length_val_to_send = u32::from_str_radix("0", 16).unwrap() = 0x0 // intermediate_length_val_bytes = 0x00000000 (LE) let tag_val: u32 = 0xDEADBEEF; - let value_payload = Vec::new(); // 0 bytes + let value_payload = Vec::new(); // 0 字节 let intermediate_length: u32 = 0x0; let mut data = Vec::new(); @@ -148,19 +149,19 @@ mod tests { #[test] fn test_parse_tlv_data_too_short_for_header() { - let data = vec![0x01, 0x02, 0x03]; // Less than 8 bytes + let data = vec![0x01, 0x02, 0x03]; // 少于 8 字节 let result = parse_tlv(&data); assert!(result.is_err()); - assert_eq!(result.err().unwrap().to_string(), "Invalid TLV message: too short"); + assert_eq!(result.err().unwrap().to_string(), "无效的 TLV 消息:太短"); } #[test] fn test_parse_tlv_data_too_short_for_value() { // intermediate_length_val (0x10) -> hex_string "10" -> final_actual_length 10 - // Expects 10 bytes of value, but only provide 5 + // 期望 10 字节的值,但只提供 5 字节 let tag_val: u32 = 0x01020304; - let intermediate_length: u32 = 0x10; // Indicates final length of 10 - let value_payload_incomplete = b"01234".to_vec(); // 5 bytes + let intermediate_length: u32 = 0x10; // 表示最终长度为 10 + let value_payload_incomplete = b"01234".to_vec(); // 5 字节 let mut data = Vec::new(); data.extend_from_slice(&tag_val.to_le_bytes()); @@ -171,19 +172,19 @@ mod tests { assert!(result.is_err()); assert_eq!( result.err().unwrap().to_string(), - "Invalid TLV message: incomplete value. Data len 13 < 8 + final_actual_length (10) = 18." + "无效的 TLV 消息:值不完整。数据长度 13 < 8 + 最终实际长度 (10) = 18." ); } #[test] fn test_parse_tlv_complex_length() { - // actual_payload_len = 25 (decimal) + // actual_payload_len = 25 (十进制) // actual_payload_len_str = "25" - // intermediate_length_val_to_send = u32::from_str_radix("25", 16).unwrap() = 0x25 = 37 (decimal) + // intermediate_length_val_to_send = u32::from_str_radix("25", 16).unwrap() = 0x25 = 37 (十进制) // intermediate_length_val_bytes = 0x25000000 (LE) let tag_val: u32 = 0xCAFEBABE; - let value_payload = (0..25).map(|i| i as u8).collect::>(); // 25 bytes - let intermediate_length: u32 = 0x25; // This is what should be in the length field + let value_payload = (0..25).map(|i| i as u8).collect::>(); // 25 字节 + let intermediate_length: u32 = 0x25; // 这应该是长度字段中的内容 let mut data = Vec::new(); data.extend_from_slice(&tag_val.to_le_bytes()); @@ -191,33 +192,33 @@ mod tests { data.extend_from_slice(&value_payload); let result = parse_tlv(&data); - assert!(result.is_ok(), "Parsing failed: {:?}", result.err()); + assert!(result.is_ok(), "解析失败: {:?}", result.err()); let tlv_message = result.unwrap(); assert_eq!(tlv_message.tag, tag_val); assert_eq!(tlv_message.length, value_payload.len() as u32); assert_eq!(tlv_message.value, value_payload); } - #[test] + #[test] fn test_parse_tlv_invalid_hex_string_for_decimal_parse() { - // This test constructs a scenario where intermediate_length_val, when formatted as hex, - // results in a string that is not a valid decimal number if it contained non-digit hex chars. - // For example, if intermediate_length_val was 0x1A -> "1a" (hex string). - // u32::from_str_radix("1a", 10) would fail. + // 此测试构造了一种场景:intermediate_length_val 格式化为十六进制时, + // 得到的字符串如果包含非数字的十六进制字符,则不是有效的十进制数。 + // 例如, 如果 intermediate_length_val 是 0x1A -> "1a" (十六进制字符串)。 + // u32::from_str_radix("1a", 10) 将会失败。 let tag_val: u32 = 0xABADCAFE; - let intermediate_length_problematic: u32 = 0x1A; // -> "1a" as hex string + let intermediate_length_problematic: u32 = 0x1A; // -> 十六进制字符串 "1a" let value_payload = b"dummy".to_vec(); let mut data = Vec::new(); data.extend_from_slice(&tag_val.to_le_bytes()); data.extend_from_slice(&intermediate_length_problematic.to_le_bytes()); - data.extend_from_slice(&value_payload); // Actual value doesn't matter as parsing length should fail + data.extend_from_slice(&value_payload); // 实际值无关紧要,因为解析长度应该会失败 let result = parse_tlv(&data); assert!(result.is_err()); let err_msg = result.err().unwrap().to_string(); - // Expected error: "Failed to parse hex string representation '1a' (from intermediate value 0x0000001a) as a decimal number: invalid digit found in string" - assert!(err_msg.contains("Failed to parse hex string representation '1a'")); - assert!(err_msg.contains("as a decimal number")); + // 预期错误: "未能将十六进制字符串表示 '1a' (来自中间值 0x0000001a) 解析为十进制数: 无效数字" + assert!(err_msg.contains("未能将十六进制字符串表示 '1a'")); + assert!(err_msg.contains("解析为十进制数")); } -} \ No newline at end of file +} diff --git a/src/cores/servers/mod.rs b/src/cores/servers/mod.rs index 47a72d3c7047b59994fb252bba28e625d2fe22fa..8fc3749794f46a487f76b01e6a9f44e9c76fa429 100644 --- a/src/cores/servers/mod.rs +++ b/src/cores/servers/mod.rs @@ -5,6 +5,7 @@ use crate::cores::state::AppState; use async_trait::async_trait; pub use message::MessagingServer; use os_socket_comms::handler; +use std::sync::Arc; #[async_trait] pub trait Server: Send + Sync + Unpin + 'static { @@ -20,7 +21,7 @@ impl Server for OSSocketServerManager { match handler::OSSocketHandler::new_with_default_server(app_state.message_cli.clone()) { Ok(Some(handler)) => { log::info!("OSSocketHandler (from os_socket_comms) created successfully, starting handler."); - handler.start().await; + Arc::new(handler).start().await; } Ok(None) => { log::warn!("OSSocketHandler::new_with_default_server returned None, OS socket service will not start. This typically means the OS socket file was not found."); diff --git a/tests/datamgr_test.rs b/tests/datamgr_test.rs index ff932238e99d9ddea86ebfcdfe12480bfb0da8eb..d29e202f466777da6c7ca5dddfeb7de510117d0d 100644 --- a/tests/datamgr_test.rs +++ b/tests/datamgr_test.rs @@ -1,7 +1,5 @@ use actix_web::rt::Runtime; -use fleet_apiserver::cores::plugin::PluginManager; -use fleet_apiserver::datamgr_route::datamgr_api; -use fleet_apiserver::datamgr_route::route; +use fleet_apiserver::cores::handlers::datamgr::datamgr_api; use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::Client; use serde_json::Value; @@ -24,33 +22,40 @@ extern "C" {} #[tokio::test] async fn test_datamgr() -> Result<(), Box> { unsafe { - let plugin_manager = Arc::new(PluginManager::new()); - let data_plugin_manager = Arc::new(Mutex::new(SafePtr(datamgr_api::NewPluginManager()))); + // let plugin_manager = Arc::new(PluginManager::new()); + let data_plugin_manager_ptr = datamgr_api::NewPluginManager(); + let data_plugin_manager = Arc::new(Mutex::new(SafePtr(data_plugin_manager_ptr))); // portal-plugin.so 及 storage-plugin.so 的位置 let plugin_file_path = std::ffi::CString::new("/root/project/fleet-datamgr/plugins/release").unwrap(); - let plugin_manager = plugin_manager.clone(); - let data_plugin_manager = Arc::clone(&data_plugin_manager); + // let plugin_manager = plugin_manager.clone(); + let data_plugin_manager_clone = Arc::clone(&data_plugin_manager); let server_handle = thread::spawn(move || { let rt = Runtime::new().unwrap(); rt.block_on(async { - datamgr_api::LoadPluginsFromDirectory( - data_plugin_manager.lock().unwrap().0, - plugin_file_path.as_ptr(), - ); - route::init_plugin( - plugin_manager.clone(), - data_plugin_manager.lock().unwrap().0, - ); - if let Err(e) = - fleet_apiserver::start_server(DATABASE_URL, ADDRESS, plugin_manager).await - { - eprintln!("Failed to start server: {}", e); + // Use LoadPlugins instead of LoadPluginsFromDirectory + // Assuming LoadPlugins now knows the directory implicitly or it's set elsewhere. + // The original plugin_file_path is available if LoadPlugins needs it, but its signature is just (ptr). + datamgr_api::LoadPlugins(data_plugin_manager_clone.lock().unwrap().0); + + // route::init_plugin is not found, commenting out. Its role needs clarification. + // route::init_plugin( + // data_plugin_manager_clone.lock().unwrap().0, + // ); + + // The main fleet_apiserver::start_server expects ServerStartParams. + // The datamgr_api.rs provides its own StartApiServer for the FFI plugin manager. + let address_cstring = std::ffi::CString::new(ADDRESS).unwrap(); + let result = datamgr_api::StartApiServer(data_plugin_manager_clone.lock().unwrap().0, address_cstring.as_ptr()); + if result != 0 { // Assuming 0 is success for C API + eprintln!("Failed to start datamgr API server, result: {}", result); } - datamgr_api::UnloadAllPlugins(data_plugin_manager.lock().unwrap().0); - datamgr_api::DeletePluginManager(data_plugin_manager.lock().unwrap().0); + + // Use UnloadPlugins instead of UnloadAllPlugins + datamgr_api::UnloadPlugins(data_plugin_manager_clone.lock().unwrap().0); + datamgr_api::DeletePluginManager(data_plugin_manager_clone.lock().unwrap().0); }); }); @@ -125,7 +130,7 @@ async fn test_datamgr() -> Result<(), Box> { .headers(headers.clone()) .send() .await?; - let body = res.bytes().await?; + let body: Vec = res.bytes().await?.to_vec(); if let Ok(text) = std::str::from_utf8(&body) { println!("{}", text); } diff --git a/tests/network_status_test.rs b/tests/network_status_test.rs index 2077db8db8f6dac85aa35d78485f5431cae43597..2afee579c7aae9b47e1c09a7b09b9067b7e69abf 100644 --- a/tests/network_status_test.rs +++ b/tests/network_status_test.rs @@ -87,6 +87,7 @@ async fn setup_test_app_state() -> Arc { .build()).await), cluster_id: "test-cluster".to_string(), token_secret: vec![1, 2, 3, 4], + network_status_config: cores::state::NetworkStatusConfig::default(), }) }