diff --git a/Cargo.toml b/Cargo.toml index 1eb6c7d7c5ab5153536d2742f8b77f272003e545..d13e29f025a0edaff03cdbac47493c924d032d1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,26 +18,47 @@ required-features = ["eventbus", "servers", "test"] #name = "api_generator" #required-features = [] +[workspace] +members = ["crates/os_socket_comms"] + + +[workspace.dependencies] +feventbus = { git = "https://gitee.com/iscas-system/eventbus.git" } +anyhow = "1.0.94" +async-trait = "0.1.74" +log = "0.4.22" +tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] } +serde_json = "1.0.127" +chrono = { version = "0.4.38", features = ["serde"] } + +[workspace.package] +version = "0.4.0" +license = "Apache-2.0" +edition = "2021" + [features] -default = ["eventbus", "servers", "messaging"] +default = ["eventbus", "servers", "messaging", "os_socket"] eventbus = ["feventbus"] servers = ["utils", "eventbus", "messaging", "diesel", "diesel_migrations", "actix-http", "actix-web"] utils = [] test = ["utils", "eventbus"] messaging = ["eventbus"] +os_socket = ["os_socket_comms"] [dependencies] +os_socket_comms = { path = "crates/os_socket_comms", optional = true } feventbus = { git = "https://gitee.com/iscas-system/eventbus.git", optional = true } #fleetmodv2 = { path = "../fleetmodv2" } fleetmodv2 = { git = "https://gitee.com/iscas-system/fleetmodv2.git" } #client-rust = { path = "../client-rust" } client-rust = { git = "https://gitee.com/iscas-system/client-rust" } +consensus_kv = { git = "https://gitee.com/iscas-system/consensus-kv" } + r2d2 = "0.8.10" diesel = { version = "2.2.0", features = ["sqlite", "postgres", "r2d2"], optional = true } diesel_migrations = { version = "2.2.0", optional = true } actix-http = { version = "3.9.0", optional = true } actix-web = { version = "4.9.0", optional = true } - async-trait = "0.1.74" env_logger = "0.11.5" serde = { version = "1.0.209", features = ["derive"] } @@ -70,4 +91,3 @@ jsonwebtoken = "9.2" rand = "0.8" futures-util = "0.3.31" serialport = "=4.6.1" -consensus_kv = { git = "https://gitee.com/iscas-system/consensus-kv" } \ No newline at end of file diff --git a/crates/os_socket_comms/Cargo.toml b/crates/os_socket_comms/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..5ee2319e19fc7b1bf8d13ce00dd9fe7fb4170ccd --- /dev/null +++ b/crates/os_socket_comms/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "os_socket_comms" +version = { workspace = true } +edition = { workspace = true } + +[dependencies] +feventbus = { workspace = true } +anyhow = { workspace = true } +async-trait = { workspace = true } +log = { workspace = true } +tokio = { workspace = true } +serde_json = { workspace = true } +chrono = { workspace = true } \ No newline at end of file diff --git a/crates/os_socket_comms/README_zh.md b/crates/os_socket_comms/README_zh.md new file mode 100644 index 0000000000000000000000000000000000000000..aed7abef72b1f942724bda8bf8f501b0ac2bd823 --- /dev/null +++ b/crates/os_socket_comms/README_zh.md @@ -0,0 +1,135 @@ +# os_socket_comms 包 + +## 概述 + +`os_socket_comms` 包提供了一个 Rust 接口,用于通过 Unix 域套接字与操作系统级组件建立和管理通信。它设计用于发送和接收根据特定标签-长度-值 (TLV) 协议格式化的消息。该包包括以下功能: + +* 建立并维护到预定义 Unix 域套接字路径的连接。 +* 如果初始连接不成功,则重试连接。 +* 向操作系统组件发送 TLV 格式的消息。 +* 从操作系统组件接收和解析 TLV 格式的消息。 +* 处理接收到的消息并将其转发到消息总线系统。 +* 定期向操作系统组件发送遥测数据。 + +## 关键组件 + +* **`OSSocketServer`**: + * 管理低级 Unix 域套接字连接 (`/var/run/EulixOnBoardGuardian/os_fleet.sock`)。 + * 处理带重试的连接尝试。 + * 提供 `read_data()` 方法以异步从套接字读取原始字节数据。 + * 提供 `send_tlv_message(tag: u32, value: &[u8])` 来构造和发送 TLV 消息。 + +* **`OSSocketHandler`**: + * 一个使用 `OSSocketServer` 的高级处理器。 + * 它泛型于 `P`,`P` 必须实现 `feventbus::traits::producer::Producer`。 + * 持续从套接字读取数据,使用 `tlv::parse_tlv` 将其解析为 `TLVMessage`。 + * 根据消息标签将其转发到消息总线上的不同主题。 + * 启动一个单独的异步任务以定期发送遥测数据(使用 `fetch_local_telemetry_data` 作为数据源,`TAG_USER_CLOUD_PLATFORM_OS_TO_CLOUD` 作为标签)。 + +* **`TLVMessage`**: + * 一个表示已解析 TLV 消息的结构体: + ```rust + pub struct TLVMessage { + pub tag: u32, + pub length: u32, // 值的实际长度 + pub value: Vec, + } + ``` + +* **`tlv::parse_tlv(data: &[u8]) -> Result`**: + * 将原始字节切片解析为 `TLVMessage`。 + +* **常量**: + * `OS_SOCKET_PATH`:定义 Unix 域套接字的路径。 + * 各种 `TAG_*` 常量定义了 TLV 协议中使用的 `u32` 标签(例如, `TAG_TASK_PLANNING`, `TAG_USER_CLOUD_PLATFORM_OS_TO_CLOUD`)。 + +## TLV 协议详情 + +该包实现了一种特定的 TLV (标签-长度-值) 消息格式: + +1. **标签 (Tag)**:4 字节,小端序 (`u32`)。 +2. **长度 (Length)**:4 字节,小端序 (`u32`)。此字段存储一个*中间*长度值。 + * **解码实际长度**:要获取 `value` 字段的实际长度,会读取 `intermediate_length_val`,将其格式化为十六进制字符串(例如,`0x1A` 变成字符串 `"1a"`),然后此字符串被解析为*十进制*数。因此,如果长度字段包含 `0x10`(十进制 16),它将被视为十六进制字符串 `"10"`,然后解析为十进制数 `10`。因此,`value` 的长度将为 10 字节。 + * **编码实际长度**:要发送具有特定 `actual_payload_len` 的 `value` 消息,`actual_payload_len` 会被转换为其十进制字符串表示(例如,`10` 变成 `"10"`)。然后,此字符串被解析为*十六进制*数以获得 `intermediate_length_val_to_send`(例如,`"10"` 解析为十六进制后变成 `0x10` 或十进制 `16`)。然后,这个 `intermediate_length_val_to_send` 作为长度字段写入套接字。 +3. **值 (Value)**:一个字节序列,其长度由解码后的实际长度确定。 + +## 使用示例 + +```rust +use os_socket_comms::{OSSocketHandler, OSSocketServer}; +use feventbus::Arc; // 假设 feventbus::Arc 用于生产者 +use feventbus::mem::MemoryProducer; // 示例生产者 + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // 初始化日志 (例如,使用 env_logger 或 tracing_subscriber) + env_logger::init(); + + // 1. 创建一个消息生产者 (示例使用内存生产者) + let message_producer = Arc::new(MemoryProducer::new(100)); + + // 2. 尝试创建一个 OSSocketServer 实例 + // OSSocketServer::new() 将尝试连接到 OS_SOCKET_PATH。 + // 它返回 Result>。 + match OSSocketServer::new()? { + Some(socket_server) => { + // 3. 创建 OSSocketHandler + // OSSocketHandler::new 也返回 Result> + match OSSocketHandler::new(message_producer.clone(), Some(socket_server))? { + Some(handler) => { + // 4. 启动处理器 + // 这将为读取消息和发送遥测数据启动任务。 + handler.start().await; + log::info!(\"OSSocketHandler 已启动.\"); + + // 处理器现在将在后台运行。 + // 在此处添加应用程序逻辑,或保持主线程活动。 + // 例如,无限期等待或等待特定信号。 + tokio::signal::ctrl_c().await?; // 等待 Ctrl+C + log::info!(\"正在关闭.\"); + } + None => { + log::error!(\"无法创建 OSSocketHandler (如果 server 是 Some,则不应发生这种情况).\"); + } + } + } + None => { + log::error!(\"无法连接到 OS 套接字。OSSocketServer 无法初始化.\"); + // 处理 OS 套接字不可用的情况。 + // 应用程序可能会以功能受限的方式继续运行或退出。 + } + } + Ok(()) +} +``` + +### 关于遥测数据的占位符 + +函数 `fetch_local_telemetry_data()` 当前是一个占位符。在实际应用中,应实现此函数以从系统收集实际的遥测数据。 + +## 构建和测试 + +构建包: +```sh +cargo build +``` + +运行测试: +```sh +cargo test +``` +测试包括对 `tlv.rs` 中的 TLV 解析逻辑和 `lib.rs` 中使用的 TLV 编码逻辑的检查。 + +## 依赖项 + +主要依赖项包括: +* `tokio`: 用于异步运行时。 +* `anyhow`: 用于灵活的错误处理。 +* `log`: 用于日志记录。 +* `feventbus`: 用于消息总线集成 (特别是 `Producer` 特性)。 +* `chrono`: 用于占位符遥测函数中的时间戳。 +* `serde_json`: `OSSocketHandler` 用它来序列化事件总线的消息负载。 + +## 许可证 + +此包根据 Mulan PSL v2 许可证授权。有关更多详细信息,请参阅源文件中的版权声明。 \ No newline at end of file diff --git a/crates/os_socket_comms/src/handler.rs b/crates/os_socket_comms/src/handler.rs new file mode 100644 index 0000000000000000000000000000000000000000..f8e1bfb1ff608ab1223f79646e539fe2a99cf0d5 --- /dev/null +++ b/crates/os_socket_comms/src/handler.rs @@ -0,0 +1,307 @@ +// Copyright (c) 2025 Institute of Software, Chinese Academy of Sciences +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PSL v2 for more details. + +// Author: songpenglei, songpenglei@otcaix.iscas.ac.cn +// Affiliation: Institute of Software, Chinese Academy of Sciences + +use crate::{ + OSSocketServer, TLVMessage, TAG_CLOUD_PLATFORM, TAG_PROCESSING, TAG_TASK_PARAMS, + TAG_TASK_PLANNING, TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS +}; +// The AppState dependency will be removed/refactored in the next step. +// For now, this line would cause a compile error if this crate is built alone. +// use crate::cores::state::AppState; // This needs to be changed + +use anyhow::Result; +use chrono::Utc; +use feventbus::message::{Message, NativeEventAction}; +use feventbus::traits::producer::Producer; +use log::{debug, error, info}; +use std::sync::Arc; +use tokio::time::{sleep, Duration}; // For placeholder telemetry data + +/// Handles messages received from the OS socket and sends telemetry data. +/// Generic over `P` which must implement the `Producer` trait for message bus integration. +pub struct OSSocketHandler { + // Made Producer generic + socket_server: OSSocketServer, // The server instance for OS socket communication. + // app_state: Arc, // To be replaced + message_producer: Arc

, // New field for message bus producer. Used to send received messages to the bus. + // Other specific dependencies can be added here if needed +} + +/// Fetches local telemetry data. +/// This is a placeholder function and should be replaced with actual telemetry data gathering logic. +/// Currently, it simulates fetching data by creating a timestamped payload. +pub async fn fetch_local_telemetry_data() -> Result> { + let timestamp = Utc::now().to_rfc3339(); + let payload = format!("telemetry_update_{}", timestamp); + info!("Simulated fetching local telemetry data: {}", payload); + Ok(payload.into_bytes()) +} + +impl OSSocketHandler

{ + /// 创建新的 OS Socket 处理器实例 + /// Creates a new instance of `OSSocketHandler`. + /// + /// # Arguments + /// * `message_producer`: An `Arc

` where `P` is a message bus producer. + /// * `socket_server_opt`: An `Option`. If `None`, the handler won't be created. + /// + /// # Returns + /// `Result>`: `Ok(Some(Self))` if successful, `Ok(None)` if `socket_server_opt` is `None`. + pub fn new( + message_producer: Arc

, + socket_server_opt: Option, + ) -> Result> { + match socket_server_opt { + Some(server) => Ok(Some(Self { + socket_server: server, + message_producer, + })), + None => Ok(None), + } + } + + /// Creates a new `OSSocketHandler` with a default `OSSocketServer` instance. + /// + /// This constructor attempts to create an `OSSocketServer` internally. + /// # Arguments + /// * `message_producer`: An `Arc

` where `P` is a message bus producer. + /// + /// # Returns + /// `Result>`: `Ok(Some(Self))` if the server is successfully created, `Ok(None)` otherwise. + pub fn new_with_default_server(message_producer: Arc

) -> Result> { + match OSSocketServer::new()? { + Some(server) => Ok(Some(Self { + socket_server: server, + message_producer, + })), + None => Ok(None), + } + } + + /// Starts the handler, which involves two main tasks: + /// 1. Spawning a task to continuously read and process messages from the OS socket. + /// 2. Spawning a task to periodically send telemetry data to the OS socket. + pub async fn start(&self) { + let server_for_reader = self.socket_server.clone(); + // let app_state_for_reader = self.app_state.clone(); // Removed + let message_producer_for_handler = self.message_producer.clone(); + + // Spawn a task to handle incoming messages from the OS socket. + tokio::spawn(async move { + log::info!("OSSocketHandler: Read loop started for OS socket."); + loop { + match server_for_reader.read_data().await { + Ok(data) => { + log::debug!( + "OSSocketHandler: server.read_data() returned {} bytes.", + data.len() + ); + if !data.is_empty() { + // Attempt to parse the received data as a TLV message. + match crate::tlv::parse_tlv(&data) { + Ok(tlv) => { + log::debug!("OSSocketHandler: Successfully parsed TLV message with tag 0x{:08x}, length {}", tlv.tag, tlv.length); + // Pass message_producer instead of full app_state + // Handle the parsed TLV message. + if let Err(e) = Self::handle_tlv_message( + message_producer_for_handler.clone(), + tlv, + ) + .await + { + error!( + "OSSocketHandler: Error handling TLV message: {}", + e + ); + } + } + Err(parse_err) => { + // Log an error if TLV parsing fails. + let display_data_len = std::cmp::min(100, data.len()); + error!("OSSocketHandler: Failed to parse TLV message: {}. Raw data (first {} bytes): {:x?}", parse_err, display_data_len, &data[..display_data_len]); + } + } + } else { + // If no data is read, it might indicate the peer closed the connection. + log::info!("OSSocketHandler: server.read_data() returned 0 bytes. Peer has likely closed the connection. Stopping read loop."); + break; // Exit the loop. + } + } + Err(e) => { + // Log an error if reading from the socket fails, then retry after a delay. + error!("OSSocketHandler: Error in server.read_data(): {}. Will retry after delay.", e); + sleep(Duration::from_secs(1)).await; + } + } + } + log::info!("OSSocketHandler: Read loop terminated."); + }); + + // Spawn a task to periodically send telemetry data. + let server_for_sender = self.socket_server.clone(); + tokio::spawn(async move { + Self::start_telemetry_sender(server_for_sender).await; + }); + } + + /// Starts the telemetry data sender loop. + /// + /// This function periodically fetches local telemetry data and sends it + /// to the OS socket using a specific TLV tag. + async fn start_telemetry_sender(socket_server: OSSocketServer) { + info!("OSSocketHandler: Telemetry sender loop started."); + let telemetry_interval = Duration::from_secs(10); // Interval for sending telemetry. + + loop { + sleep(telemetry_interval).await; + info!("OSSocketHandler: Time to send telemetry data."); + + // Fetch the local telemetry data. + match fetch_local_telemetry_data().await { + Ok(telemetry_payload) => { + if telemetry_payload.is_empty() { + debug!("OSSocketHandler: No telemetry data to send."); + continue; // Skip if no data. + } + info!( + "OSSocketHandler: Sending telemetry data ({} bytes) with tag 0x{:08X}", + telemetry_payload.len(), + TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS // Use the designated tag for telemetry. + ); + // Send the telemetry data as a TLV message. + match socket_server + .send_tlv_message(TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS, &telemetry_payload) + .await + { + Ok(()) => { + info!( + "OSSocketHandler: Successfully sent telemetry data to OS socket." + ); + } + Err(e) => { + error!("OSSocketHandler: Failed to send telemetry data: {}", e); + } + } + } + Err(e) => { + error!( + "OSSocketHandler: Failed to fetch local telemetry data: {}", + e + ); + } + } + } + } + + /// Handles a received TLV message based on its tag. + /// + /// Depending on the tag, the message value is published to a specific topic on the message bus. + /// # Arguments + /// * `message_producer`: The message bus producer. + /// * `tlv`: The `TLVMessage` to handle. + async fn handle_tlv_message(message_producer: Arc

, tlv: TLVMessage) -> Result<()> { + // let message_client = app_state.message_cli.clone(); // Changed + debug!( + "Received TLV message with tag: 0x{:08x}, length: {}", + tlv.tag, tlv.length + ); + + // Match the TLV tag to determine how to process the message. + match tlv.tag { + TAG_TASK_PLANNING => { + debug!("Processing task planning message"); + let topic = "task_planning"; + Self::publish_message(&message_producer, topic, tlv.value).await?; + debug!( + "Successfully forwarded task planning message to topic: {}", + topic + ); + } + TAG_CLOUD_PLATFORM => { + debug!("Processing cloud platform message"); + let topic = "cloud_platform"; + Self::publish_message(&message_producer, topic, tlv.value).await?; + debug!( + "Successfully forwarded cloud platform message to topic: {}", + topic + ); + } + TAG_PROCESSING => { + debug!("Processing processing message"); + let topic = "processing"; + Self::publish_message(&message_producer, topic, tlv.value).await?; + debug!( + "Successfully forwarded processing message to topic: {}", + topic + ); + } + TAG_TASK_PARAMS => { + debug!("Processing task parameters message"); + let topic = "task_parameters"; + Self::publish_message(&message_producer, topic, tlv.value).await?; + debug!( + "Successfully forwarded task parameters message to topic: {}", + topic + ); + } + // TODO: Add cases for new TAG_USER_*_CLOUD_TO_OS tags if needed + // Handle unknown tags. + _ => { + debug!( + "Received unknown TLV message with tag: 0x{:08x}, length: {}", + tlv.tag, tlv.length + ); + } + } + Ok(()) + } + + /// Publishes a message to the message bus. + /// + /// # Arguments + /// * `message_producer`: The message bus producer. + /// * `topic`: The topic to publish the message to. + /// * `value`: The payload of the message (as raw bytes). + async fn publish_message(message_producer: &Arc

, topic: &str, value: Vec) -> Result<()> { + debug!( + "Preparing to publish message to topic: {}, value length: {}", + topic, + value.len() + ); + // Create a new message for the event bus. + // The value is serialized to JSON. + let message = Message::new( + topic.to_string(), + NativeEventAction::Other, // Default action type. + None, // No specific source ID. + Some(serde_json::to_value(value)?), // Serialize the byte vector to a JSON value. + None, // No specific destination ID. + ); + + log::debug!("OSSocketHandler Publishing message to topic: {}", topic); + // Publish the message using the provided producer. + if let Err(e) = message_producer.as_ref().publish(message).await { + log::error!( + "OSSocketHandler Failed to publish message to topic {}: {}", + topic, + e + ); + return Err(anyhow::anyhow!( + "Failed to publish message to topic {}: {}", + topic, + e + )); + } + log::debug!("OSSocketHandler Successfully published message to topic: {}", topic); + Ok(()) + } +} diff --git a/src/cores/servers/os_socket/mod.rs b/crates/os_socket_comms/src/lib.rs similarity index 32% rename from src/cores/servers/os_socket/mod.rs rename to crates/os_socket_comms/src/lib.rs index 238e7d34b56621f39417553a3b28ac92713ccbd8..17cf7ea2c7ec275d734439a4483b961d2d906eae 100644 --- a/src/cores/servers/os_socket/mod.rs +++ b/crates/os_socket_comms/src/lib.rs @@ -11,89 +11,63 @@ // Author: songpenglei, songpenglei@otcaix.iscas.ac.cn // Affiliation: Institute of Software, Chinese Academy of Sciences +// Modules for handling OS socket communication and TLV messaging. pub mod handler; +pub mod tlv; + +pub use handler::fetch_local_telemetry_data; +pub use handler::OSSocketHandler; +pub use tlv::TLVMessage; use anyhow::Result; -use async_trait::async_trait; -use log::error; +use log::{debug, error}; use std::io::{Read, Write}; use std::os::unix::net::UnixStream; use std::sync::Arc; use tokio::sync::Mutex; -use crate::cores::state::AppState; - -use super::Server; - -/// OS Socket 服务器管理器 -/// 负责创建和管理 OS Socket 处理器的生命周期 -pub struct OSSocketServerManager; - -#[async_trait] -impl Server for OSSocketServerManager { - async fn start(&self, app_state: AppState) { - log::info!("Attempting to start OS Socket Server Manager..."); - match handler::OSSocketHandler::new(std::sync::Arc::new(app_state)) { - Ok(Some(handler)) => { - log::info!("OSSocketHandler created successfully, starting handler."); - handler.start().await; - } - Ok(None) => { - log::warn!("OSSocketHandler::new returned None, OS socket service will not start. This typically means the OS socket file was not found."); - } - Err(e) => { - log::error!("Failed to create OSSocketHandler: {}. OS socket service will not start.", e); - } - } - } -} - -/// Unix domain socket 路径,用于与 OS 通信 +/// Path to the Unix domain socket for communication with the OS. pub const OS_SOCKET_PATH: &str = "/var/run/EulixOnBoardGuardian/os_fleet.sock"; -/// TLV 消息标签定义 -/// 0x03030100: 任务规划(OS->云) +// Original TLV message tag definitions. +/// 0x03030100: Task Planning (OS -> Cloud) pub const TAG_TASK_PLANNING: u32 = 0x03030100; -/// 0x03040100: 云平台(OS->云) +/// 0x03040100: Cloud Platform (OS -> Cloud) pub const TAG_CLOUD_PLATFORM: u32 = 0x03040100; -/// 0x03050100: 处理(OS->云) +/// 0x03050100: Processing (OS -> Cloud) pub const TAG_PROCESSING: u32 = 0x03050100; -/// 0x05010100: 任务规划工参(OS->云) +/// 0x05010100: Task Planning Parameters (OS -> Cloud) pub const TAG_TASK_PARAMS: u32 = 0x05010100; -/// TLV (Tag-Length-Value) 消息结构 -/// 用于与 OS 进行通信的消息格式 -#[derive(Debug, Clone)] -pub struct TLVMessage { - /// 消息标签,用于标识消息类型 - pub tag: u32, - /// 消息值的长度 - pub length: u32, - /// 消息的具体内容 - pub value: Vec, -} +// New Tags based on user request +// New TLV message tags based on user requirements. +/// 0x01030100: Task Planning (OS -> Cloud) +pub const TAG_USER_TASK_PLANNING_OS_TO_CLOUD: u32 = 0x01030100; +/// 0x01030200: Task Planning (Cloud -> OS) +pub const TAG_USER_TASK_PLANNING_CLOUD_TO_OS: u32 = 0x01030200; +/// 0x01040100: Cloud Platform (OS -> Cloud) +pub const TAG_USER_CLOUD_PLATFORM_OS_TO_CLOUD: u32 = 0x01040100; +/// 0x01040200: Cloud Platform (Cloud -> OS) +pub const TAG_USER_CLOUD_PLATFORM_CLOUD_TO_OS: u32 = 0x01040200; +/// 0x01050100: Processing (OS -> Cloud) +pub const TAG_USER_PROCESSING_OS_TO_CLOUD: u32 = 0x01050100; +/// 0x01050200: Processing (Cloud -> OS) +pub const TAG_USER_PROCESSING_CLOUD_TO_OS: u32 = 0x01050200; /// OS Socket 服务器 /// 负责与 OS 进行 Unix Domain Socket 通信 +/// Represents the OS Socket server, responsible for Unix Domain Socket communication with the OS. +#[derive(Clone)] pub struct OSSocketServer { - /// Unix domain socket 连接,使用 Arc 实现线程安全 socket_connection: Arc>, } -impl Clone for OSSocketServer { - fn clone(&self) -> Self { - Self { - socket_connection: self.socket_connection.clone(), - } - } -} - impl OSSocketServer { - /// 创建新的 OS Socket 服务器实例 - /// - /// # 返回值 - /// - `Result>`: 成功返回服务器实例,失败返回错误 - /// - 如果 socket 文件不存在或连接多次失败,返回 Ok(None) + /// Creates a new instance of `OSSocketServer`. + /// + /// Attempts to connect to the OS socket with a specified number of retries. + /// Returns `Ok(Some(Self))` on successful connection, `Ok(None)` if connection fails after all retries, + /// or an `Err` if an unexpected error occurs. pub fn new() -> Result> { const MAX_RETRIES: u32 = 5; const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1); @@ -138,24 +112,21 @@ impl OSSocketServer { OS_SOCKET_PATH, MAX_RETRIES ); - Ok(None) // Indicate that the server could not be started after all retries + Ok(None) } - /// 从 socket 读取数据 - /// - /// # 返回值 - /// - `Result>`: 成功返回读取的数据,失败返回错误 + /// Reads data from the OS socket. + /// + /// Returns a `Result` containing a `Vec` with the read data, + /// or an empty `Vec` if no data was read (e.g., connection closed by peer). + /// Returns an `Err` if a read error occurs. pub async fn read_data(&self) -> Result> { - let mut buffer = vec![0u8; 4096]; + let mut buffer = vec![0u8; 4096]; // Buffer to store read data. let mut socket = self.socket_connection.lock().await; - + match socket.read(&mut buffer) { - Ok(size) if size > 0 => { - Ok(buffer[..size].to_vec()) - } - Ok(_) => { - Ok(Vec::new()) - } + Ok(size) if size > 0 => Ok(buffer[..size].to_vec()), // Return the portion of the buffer with data. + Ok(_) => Ok(Vec::new()), // No data read, or 0 bytes read (connection closed). Err(e) => { error!("Error reading from OS socket: {}", e); Err(anyhow::anyhow!("Failed to read from socket: {}", e)) @@ -163,68 +134,59 @@ impl OSSocketServer { } } - /// 解析 TLV 格式的消息 - /// - /// # 参数 - /// - `data`: 原始字节数据 - /// - /// # 返回值 - /// - `Result`: 成功返回解析后的消息,失败返回错误 - pub fn parse_tlv(data: &[u8]) -> Result { - if data.len() < 8 { - return Err(anyhow::anyhow!("Invalid TLV message: too short")); - } - - let tag = u32::from_le_bytes(data[0..4].try_into()?); - - let intermediate_length_val = u32::from_le_bytes(data[4..8].try_into().map_err(|e| anyhow::anyhow!("Failed to parse intermediate length bytes: {}", e))?); - - let hex_string_repr = format!("{:x}", intermediate_length_val); - - let final_actual_length = u32::from_str_radix(&hex_string_repr, 10).map_err(|e| - anyhow::anyhow!( - "Failed to parse hex string representation '{}' (from intermediate value 0x{:08x}) as a decimal number: {}", - hex_string_repr, - intermediate_length_val, - e - ) - )?; - - log::debug!( - "parse_tlv: Intermediate length val 0x{:08x} -> hex string '{}' -> final actual length {}.", - intermediate_length_val, hex_string_repr, final_actual_length - ); - - if data.len() < (8 + final_actual_length as usize) { - return Err(anyhow::anyhow!( - "Invalid TLV message: incomplete value. Data len {} < 8 + final_actual_length ({}) = {}.", - data.len(), - final_actual_length, - 8 + final_actual_length as usize - )); - } - - let value = data[8..(8 + final_actual_length as usize)].to_vec(); - - Ok(TLVMessage { tag, length: final_actual_length, value }) - } - - /// 发送 TLV 格式的消息到 OS - /// - /// # 参数 - /// - `tag`: 消息标签 - /// - `value`: 消息内容 - /// - /// # 返回值 - /// - `Result<()>`: 成功返回空,失败返回错误 + /// Sends a TLV (Tag-Length-Value) formatted message to the OS socket. + /// + /// # Arguments + /// + /// * `tag`: The `u32` tag for the TLV message. + /// * `value`: A byte slice `&[u8]` representing the value of the TLV message. + /// + /// # Returns + /// + /// A `Result<()>` indicating success or failure. + /// + /// # TLV Structure + /// The message is constructed as: + /// - Tag (4 bytes, Little Endian) + /// - Length (4 bytes, Little Endian, representing the length of the *value*). + /// The length field itself is calculated by taking the decimal string representation + /// of the actual payload length, and then parsing that string as a hexadecimal number. + /// - Value (variable length) pub async fn send_tlv_message(&self, tag: u32, value: &[u8]) -> Result<()> { let mut socket = self.socket_connection.lock().await; - let mut message = Vec::with_capacity(8 + value.len()); - message.extend_from_slice(&tag.to_be_bytes()); - message.extend_from_slice(&(value.len() as u32).to_be_bytes()); - message.extend_from_slice(value); + // Convert tag to Little Endian bytes. + let tag_bytes = tag.to_le_bytes(); + + // Get the actual length of the payload. + let actual_payload_len = value.len() as u32; + // Convert the actual payload length to its decimal string representation. + let actual_payload_len_str = actual_payload_len.to_string(); + + // Parse this decimal string representation as a hexadecimal number + // to get the intermediate value that needs to be sent as the length field. + // This is a specific requirement for the TLV protocol being used. + let intermediate_length_val_to_send = u32::from_str_radix(&actual_payload_len_str, 16) + .map_err(|e| anyhow::anyhow!( + "Failed to parse actual payload length string '{}' as a hex number for TLV length field: {}", + actual_payload_len_str, e + ))?; + + // Convert the calculated length to Little Endian bytes. + let length_bytes = intermediate_length_val_to_send.to_le_bytes(); + + debug!( + "send_tlv_message: tag=0x{:08X} (LE), actual_payload_len={}, len_str='{}', intermediate_len_val=0x{:08X} (LE bytes: {:02X?}), value_len={}", + tag, actual_payload_len, actual_payload_len_str, intermediate_length_val_to_send, length_bytes, value.len() + ); + + // Construct the message: Tag + Length + Value. + let mut message = Vec::with_capacity(8 + value.len()); // Pre-allocate capacity. + message.extend_from_slice(&tag_bytes); // Tag in Little Endian + message.extend_from_slice(&length_bytes); // Calculated length field in Little Endian + message.extend_from_slice(value); // The actual payload. + // Send the entire message. socket.write_all(&message)?; Ok(()) } @@ -234,56 +196,58 @@ impl OSSocketServer { mod tests { use super::*; - #[test] - fn test_tlv_message_parsing() { - // 测试有效 TLV 消息的解析 - let tag = TAG_TASK_PLANNING; - let value = b"test message"; - let mut data = Vec::new(); - data.extend_from_slice(&tag.to_be_bytes()); - data.extend_from_slice(&(value.len() as u32).to_be_bytes()); - data.extend_from_slice(value); - - let tlv = OSSocketServer::parse_tlv(&data).unwrap(); - assert_eq!(tlv.tag, tag); - assert_eq!(tlv.length, value.len() as u32); - assert_eq!(tlv.value, value); - - // 测试消息太短的情况 - let invalid_data = vec![0u8; 4]; - assert!(OSSocketServer::parse_tlv(&invalid_data).is_err()); + // Helper function to encapsulate the core TLV encoding logic for testing + fn encode_tlv_parts(tag: u32, value: &[u8]) -> Result<(Vec, Vec, Vec), String> { + let tag_bytes = tag.to_le_bytes().to_vec(); - // 测试消息值不完整的情况 - let mut invalid_data = Vec::new(); - invalid_data.extend_from_slice(&tag.to_be_bytes()); - invalid_data.extend_from_slice(&(100u32).to_be_bytes()); // 长度大于实际数据 - assert!(OSSocketServer::parse_tlv(&invalid_data).is_err()); + let actual_payload_len = value.len() as u32; + let actual_payload_len_str = actual_payload_len.to_string(); + let intermediate_length_val_to_send = u32::from_str_radix(&actual_payload_len_str, 16) + .map_err(|e| format!("Failed to parse actual payload length string '{}' as a hex number: {}", actual_payload_len_str, e))?; + let length_bytes = intermediate_length_val_to_send.to_le_bytes().to_vec(); + + Ok((tag_bytes, length_bytes, value.to_vec())) } #[test] - fn test_tlv_message_construction() { - // 测试 TLV 消息的构建 - let tag = TAG_CLOUD_PLATFORM; - let value = b"cloud data"; - let mut expected = Vec::new(); - expected.extend_from_slice(&tag.to_be_bytes()); - expected.extend_from_slice(&(value.len() as u32).to_be_bytes()); - expected.extend_from_slice(value); - - let mut actual = Vec::with_capacity(8 + value.len()); - actual.extend_from_slice(&tag.to_be_bytes()); - actual.extend_from_slice(&(value.len() as u32).to_be_bytes()); - actual.extend_from_slice(value); - - assert_eq!(actual, expected); - } + fn test_send_tlv_message_encoding_logic() { + let tag = 0x01020304; + let value = b"hello"; // 5 bytes + + // Expected encoding based on the logic in send_tlv_message: + // actual_payload_len = 5 + // actual_payload_len_str = "5" + // intermediate_length_val_to_send = u32::from_str_radix("5", 16).unwrap() = 0x05 + + let (tag_bytes, length_bytes, value_bytes) = encode_tlv_parts(tag, value).unwrap(); - #[test] - fn test_tlv_message_tags() { - // 测试所有预定义的 TLV 标签值 - assert_eq!(TAG_TASK_PLANNING, 0x03030100); - assert_eq!(TAG_CLOUD_PLATFORM, 0x03040100); - assert_eq!(TAG_PROCESSING, 0x03050100); - assert_eq!(TAG_TASK_PARAMS, 0x05010100); + assert_eq!(tag_bytes, tag.to_le_bytes()); + + let expected_intermediate_len: u32 = 0x05; + assert_eq!(length_bytes, expected_intermediate_len.to_le_bytes()); + assert_eq!(value_bytes, value); + + // Test with a different length + let value2 = b"0123456789ABCDEF"; // 16 bytes + // actual_payload_len = 16 + // actual_payload_len_str = "16" + // intermediate_length_val_to_send = u32::from_str_radix("16", 16).unwrap() = 0x16 + let (tag_bytes2, length_bytes2, value_bytes2) = encode_tlv_parts(tag, value2).unwrap(); + let expected_intermediate_len2: u32 = 0x16; + assert_eq!(length_bytes2, expected_intermediate_len2.to_le_bytes()); + assert_eq!(value_bytes2, value2); + assert_eq!(tag_bytes2, tag.to_le_bytes()); + + // Test with zero length value + let value3 = b""; // 0 bytes + // actual_payload_len = 0 + // actual_payload_len_str = "0" + // intermediate_length_val_to_send = u32::from_str_radix("0", 16).unwrap() = 0x00 + let (tag_bytes3, length_bytes3, value_bytes3) = encode_tlv_parts(tag, value3).unwrap(); + let expected_intermediate_len3: u32 = 0x00; + assert_eq!(length_bytes3, expected_intermediate_len3.to_le_bytes()); + assert_eq!(value_bytes3, value3); + assert_eq!(tag_bytes3, tag.to_le_bytes()); } } + diff --git a/crates/os_socket_comms/src/tlv.rs b/crates/os_socket_comms/src/tlv.rs new file mode 100644 index 0000000000000000000000000000000000000000..02603aeed89b43291d9b266870093ee4a19b776f --- /dev/null +++ b/crates/os_socket_comms/src/tlv.rs @@ -0,0 +1,223 @@ +// Copyright (c) 2025 Institute of Software, Chinese Academy of Sciences +// fleet-datamgr is licensed under Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PSL v2 for more details. + +// Author: songpenglei, songpenglei@otcaix.iscas.ac.cn +// Affiliation: Institute of Software, Chinese Academy of Sciences + +use anyhow::Result; +use log::debug; // Corrected from log::debug to use the debug macro + +/// Represents a TLV (Tag-Length-Value) message. +#[derive(Debug, Clone)] +pub struct TLVMessage { + pub tag: u32, // The tag identifying the message type. + pub length: u32, // The length of the value part. + pub value: Vec, // The actual value (payload) of the message. +} + +/// Parses a byte slice into a `TLVMessage`. +/// +/// The TLV structure is assumed to be: +/// - Tag (4 bytes, Little Endian) +/// - Length (4 bytes, Little Endian). This is an *intermediate* length value. +/// - Value (variable length, determined by a transformation of the intermediate length) +/// +/// The actual length of the `value` is determined by a specific transformation: +/// 1. The 4-byte intermediate length is read as a Little Endian `u32`. +/// 2. This `u32` is formatted as a hexadecimal string (e.g., `0x1A` becomes `"1a"`). +/// 3. This hexadecimal string is then parsed as a *decimal* number to get the final actual length of the value. +/// +/// # Arguments +/// * `data`: A byte slice `&[u8]` containing the raw TLV data. +/// +/// # Returns +/// A `Result` which is `Ok(TLVMessage)` on successful parsing, +/// or an `Err` if the data is malformed or too short. +pub fn parse_tlv(data: &[u8]) -> Result { + // Check if the data is long enough to contain the tag and length fields (8 bytes). + if data.len() < 8 { + return Err(anyhow::anyhow!("Invalid TLV message: too short")); + } + + // Parse the tag (first 4 bytes, Little Endian). + let tag = u32::from_le_bytes(data[0..4].try_into()?); + + // Parse the intermediate length value (next 4 bytes, Little Endian). + let intermediate_length_val = + u32::from_le_bytes(data[4..8].try_into().map_err(|e| { + anyhow::anyhow!("Failed to parse intermediate length bytes: {}", e) + })?); + + // Convert the intermediate length value to its hexadecimal string representation. + let hex_string_repr = format!("{:x}", intermediate_length_val); + + // Parse the hexadecimal string representation as a decimal number to get the final actual length. + // This is a specific requirement of the TLV protocol being implemented. + let final_actual_length = u32::from_str_radix(&hex_string_repr, 10).map_err(|e| { + anyhow::anyhow!( + "Failed to parse hex string representation '{}' (from intermediate value 0x{:08x}) as a decimal number: {}", + hex_string_repr, + intermediate_length_val, + e + ) + })?; + + debug!( + "parse_tlv: Intermediate length val 0x{:08x} -> hex string '{}' -> final actual length {}.", + intermediate_length_val, hex_string_repr, final_actual_length + ); + + // Check if the data is long enough to contain the value based on the calculated final_actual_length. + if data.len() < (8 + final_actual_length as usize) { + return Err(anyhow::anyhow!( + "Invalid TLV message: incomplete value. Data len {} < 8 + final_actual_length ({}) = {}.", + data.len(), + final_actual_length, + 8 + final_actual_length as usize + )); + } + + // Extract the value part of the message. + let value = data[8..(8 + final_actual_length as usize)].to_vec(); + + // Construct and return the TLVMessage. + Ok(TLVMessage { + tag, + length: final_actual_length, // Store the final, actual length. + value, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_tlv_valid() { + // Simulates sending a value of decimal length 10. + // actual_payload_len = 10 + // actual_payload_len_str = "10" + // intermediate_length_val_to_send = u32::from_str_radix("10", 16).unwrap() = 0x10 = 16 (decimal) + // intermediate_length_val_bytes = 0x10000000 (LE) + let tag_val: u32 = 0x01020304; + let value_payload = b"0123456789".to_vec(); // 10 bytes + let intermediate_length: u32 = 0x10; // This is what should be in the length field + + let mut data = Vec::new(); + data.extend_from_slice(&tag_val.to_le_bytes()); + data.extend_from_slice(&intermediate_length.to_le_bytes()); + data.extend_from_slice(&value_payload); + + let result = parse_tlv(&data); + assert!(result.is_ok()); + let tlv_message = result.unwrap(); + assert_eq!(tlv_message.tag, tag_val); + assert_eq!(tlv_message.length, value_payload.len() as u32); + assert_eq!(tlv_message.value, value_payload); + } + + #[test] + fn test_parse_tlv_empty_value() { + // actual_payload_len = 0 + // actual_payload_len_str = "0" + // intermediate_length_val_to_send = u32::from_str_radix("0", 16).unwrap() = 0x0 + // intermediate_length_val_bytes = 0x00000000 (LE) + let tag_val: u32 = 0xDEADBEEF; + let value_payload = Vec::new(); // 0 bytes + let intermediate_length: u32 = 0x0; + + let mut data = Vec::new(); + data.extend_from_slice(&tag_val.to_le_bytes()); + data.extend_from_slice(&intermediate_length.to_le_bytes()); + data.extend_from_slice(&value_payload); + + let result = parse_tlv(&data); + assert!(result.is_ok()); + let tlv_message = result.unwrap(); + assert_eq!(tlv_message.tag, tag_val); + assert_eq!(tlv_message.length, 0); + assert_eq!(tlv_message.value, value_payload); + } + + #[test] + fn test_parse_tlv_data_too_short_for_header() { + let data = vec![0x01, 0x02, 0x03]; // Less than 8 bytes + let result = parse_tlv(&data); + assert!(result.is_err()); + assert_eq!(result.err().unwrap().to_string(), "Invalid TLV message: too short"); + } + + #[test] + fn test_parse_tlv_data_too_short_for_value() { + // intermediate_length_val (0x10) -> hex_string "10" -> final_actual_length 10 + // Expects 10 bytes of value, but only provide 5 + let tag_val: u32 = 0x01020304; + let intermediate_length: u32 = 0x10; // Indicates final length of 10 + let value_payload_incomplete = b"01234".to_vec(); // 5 bytes + + let mut data = Vec::new(); + data.extend_from_slice(&tag_val.to_le_bytes()); + data.extend_from_slice(&intermediate_length.to_le_bytes()); + data.extend_from_slice(&value_payload_incomplete); + + let result = parse_tlv(&data); + assert!(result.is_err()); + assert_eq!( + result.err().unwrap().to_string(), + "Invalid TLV message: incomplete value. Data len 13 < 8 + final_actual_length (10) = 18." + ); + } + + #[test] + fn test_parse_tlv_complex_length() { + // actual_payload_len = 25 (decimal) + // actual_payload_len_str = "25" + // intermediate_length_val_to_send = u32::from_str_radix("25", 16).unwrap() = 0x25 = 37 (decimal) + // intermediate_length_val_bytes = 0x25000000 (LE) + let tag_val: u32 = 0xCAFEBABE; + let value_payload = (0..25).map(|i| i as u8).collect::>(); // 25 bytes + let intermediate_length: u32 = 0x25; // This is what should be in the length field + + let mut data = Vec::new(); + data.extend_from_slice(&tag_val.to_le_bytes()); + data.extend_from_slice(&intermediate_length.to_le_bytes()); + data.extend_from_slice(&value_payload); + + let result = parse_tlv(&data); + assert!(result.is_ok(), "Parsing failed: {:?}", result.err()); + let tlv_message = result.unwrap(); + assert_eq!(tlv_message.tag, tag_val); + assert_eq!(tlv_message.length, value_payload.len() as u32); + assert_eq!(tlv_message.value, value_payload); + } + + #[test] + fn test_parse_tlv_invalid_hex_string_for_decimal_parse() { + // This test constructs a scenario where intermediate_length_val, when formatted as hex, + // results in a string that is not a valid decimal number if it contained non-digit hex chars. + // For example, if intermediate_length_val was 0x1A -> "1a" (hex string). + // u32::from_str_radix("1a", 10) would fail. + let tag_val: u32 = 0xABADCAFE; + let intermediate_length_problematic: u32 = 0x1A; // -> "1a" as hex string + let value_payload = b"dummy".to_vec(); + + let mut data = Vec::new(); + data.extend_from_slice(&tag_val.to_le_bytes()); + data.extend_from_slice(&intermediate_length_problematic.to_le_bytes()); + data.extend_from_slice(&value_payload); // Actual value doesn't matter as parsing length should fail + + let result = parse_tlv(&data); + assert!(result.is_err()); + let err_msg = result.err().unwrap().to_string(); + // Expected error: "Failed to parse hex string representation '1a' (from intermediate value 0x0000001a) as a decimal number: invalid digit found in string" + assert!(err_msg.contains("Failed to parse hex string representation '1a'")); + assert!(err_msg.contains("as a decimal number")); + } +} \ No newline at end of file diff --git a/src/cores/mod.rs b/src/cores/mod.rs index ab8f29f0b2804edbd390f5ecaa134e3f565d6e43..8cef0f9fa9d6635df64a363a946f6c4b979d878a 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -4,6 +4,7 @@ use consensus_kv::raft_config::{self, RaftConfig}; use std::iter::zip; use std::{collections::BTreeMap, ffi::CString}; + pub mod daemons; #[cfg(feature = "servers")] pub mod handlers; @@ -110,10 +111,12 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { Some(Box::new(servers::MessagingServer)); // 启动 os socket server,如果 socket 文件不存在,将跳过启动 + #[cfg(feature = "os_socket")] let os_socket_server: Option> = - Some(Box::new(servers::os_socket::OSSocketServerManager)); + Some(Box::new(servers::OSSocketServerManager)); // Start OS Socket Server independently + #[cfg(feature = "os_socket")] if let Some(server_instance) = os_socket_server { log::info!("Preparing to spawn task for OS Socket Server (standalone)."); let app_state_os_socket = app_state.clone(); diff --git a/src/cores/servers/mod.rs b/src/cores/servers/mod.rs index 5fc2912aaf800ac780d92db6ef4d4205db8b0db4..47a72d3c7047b59994fb252bba28e625d2fe22fa 100644 --- a/src/cores/servers/mod.rs +++ b/src/cores/servers/mod.rs @@ -1,12 +1,33 @@ pub mod actix_web; pub mod message; -pub mod os_socket; use crate::cores::state::AppState; use async_trait::async_trait; pub use message::MessagingServer; +use os_socket_comms::handler; #[async_trait] pub trait Server: Send + Sync + Unpin + 'static { async fn start(&self, app_state: AppState); +} + +pub struct OSSocketServerManager; + +#[async_trait] +impl Server for OSSocketServerManager { + async fn start(&self, app_state: AppState) { + log::info!("Attempting to start OS Socket Server Manager (using os_socket_comms handler)..."); + match handler::OSSocketHandler::new_with_default_server(app_state.message_cli.clone()) { + Ok(Some(handler)) => { + log::info!("OSSocketHandler (from os_socket_comms) created successfully, starting handler."); + handler.start().await; + } + Ok(None) => { + log::warn!("OSSocketHandler::new_with_default_server returned None, OS socket service will not start. This typically means the OS socket file was not found."); + } + Err(e) => { + log::error!("Failed to create OSSocketHandler: {}. OS socket service will not start.", e); + } + } + } } \ No newline at end of file diff --git a/src/cores/servers/os_socket/README.md b/src/cores/servers/os_socket/README.md deleted file mode 100644 index a5779f77db4f2ea80affd923eb3a713152f79f79..0000000000000000000000000000000000000000 --- a/src/cores/servers/os_socket/README.md +++ /dev/null @@ -1,93 +0,0 @@ -# OS Socket 模块 - -该模块实现了与操作系统(OS)之间的 Unix Domain Socket 通信功能,用于接收和处理来自 OS 的 TLV(Tag-Length-Value)格式消息。 - -## 组件结构 - -模块包含三个主要组件: - -1. `OSSocketServerManager`(在 `servers/mod.rs` 中) - - 作为服务器管理器,实现 `Server` trait - - 负责创建和管理 `OSSocketHandler` 的生命周期 - - 在应用启动时初始化 socket 通信 - -2. `OSSocketServer`(在 `os_socket/mod.rs` 中) - - 负责底层的 socket 通信 - - 管理 socket 连接 - - 提供 TLV 消息的读写功能 - -3. `OSSocketHandler`(在 `os_socket/handler.rs` 中) - - 处理从 OS 接收到的消息 - - 负责消息的解析和转发 - - 与消息总线系统集成 - -## 组件关系 - -``` -OSSocketServerManager - │ - ├── 创建和管理 - │ - └── OSSocketHandler - │ - ├── 使用 - │ - └── OSSocketServer - │ - └── 处理底层 socket 通信 -``` - -## 功能特性 - -- 通过 Unix Domain Socket 与 OS 建立通信 -- 支持 TLV 消息格式的解析和处理 -- 自动检查 socket 文件存在性 -- 消息转发到消息总线系统 -- 异步处理机制 - -## 消息类型 - -模块支持以下 TLV 消息标签: - -| 标签值 | 描述 | 用途 | -|--------|------|------| -| 0x03030100 | 任务规划 | OS 向云平台发送任务规划数据 | -| 0x03040100 | 云平台数据 | OS 向云平台发送数据 | -| 0x03050100 | 处理数据 | OS 向云平台发送处理数据 | -| 0x05010100 | 任务参数 | OS 向云平台发送任务参数 | - -## 使用方法 - -```rust -// 创建服务器管理器 -let server_manager = OSSocketServerManager; - -// 启动服务器 -server_manager.start(app_state).await; -``` - -## 消息总线主题 - -- `task_planning`: 用于转发任务规划数据 -- `cloud_platform`: 用于转发云平台数据 -- `processing`: 用于转发处理数据 -- `task_parameters`: 用于转发任务参数数据 - -## 错误处理 - -- 自动处理 socket 连接错误 -- 消息解析错误处理 -- 消息转发错误处理 - -## 注意事项 - -1. 确保 socket 文件路径正确(默认为 `/var/run/EulixOnBoardGuardian/os_fleet.sock`) -2. 需要适当的文件系统权限 -3. 消息处理是异步的,不会阻塞主线程 - -## 依赖项 - -- tokio: 异步运行时 -- anyhow: 错误处理 -- log: 日志记录 -- feventbus: 消息总线系统 diff --git a/src/cores/servers/os_socket/handler.rs b/src/cores/servers/os_socket/handler.rs deleted file mode 100644 index 78d49cfee0c4bcec30f6b26a6af67024e64e0b2e..0000000000000000000000000000000000000000 --- a/src/cores/servers/os_socket/handler.rs +++ /dev/null @@ -1,176 +0,0 @@ -// Copyright (c) 2025 Institute of Software, Chinese Academy of Sciences -// fleet-datamgr is licensed under Mulan PSL v2. -// You can use this software according to the terms and conditions of the Mulan PSL v2. -// You may obtain a copy of Mulan PSL v2 at: -// http://license.coscl.org.cn/MulanPSL2 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -// See the Mulan PSL v2 for more details. - -// Author: songpenglei, songpenglei@otcaix.iscas.ac.cn -// Affiliation: Institute of Software, Chinese Academy of Sciences - -use crate::cores::servers::os_socket::{ - OSSocketServer, TLVMessage, TAG_CLOUD_PLATFORM, TAG_PROCESSING, TAG_TASK_PARAMS, - TAG_TASK_PLANNING, -}; -use crate::cores::state::AppState; -use anyhow::Result; -use feventbus::message::{Message, NativeEventAction}; -use feventbus::traits::producer::Producer; -use log::{debug, error}; -use std::sync::Arc; - -/// OS Socket 消息处理器 -/// 负责处理从 OS 接收到的 TLV 消息,并将其转发到相应的消息总线主题 -pub struct OSSocketHandler { - /// OS Socket 服务器实例 - socket_server: OSSocketServer, - /// 应用程序状态,包含消息总线客户端等 - app_state: Arc, -} - -impl OSSocketHandler { - /// 创建新的 OS Socket 处理器实例 - /// - /// # 参数 - /// - `app_state`: 应用程序状态 - /// - /// # 返回值 - /// - `Result>`: 成功返回处理器实例,如果 socket 文件不存在则返回 None - pub fn new(app_state: Arc) -> Result> { - match OSSocketServer::new()? { - Some(server) => Ok(Some(Self { - socket_server: server, - app_state, - })), - None => Ok(None), - } - } - - /// 启动处理器,开始处理来自 OS 的消息 - /// - /// 该方法会启动一个新的异步任务,持续监听和处理 socket 消息 - /// 如果 socket 服务器未初始化,则不会启动处理任务 - pub async fn start(&self) { - let server = self.socket_server.clone(); - let app_state = self.app_state.clone(); - - tokio::spawn(async move { - log::info!("OSSocketHandler: Read loop started for OS socket."); - loop { - match server.read_data().await { - Ok(data) => { - log::debug!("OSSocketHandler: server.read_data() returned {} bytes.", data.len()); - if !data.is_empty() { - match OSSocketServer::parse_tlv(&data) { - Ok(tlv) => { - log::debug!("OSSocketHandler: Successfully parsed TLV message with tag 0x{:08x}, length {}", tlv.tag, tlv.length); - if let Err(e) = Self::handle_tlv_message(&app_state, tlv).await { - error!("OSSocketHandler: Error handling TLV message: {}", e); - } - } - Err(parse_err) => { - let display_data_len = std::cmp::min(100, data.len()); - // Using {:x?} for a hex dump style for byte vectors if default debug isn't clear - error!("OSSocketHandler: Failed to parse TLV message: {}. Raw data (first {} bytes): {:x?}", parse_err, display_data_len, &data[..display_data_len]); - } - } - } else { - // This case (Ok(empty_data)) means read_data() in OSSocketServer received Ok(0) from socket.read(), indicating EOF. - log::info!("OSSocketHandler: server.read_data() returned 0 bytes. Peer has likely closed the connection. Stopping read loop."); - break; // Exit the loop as the connection is considered closed. - } - } - Err(e) => { // This error is from server.read_data() itself (e.g., an I/O error on the socket) - error!("OSSocketHandler: Error in server.read_data(): {}. Will retry after delay.", e); - // The original sleep was here, let's keep it if errors from read_data are potentially recoverable - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - } - } - log::info!("OSSocketHandler: Read loop terminated."); - }); - } - - /// 处理 TLV 消息 - /// - /// 根据消息标签将消息转发到相应的消息总线主题: - /// - 任务规划消息 -> "task_planning" 主题 - /// - 云平台消息 -> "cloud_platform" 主题 - /// - 处理消息 -> "processing" 主题 - /// - 任务参数消息 -> "task_parameters" 主题 - /// - /// # 参数 - /// - `app_state`: 应用程序状态 - /// - `tlv`: 要处理的 TLV 消息 - /// - /// # 返回值 - /// - `Result<()>`: 成功返回空,失败返回错误 - async fn handle_tlv_message(app_state: &AppState, tlv: TLVMessage) -> Result<()> { - let message_client = app_state.message_cli.clone(); - debug!("Received TLV message with tag: 0x{:08x}, length: {}", tlv.tag, tlv.length); - - match tlv.tag { - TAG_TASK_PLANNING => { - debug!("Processing task planning message"); - let topic = "task_planning"; - Self::publish_message(&message_client, topic, tlv.value).await?; - debug!("Successfully forwarded task planning message to topic: {}", topic); - } - TAG_CLOUD_PLATFORM => { - debug!("Processing cloud platform message"); - let topic = "cloud_platform"; - // TODO: 云平台接收到自己的遥控指令后,需要将遥控指令应用到云平台配置中 - Self::publish_message(&message_client, topic, tlv.value).await?; - debug!("Successfully forwarded cloud platform message to topic: {}", topic); - } - TAG_PROCESSING => { - debug!("Processing processing message"); - let topic = "processing"; - Self::publish_message(&message_client, topic, tlv.value).await?; - debug!("Successfully forwarded processing message to topic: {}", topic); - } - TAG_TASK_PARAMS => { - debug!("Processing task parameters message"); - let topic = "task_parameters"; - Self::publish_message(&message_client, topic, tlv.value).await?; - debug!("Successfully forwarded task parameters message to topic: {}", topic); - } - _ => { - debug!("Received unknown TLV message with tag: 0x{:08x}, length: {}", tlv.tag, tlv.length); - } - } - Ok(()) - } - - /// 发布消息到消息总线 - /// - /// # 参数 - /// - `message_client`: 消息总线客户端 - /// - `topic`: 目标主题 - /// - `value`: 消息内容 - /// - /// # 返回值 - /// - `Result<()>`: 成功返回空,失败返回错误 - async fn publish_message(message_client: &Arc

, topic: &str, value: Vec) -> Result<()> { - debug!("Preparing to publish message to topic: {}, value length: {}", topic, value.len()); - let message = Message::new( - topic.to_string(), - NativeEventAction::Other, - None, - Some(serde_json::to_value(value)?), - None, - ); - - log::debug!("OSSocketHandler Publishing message to topic: {}", topic); - if let Err(e) = message_client.as_ref().publish(message).await { - log::error!("OSSocketHandler Failed to publish message to topic {}: {}", topic, e); - return Err(anyhow::anyhow!("Failed to publish message to topic {}: {}", topic, e)); - } - log::debug!("OSSocketHandler Successfully published message to topic: {}", topic); - Ok(()) - } -} - diff --git a/src/lib.rs b/src/lib.rs index f1705c80e0606f58b783f793bb45e8f7d3e83e33..951deb1265788495547d650025140a3f45c62051 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,3 +17,6 @@ pub mod middleware; pub use cores::daemons::messaging; #[cfg(any(feature = "servers"))] pub use cores::{prepare_app_state, start_server}; + +#[cfg(feature = "os_socket")] +pub use os_socket_comms; \ No newline at end of file