From d998dcd2649429c40eeeae92069f7bdaa072e87e Mon Sep 17 00:00:00 2001 From: songpenglei Date: Thu, 8 May 2025 16:54:02 +0800 Subject: [PATCH 1/2] Add OSSocketServer implementation and integrate it into the server startup process --- src/cores/mod.rs | 6 + src/cores/servers/mod.rs | 15 +- src/cores/servers/os_socket/README.md | 76 +++++++++ src/cores/servers/os_socket/handler.rs | 156 +++++++++++++++++ src/cores/servers/os_socket/mod.rs | 221 +++++++++++++++++++++++++ src/cores/state.rs | 6 +- 6 files changed, 476 insertions(+), 4 deletions(-) create mode 100644 src/cores/servers/os_socket/README.md create mode 100644 src/cores/servers/os_socket/handler.rs create mode 100644 src/cores/servers/os_socket/mod.rs diff --git a/src/cores/mod.rs b/src/cores/mod.rs index d919dba..c8dbe95 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -106,6 +106,11 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { // 启动各个server let messaging_server: Option> = Some(Box::new(servers::MessagingServer)); + + // 启动os socket server + let os_socket_server: Option> = + Some(Box::new(servers::OSSocketServer)); + let actix_web_addresses = vec![ params.actix_web_tcp_address, params.actix_web_udp_address, @@ -122,6 +127,7 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { let servers = messaging_server .into_iter() + .chain(os_socket_server.into_iter()) .chain(actix_web_servers) .collect::>(); for server in servers { diff --git a/src/cores/servers/mod.rs b/src/cores/servers/mod.rs index 29ed640..e9dfe4c 100644 --- a/src/cores/servers/mod.rs +++ b/src/cores/servers/mod.rs @@ -1,12 +1,25 @@ pub mod actix_web; pub mod message; +pub mod os_socket; +use crate::cores::state::AppState; use async_trait::async_trait; pub use message::MessagingServer; -use crate::cores::state::AppState; #[async_trait] pub trait Server: Send + Sync + Unpin + 'static { async fn start(&self, app_state: AppState); } +pub struct OSSocketServer; + +#[async_trait] +impl Server for OSSocketServer { + async fn start(&self, app_state: AppState) { + if let Ok(Some(handler)) = + os_socket::handler::OSSocketHandler::new(std::sync::Arc::new(app_state)) + { + handler.start().await; + } + } +} \ No newline at end of file diff --git a/src/cores/servers/os_socket/README.md b/src/cores/servers/os_socket/README.md new file mode 100644 index 0000000..9cd16d9 --- /dev/null +++ b/src/cores/servers/os_socket/README.md @@ -0,0 +1,76 @@ +# OS Socket 模块 + +该模块实现了与操作系统(OS)之间的 Unix Domain Socket 通信功能,用于接收和处理来自 OS 的 TLV(Tag-Length-Value)格式消息。 + +## 功能特性 + +- 通过 Unix Domain Socket 与 OS 建立通信 +- 支持 TLV 消息格式的解析和处理 +- 自动检查 socket 文件存在性 +- 消息转发到消息总线系统 +- 异步处理机制 + +## 消息类型 + +模块支持以下 TLV 消息标签: + +| 标签值 | 描述 | 用途 | +|--------|------|------| +| 0x03030100 | 任务规划 | OS 向云平台发送任务规划数据 | +| 0x03040100 | 云平台数据 | OS 向云平台发送数据 | +| 0x03050100 | 处理数据 | OS 向云平台发送处理数据 | +| 0x05010100 | 任务参数 | OS 向云平台发送任务参数 | + +## 组件说明 + +### OSSocketServer + +负责与 OS 进行 Unix Domain Socket 通信的核心组件: + +- 管理 socket 连接 +- 解析 TLV 消息 +- 发送消息到 OS + +### OSSocketHandler + +处理从 OS 接收到的消息,并将其转发到相应的消息总线主题: + +- 启动消息处理循环 +- 根据消息标签转发到不同主题 +- 错误处理和日志记录 + +## 使用方法 + +```rust +// 创建处理器实例 +let handler = OSSocketHandler::new(app_state)?; + +// 启动消息处理 +if let Some(handler) = handler { + handler.start().await; +} +``` + +## 消息总线主题 + +- `cloud_platform`: 用于转发任务规划、云平台和处理数据 +- `task_parameters`: 用于转发任务参数数据 + +## 错误处理 + +- 自动处理 socket 连接错误 +- 消息解析错误处理 +- 消息转发错误处理 + +## 注意事项 + +1. 确保 socket 文件路径正确(默认为 `/var/run/EulixOnBoardGuardian/os_fleet.sock`) +2. 需要适当的文件系统权限 +3. 消息处理是异步的,不会阻塞主线程 + +## 依赖项 + +- tokio: 异步运行时 +- anyhow: 错误处理 +- log: 日志记录 +- feventbus: 消息总线系统 diff --git a/src/cores/servers/os_socket/handler.rs b/src/cores/servers/os_socket/handler.rs new file mode 100644 index 0000000..656df41 --- /dev/null +++ b/src/cores/servers/os_socket/handler.rs @@ -0,0 +1,156 @@ +// Copyright (c) 2025 Institute of Software, Chinese Academy of Sciences +// fleet-datamgr is licensed under Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PSL v2 for more details. + +// Author: songpenglei, songpenglei@otcaix.iscas.ac.cn +// Affiliation: Institute of Software, Chinese Academy of Sciences + +use crate::cores::servers::os_socket::{ + OSSocketServer, TLVMessage, TAG_CLOUD_PLATFORM, TAG_PROCESSING, TAG_TASK_PARAMS, + TAG_TASK_PLANNING, +}; +use crate::cores::state::AppState; +use anyhow::Result; +use feventbus::message::{Message, NativeEventAction}; +use feventbus::traits::producer::Producer; +use log::{debug, error}; +use std::io::Read; +use std::sync::Arc; + +/// OS Socket 消息处理器 +/// 负责处理从 OS 接收到的 TLV 消息,并将其转发到相应的消息总线主题 +pub struct OSSocketHandler { + /// OS Socket 服务器实例 + server: OSSocketServer, + /// 应用程序状态,包含消息总线客户端等 + app_state: Arc, +} + +impl OSSocketHandler { + /// 创建新的 OS Socket 处理器实例 + /// + /// # 参数 + /// - `app_state`: 应用程序状态 + /// + /// # 返回值 + /// - `Result>`: 成功返回处理器实例,如果 socket 文件不存在则返回 None + pub fn new(app_state: Arc) -> Result> { + match OSSocketServer::new()? { + Some(server) => Ok(Some(Self { + server, + app_state, + })), + None => Ok(None), + } + } + + /// 启动处理器,开始处理来自 OS 的消息 + /// + /// 该方法会启动一个新的异步任务,持续监听和处理 socket 消息 + /// 如果 socket 服务器未初始化,则不会启动处理任务 + pub async fn start(&self) { + let server = self.server.clone(); + let app_state = self.app_state.clone(); + + tokio::spawn(async move { + let mut buf = vec![0u8; 4096]; + loop { + let mut socket = server.socket.lock().await; + match socket.read(&mut buf) { + Ok(size) if size > 0 => { + if let Ok(tlv) = OSSocketServer::parse_tlv(&buf[..size]) { + if let Err(e) = Self::handle_tlv_message(&app_state, tlv).await { + error!("Error handling TLV message: {}", e); + } + } + } + Ok(_) => continue, + Err(e) => { + error!("Error reading from OS socket: {}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + } + }); + } + + /// 处理 TLV 消息 + /// + /// 根据消息标签将消息转发到相应的消息总线主题: + /// - 任务规划、云平台、处理消息 -> "cloud_platform" 主题 + /// - 任务参数消息 -> "task_parameters" 主题 + /// + /// # 参数 + /// - `app_state`: 应用程序状态 + /// - `tlv`: 要处理的 TLV 消息 + /// + /// # 返回值 + /// - `Result<()>`: 成功返回空,失败返回错误 + async fn handle_tlv_message(app_state: &AppState, tlv: TLVMessage) -> Result<()> { + let msg_cli = app_state.message_cli.clone(); + + match tlv.tag { + TAG_TASK_PLANNING | TAG_CLOUD_PLATFORM | TAG_PROCESSING => { + // 转发到云平台主题 + let topic: &str = "cloud_platform"; + let to_publish = Message::new( + topic.to_string(), + NativeEventAction::Other, // 该字段暂时无用 + None, // metadata + Some(serde_json::to_value(tlv.value).unwrap()), // body + None, // created_at + ); + let msg_cli = msg_cli.clone(); + // 另起一个协程来发布消息 + log::debug!( + "WatchEventPublisher Publishing message(s) to topic {}", + topic + ); + if let Err(e) = msg_cli.publish(to_publish).await { + log::error!("WatchEventPublisher Failed to publish event: {}", e); + } + log::debug!( + "WatchEventPublisher Published message(s) to topic {}", + topic + ); + debug!("Forwarded cloud platform data to topic: {}", topic); + } + TAG_TASK_PARAMS => { + // 转发到任务参数主题 + let topic = "task_parameters"; + let to_publish = Message::new( + topic.to_string(), + NativeEventAction::Other, // 该字段暂时无用 + None, // metadata + Some(serde_json::to_value(tlv.value).unwrap()), // body + None, // created_at + ); + let msg_cli = msg_cli.clone(); + // 另起一个协程来发布消息 + log::debug!( + "WatchEventPublisher Publishing message(s) to topic {}", + topic + ); + if let Err(e) = msg_cli.publish(to_publish).await { + log::error!("WatchEventPublisher Failed to publish event: {}", e); + } + log::debug!( + "WatchEventPublisher Published message(s) to topic {}", + topic + ); + debug!("Forwarded task parameters to topic: {}", topic); + } + _ => { + debug!("Received unknown TLV message with tag: 0x{:08x}", tlv.tag); + } + } + Ok(()) + } +} + diff --git a/src/cores/servers/os_socket/mod.rs b/src/cores/servers/os_socket/mod.rs new file mode 100644 index 0000000..af589f8 --- /dev/null +++ b/src/cores/servers/os_socket/mod.rs @@ -0,0 +1,221 @@ +// Copyright (c) 2025 Institute of Software, Chinese Academy of Sciences +// fleet-datamgr is licensed under Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PSL v2 for more details. + +// Author: songpenglei, songpenglei@otcaix.iscas.ac.cn +// Affiliation: Institute of Software, Chinese Academy of Sciences + +pub mod handler; + +use anyhow::Result; +use log::{debug, error}; +use std::io::{Read, Write}; +use std::os::unix::net::UnixStream; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// Unix domain socket 路径,用于与 OS 通信 +pub const OS_SOCKET_PATH: &str = "/var/run/EulixOnBoardGuardian/os_fleet.sock"; + +/// TLV 消息标签定义 +/// 0x03030100: 任务规划(OS->云) +pub const TAG_TASK_PLANNING: u32 = 0x03030100; +/// 0x03040100: 云平台(OS->云) +pub const TAG_CLOUD_PLATFORM: u32 = 0x03040100; +/// 0x03050100: 处理(OS->云) +pub const TAG_PROCESSING: u32 = 0x03050100; +/// 0x05010100: 任务规划工参(OS->云) +pub const TAG_TASK_PARAMS: u32 = 0x05010100; + +/// TLV (Tag-Length-Value) 消息结构 +/// 用于与 OS 进行通信的消息格式 +#[derive(Debug, Clone)] +pub struct TLVMessage { + /// 消息标签,用于标识消息类型 + pub tag: u32, + /// 消息值的长度 + pub length: u32, + /// 消息的具体内容 + pub value: Vec, +} + +/// OS Socket 服务器 +/// 负责与 OS 进行 Unix domain socket 通信 +pub struct OSSocketServer { + /// Unix domain socket 连接,使用 Arc 实现线程安全 + socket: Arc>, +} + +impl Clone for OSSocketServer { + fn clone(&self) -> Self { + Self { + socket: self.socket.clone(), + } + } +} + +impl OSSocketServer { + /// 创建新的 OS Socket 服务器实例 + /// + /// # 返回值 + /// - `Result`: 成功返回服务器实例,失败返回错误 + /// - 如果 socket 文件不存在,返回 Ok(None) + pub fn new() -> Result> { + // 检查 socket 文件是否存在 + if !std::path::Path::new(OS_SOCKET_PATH).exists() { + log::warn!("OS socket file {} does not exist, skipping OS socket server", OS_SOCKET_PATH); + return Ok(None); + } + + let socket = UnixStream::connect(OS_SOCKET_PATH)?; + Ok(Some(Self { + socket: Arc::new(Mutex::new(socket)), + })) + } + + /// 启动服务器,开始监听来自 OS 的消息 + /// + /// 该方法会启动一个新的异步任务,持续监听 socket 连接 + pub async fn start(&self) { + let socket = self.socket.clone(); + + tokio::spawn(async move { + let mut buf = vec![0u8; 4096]; + loop { + let mut socket = socket.lock().await; + match socket.read(&mut buf) { + Ok(size) if size > 0 => { + if let Ok(tlv) = Self::parse_tlv(&buf[..size]) { + match tlv.tag { + TAG_TASK_PLANNING | TAG_CLOUD_PLATFORM | TAG_PROCESSING => { + debug!("Received cloud platform data: {:?}", tlv); + // TODO: Forward to cloud platform + } + TAG_TASK_PARAMS => { + debug!("Received task parameters: {:?}", tlv); + // TODO: Handle task parameters + } + _ => { + debug!("Received unknown TLV message: {:?}", tlv); + } + } + } + } + Ok(_) => continue, + Err(e) => { + error!("Error reading from OS socket: {}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + } + }); + } + + /// 解析 TLV 格式的消息 + /// + /// # 参数 + /// - `data`: 原始字节数据 + /// + /// # 返回值 + /// - `Result`: 成功返回解析后的消息,失败返回错误 + pub fn parse_tlv(data: &[u8]) -> Result { + if data.len() < 8 { + return Err(anyhow::anyhow!("Invalid TLV message: too short")); + } + + let tag = u32::from_be_bytes(data[0..4].try_into()?); + let length = u32::from_be_bytes(data[4..8].try_into()?); + + if data.len() < (8 + length as usize) { + return Err(anyhow::anyhow!("Invalid TLV message: incomplete value")); + } + + let value = data[8..(8 + length as usize)].to_vec(); + + Ok(TLVMessage { tag, length, value }) + } + + /// 发送 TLV 格式的消息到 OS + /// + /// # 参数 + /// - `tag`: 消息标签 + /// - `value`: 消息内容 + /// + /// # 返回值 + /// - `Result<()>`: 成功返回空,失败返回错误 + pub async fn send_message(&self, tag: u32, value: &[u8]) -> Result<()> { + let mut socket = self.socket.lock().await; + + let mut message = Vec::with_capacity(8 + value.len()); + message.extend_from_slice(&tag.to_be_bytes()); + message.extend_from_slice(&(value.len() as u32).to_be_bytes()); + message.extend_from_slice(value); + + socket.write_all(&message)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_tlv_message_parsing() { + // 测试有效 TLV 消息的解析 + let tag = TAG_TASK_PLANNING; + let value = b"test message"; + let mut data = Vec::new(); + data.extend_from_slice(&tag.to_be_bytes()); + data.extend_from_slice(&(value.len() as u32).to_be_bytes()); + data.extend_from_slice(value); + + let tlv = OSSocketServer::parse_tlv(&data).unwrap(); + assert_eq!(tlv.tag, tag); + assert_eq!(tlv.length, value.len() as u32); + assert_eq!(tlv.value, value); + + // 测试消息太短的情况 + let invalid_data = vec![0u8; 4]; + assert!(OSSocketServer::parse_tlv(&invalid_data).is_err()); + + // 测试消息值不完整的情况 + let mut invalid_data = Vec::new(); + invalid_data.extend_from_slice(&tag.to_be_bytes()); + invalid_data.extend_from_slice(&(100u32).to_be_bytes()); // 长度大于实际数据 + assert!(OSSocketServer::parse_tlv(&invalid_data).is_err()); + } + + #[test] + fn test_tlv_message_construction() { + // 测试 TLV 消息的构建 + let tag = TAG_CLOUD_PLATFORM; + let value = b"cloud data"; + let mut expected = Vec::new(); + expected.extend_from_slice(&tag.to_be_bytes()); + expected.extend_from_slice(&(value.len() as u32).to_be_bytes()); + expected.extend_from_slice(value); + + let mut actual = Vec::with_capacity(8 + value.len()); + actual.extend_from_slice(&tag.to_be_bytes()); + actual.extend_from_slice(&(value.len() as u32).to_be_bytes()); + actual.extend_from_slice(value); + + assert_eq!(actual, expected); + } + + #[test] + fn test_tlv_message_tags() { + // 测试所有预定义的 TLV 标签值 + assert_eq!(TAG_TASK_PLANNING, 0x03030100); + assert_eq!(TAG_CLOUD_PLATFORM, 0x03040100); + assert_eq!(TAG_PROCESSING, 0x03050100); + assert_eq!(TAG_TASK_PARAMS, 0x05010100); + } +} diff --git a/src/cores/state.rs b/src/cores/state.rs index 8deed5a..e05f398 100644 --- a/src/cores/state.rs +++ b/src/cores/state.rs @@ -1,8 +1,8 @@ -use std::sync::Arc; -use feventbus::impls::messaging::messaging::Messaging; use crate::cores::router::Router; use crate::db::db::DbPool; use crate::messaging::WatchDaemon; +use feventbus::impls::messaging::messaging::Messaging; +use std::sync::Arc; use super::daemons::consensus::ConsensusDaemon; @@ -15,4 +15,4 @@ pub struct AppState { pub consensus_daemon: Arc, pub cluster_id: String, pub token_secret: Vec, -} \ No newline at end of file +} -- Gitee From 03c78eed07344edb944716522e72fc0b1f9605f5 Mon Sep 17 00:00:00 2001 From: songpenglei Date: Fri, 9 May 2025 11:17:12 +0800 Subject: [PATCH 2/2] add tlv resolver and code structure --- README.md | 41 +++++++++ src/cores/mod.rs | 4 +- src/cores/servers/mod.rs | 6 +- src/cores/servers/os_socket/README.md | 67 ++++++++------ src/cores/servers/os_socket/handler.rs | 119 +++++++++++++------------ src/cores/servers/os_socket/mod.rs | 68 ++++++-------- 6 files changed, 176 insertions(+), 129 deletions(-) diff --git a/README.md b/README.md index 0c2faa4..3cb3ca7 100644 --- a/README.md +++ b/README.md @@ -76,3 +76,44 @@ dd if=/dev/urandom of=random_file.bin bs=896 count=12000 ```sh diff random_file.bin test.bin ``` + +## 项目结构 + +``` +apiserver/ +├── src/ # 源代码目录 +│ ├── cores/ # 核心功能实现 +│ ├── db/ # 数据库相关代码 +│ ├── middleware/ # 中间件实现 +│ ├── utils/ # 工具函数 +│ ├── lib.rs # 库入口文件 +│ └── schema.rs # 数据库模式定义 +├── examples/ # 示例代码 +│ ├── basic_example.rs # 基础示例 +│ └── send_rs422_packet.rs # RS422通信示例 +├── tests/ # 测试代码 +├── migrations/ # 数据库迁移文件 +├── docs/ # 文档 +├── images/ # 图片资源 +├── Cargo.toml # 项目配置和依赖 +├── Cargo.lock # 依赖版本锁定文件 +└── diesel.toml # Diesel ORM配置 +``` + +### 主要功能模块 + +- **cores/**: 实现核心API服务器功能 +- **db/**: 数据库操作和ORM相关代码 +- **middleware/**: 中间件实现,如认证、日志等 +- **utils/**: 通用工具函数和辅助代码 +- **examples/**: 包含各种使用示例,如HTTP、UDP、QUIC和RS422通信示例 + +### 主要依赖 + +- **actix-web**: Web框架 +- **diesel**: ORM框架 +- **tokio**: 异步运行时 +- **quiche**: QUIC协议实现 +- **serialport**: 串口通信 +- **consensus_kv**: 分布式键值存储 + diff --git a/src/cores/mod.rs b/src/cores/mod.rs index c8dbe95..4e2b5bd 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -107,9 +107,9 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { let messaging_server: Option> = Some(Box::new(servers::MessagingServer)); - // 启动os socket server + // 启动 os socket server,如果 socket 文件不存在,将跳过启动 let os_socket_server: Option> = - Some(Box::new(servers::OSSocketServer)); + Some(Box::new(servers::OSSocketServerManager)); let actix_web_addresses = vec![ params.actix_web_tcp_address, diff --git a/src/cores/servers/mod.rs b/src/cores/servers/mod.rs index e9dfe4c..c05ba93 100644 --- a/src/cores/servers/mod.rs +++ b/src/cores/servers/mod.rs @@ -11,10 +11,12 @@ pub trait Server: Send + Sync + Unpin + 'static { async fn start(&self, app_state: AppState); } -pub struct OSSocketServer; +/// OS Socket 服务器管理器 +/// 负责创建和管理 OS Socket 处理器的生命周期 +pub struct OSSocketServerManager; #[async_trait] -impl Server for OSSocketServer { +impl Server for OSSocketServerManager { async fn start(&self, app_state: AppState) { if let Ok(Some(handler)) = os_socket::handler::OSSocketHandler::new(std::sync::Arc::new(app_state)) diff --git a/src/cores/servers/os_socket/README.md b/src/cores/servers/os_socket/README.md index 9cd16d9..a5779f7 100644 --- a/src/cores/servers/os_socket/README.md +++ b/src/cores/servers/os_socket/README.md @@ -2,6 +2,41 @@ 该模块实现了与操作系统(OS)之间的 Unix Domain Socket 通信功能,用于接收和处理来自 OS 的 TLV(Tag-Length-Value)格式消息。 +## 组件结构 + +模块包含三个主要组件: + +1. `OSSocketServerManager`(在 `servers/mod.rs` 中) + - 作为服务器管理器,实现 `Server` trait + - 负责创建和管理 `OSSocketHandler` 的生命周期 + - 在应用启动时初始化 socket 通信 + +2. `OSSocketServer`(在 `os_socket/mod.rs` 中) + - 负责底层的 socket 通信 + - 管理 socket 连接 + - 提供 TLV 消息的读写功能 + +3. `OSSocketHandler`(在 `os_socket/handler.rs` 中) + - 处理从 OS 接收到的消息 + - 负责消息的解析和转发 + - 与消息总线系统集成 + +## 组件关系 + +``` +OSSocketServerManager + │ + ├── 创建和管理 + │ + └── OSSocketHandler + │ + ├── 使用 + │ + └── OSSocketServer + │ + └── 处理底层 socket 通信 +``` + ## 功能特性 - 通过 Unix Domain Socket 与 OS 建立通信 @@ -21,39 +56,21 @@ | 0x03050100 | 处理数据 | OS 向云平台发送处理数据 | | 0x05010100 | 任务参数 | OS 向云平台发送任务参数 | -## 组件说明 - -### OSSocketServer - -负责与 OS 进行 Unix Domain Socket 通信的核心组件: - -- 管理 socket 连接 -- 解析 TLV 消息 -- 发送消息到 OS - -### OSSocketHandler - -处理从 OS 接收到的消息,并将其转发到相应的消息总线主题: - -- 启动消息处理循环 -- 根据消息标签转发到不同主题 -- 错误处理和日志记录 - ## 使用方法 ```rust -// 创建处理器实例 -let handler = OSSocketHandler::new(app_state)?; +// 创建服务器管理器 +let server_manager = OSSocketServerManager; -// 启动消息处理 -if let Some(handler) = handler { - handler.start().await; -} +// 启动服务器 +server_manager.start(app_state).await; ``` ## 消息总线主题 -- `cloud_platform`: 用于转发任务规划、云平台和处理数据 +- `task_planning`: 用于转发任务规划数据 +- `cloud_platform`: 用于转发云平台数据 +- `processing`: 用于转发处理数据 - `task_parameters`: 用于转发任务参数数据 ## 错误处理 diff --git a/src/cores/servers/os_socket/handler.rs b/src/cores/servers/os_socket/handler.rs index 656df41..797a086 100644 --- a/src/cores/servers/os_socket/handler.rs +++ b/src/cores/servers/os_socket/handler.rs @@ -20,14 +20,13 @@ use anyhow::Result; use feventbus::message::{Message, NativeEventAction}; use feventbus::traits::producer::Producer; use log::{debug, error}; -use std::io::Read; use std::sync::Arc; /// OS Socket 消息处理器 /// 负责处理从 OS 接收到的 TLV 消息,并将其转发到相应的消息总线主题 pub struct OSSocketHandler { /// OS Socket 服务器实例 - server: OSSocketServer, + socket_server: OSSocketServer, /// 应用程序状态,包含消息总线客户端等 app_state: Arc, } @@ -43,7 +42,7 @@ impl OSSocketHandler { pub fn new(app_state: Arc) -> Result> { match OSSocketServer::new()? { Some(server) => Ok(Some(Self { - server, + socket_server: server, app_state, })), None => Ok(None), @@ -55,16 +54,14 @@ impl OSSocketHandler { /// 该方法会启动一个新的异步任务,持续监听和处理 socket 消息 /// 如果 socket 服务器未初始化,则不会启动处理任务 pub async fn start(&self) { - let server = self.server.clone(); + let server = self.socket_server.clone(); let app_state = self.app_state.clone(); tokio::spawn(async move { - let mut buf = vec![0u8; 4096]; loop { - let mut socket = server.socket.lock().await; - match socket.read(&mut buf) { - Ok(size) if size > 0 => { - if let Ok(tlv) = OSSocketServer::parse_tlv(&buf[..size]) { + match server.read_data().await { + Ok(data) if !data.is_empty() => { + if let Ok(tlv) = OSSocketServer::parse_tlv(&data) { if let Err(e) = Self::handle_tlv_message(&app_state, tlv).await { error!("Error handling TLV message: {}", e); } @@ -83,7 +80,9 @@ impl OSSocketHandler { /// 处理 TLV 消息 /// /// 根据消息标签将消息转发到相应的消息总线主题: - /// - 任务规划、云平台、处理消息 -> "cloud_platform" 主题 + /// - 任务规划消息 -> "task_planning" 主题 + /// - 云平台消息 -> "cloud_platform" 主题 + /// - 处理消息 -> "processing" 主题 /// - 任务参数消息 -> "task_parameters" 主题 /// /// # 参数 @@ -93,64 +92,68 @@ impl OSSocketHandler { /// # 返回值 /// - `Result<()>`: 成功返回空,失败返回错误 async fn handle_tlv_message(app_state: &AppState, tlv: TLVMessage) -> Result<()> { - let msg_cli = app_state.message_cli.clone(); + let message_client = app_state.message_cli.clone(); + debug!("Received TLV message with tag: 0x{:08x}, length: {}", tlv.tag, tlv.length); match tlv.tag { - TAG_TASK_PLANNING | TAG_CLOUD_PLATFORM | TAG_PROCESSING => { - // 转发到云平台主题 - let topic: &str = "cloud_platform"; - let to_publish = Message::new( - topic.to_string(), - NativeEventAction::Other, // 该字段暂时无用 - None, // metadata - Some(serde_json::to_value(tlv.value).unwrap()), // body - None, // created_at - ); - let msg_cli = msg_cli.clone(); - // 另起一个协程来发布消息 - log::debug!( - "WatchEventPublisher Publishing message(s) to topic {}", - topic - ); - if let Err(e) = msg_cli.publish(to_publish).await { - log::error!("WatchEventPublisher Failed to publish event: {}", e); - } - log::debug!( - "WatchEventPublisher Published message(s) to topic {}", - topic - ); - debug!("Forwarded cloud platform data to topic: {}", topic); + TAG_TASK_PLANNING => { + debug!("Processing task planning message"); + let topic = "task_planning"; + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded task planning message to topic: {}", topic); + } + TAG_CLOUD_PLATFORM => { + debug!("Processing cloud platform message"); + let topic = "cloud_platform"; + // TODO: 云平台接收到自己的遥控指令后,需要将遥控指令应用到云平台配置中 + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded cloud platform message to topic: {}", topic); + } + TAG_PROCESSING => { + debug!("Processing processing message"); + let topic = "processing"; + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded processing message to topic: {}", topic); } TAG_TASK_PARAMS => { - // 转发到任务参数主题 + debug!("Processing task parameters message"); let topic = "task_parameters"; - let to_publish = Message::new( - topic.to_string(), - NativeEventAction::Other, // 该字段暂时无用 - None, // metadata - Some(serde_json::to_value(tlv.value).unwrap()), // body - None, // created_at - ); - let msg_cli = msg_cli.clone(); - // 另起一个协程来发布消息 - log::debug!( - "WatchEventPublisher Publishing message(s) to topic {}", - topic - ); - if let Err(e) = msg_cli.publish(to_publish).await { - log::error!("WatchEventPublisher Failed to publish event: {}", e); - } - log::debug!( - "WatchEventPublisher Published message(s) to topic {}", - topic - ); - debug!("Forwarded task parameters to topic: {}", topic); + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded task parameters message to topic: {}", topic); } _ => { - debug!("Received unknown TLV message with tag: 0x{:08x}", tlv.tag); + debug!("Received unknown TLV message with tag: 0x{:08x}, length: {}", tlv.tag, tlv.length); } } Ok(()) } + + /// 发布消息到消息总线 + /// + /// # 参数 + /// - `message_client`: 消息总线客户端 + /// - `topic`: 目标主题 + /// - `value`: 消息内容 + /// + /// # 返回值 + /// - `Result<()>`: 成功返回空,失败返回错误 + async fn publish_message(message_client: &Arc

, topic: &str, value: Vec) -> Result<()> { + debug!("Preparing to publish message to topic: {}, value length: {}", topic, value.len()); + let message = Message::new( + topic.to_string(), + NativeEventAction::Other, + None, + Some(serde_json::to_value(value)?), + None, + ); + + log::debug!("OSSocketHandler Publishing message to topic: {}", topic); + if let Err(e) = message_client.as_ref().publish(message).await { + log::error!("OSSocketHandler Failed to publish message to topic {}: {}", topic, e); + return Err(anyhow::anyhow!("Failed to publish message to topic {}: {}", topic, e)); + } + log::debug!("OSSocketHandler Successfully published message to topic: {}", topic); + Ok(()) + } } diff --git a/src/cores/servers/os_socket/mod.rs b/src/cores/servers/os_socket/mod.rs index af589f8..69c171d 100644 --- a/src/cores/servers/os_socket/mod.rs +++ b/src/cores/servers/os_socket/mod.rs @@ -14,7 +14,7 @@ pub mod handler; use anyhow::Result; -use log::{debug, error}; +use log::error; use std::io::{Read, Write}; use std::os::unix::net::UnixStream; use std::sync::Arc; @@ -46,16 +46,16 @@ pub struct TLVMessage { } /// OS Socket 服务器 -/// 负责与 OS 进行 Unix domain socket 通信 +/// 负责与 OS 进行 Unix Domain Socket 通信 pub struct OSSocketServer { /// Unix domain socket 连接,使用 Arc 实现线程安全 - socket: Arc>, + socket_connection: Arc>, } impl Clone for OSSocketServer { fn clone(&self) -> Self { Self { - socket: self.socket.clone(), + socket_connection: self.socket_connection.clone(), } } } @@ -64,7 +64,7 @@ impl OSSocketServer { /// 创建新的 OS Socket 服务器实例 /// /// # 返回值 - /// - `Result`: 成功返回服务器实例,失败返回错误 + /// - `Result>`: 成功返回服务器实例,失败返回错误 /// - 如果 socket 文件不存在,返回 Ok(None) pub fn new() -> Result> { // 检查 socket 文件是否存在 @@ -75,46 +75,30 @@ impl OSSocketServer { let socket = UnixStream::connect(OS_SOCKET_PATH)?; Ok(Some(Self { - socket: Arc::new(Mutex::new(socket)), + socket_connection: Arc::new(Mutex::new(socket)), })) } - /// 启动服务器,开始监听来自 OS 的消息 + /// 从 socket 读取数据 /// - /// 该方法会启动一个新的异步任务,持续监听 socket 连接 - pub async fn start(&self) { - let socket = self.socket.clone(); - - tokio::spawn(async move { - let mut buf = vec![0u8; 4096]; - loop { - let mut socket = socket.lock().await; - match socket.read(&mut buf) { - Ok(size) if size > 0 => { - if let Ok(tlv) = Self::parse_tlv(&buf[..size]) { - match tlv.tag { - TAG_TASK_PLANNING | TAG_CLOUD_PLATFORM | TAG_PROCESSING => { - debug!("Received cloud platform data: {:?}", tlv); - // TODO: Forward to cloud platform - } - TAG_TASK_PARAMS => { - debug!("Received task parameters: {:?}", tlv); - // TODO: Handle task parameters - } - _ => { - debug!("Received unknown TLV message: {:?}", tlv); - } - } - } - } - Ok(_) => continue, - Err(e) => { - error!("Error reading from OS socket: {}", e); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - } + /// # 返回值 + /// - `Result>`: 成功返回读取的数据,失败返回错误 + pub async fn read_data(&self) -> Result> { + let mut buffer = vec![0u8; 4096]; + let mut socket = self.socket_connection.lock().await; + + match socket.read(&mut buffer) { + Ok(size) if size > 0 => { + Ok(buffer[..size].to_vec()) + } + Ok(_) => { + Ok(Vec::new()) } - }); + Err(e) => { + error!("Error reading from OS socket: {}", e); + Err(anyhow::anyhow!("Failed to read from socket: {}", e)) + } + } } /// 解析 TLV 格式的消息 @@ -149,8 +133,8 @@ impl OSSocketServer { /// /// # 返回值 /// - `Result<()>`: 成功返回空,失败返回错误 - pub async fn send_message(&self, tag: u32, value: &[u8]) -> Result<()> { - let mut socket = self.socket.lock().await; + pub async fn send_tlv_message(&self, tag: u32, value: &[u8]) -> Result<()> { + let mut socket = self.socket_connection.lock().await; let mut message = Vec::with_capacity(8 + value.len()); message.extend_from_slice(&tag.to_be_bytes()); -- Gitee