diff --git a/README.md b/README.md index 0c2faa4183b12c80e07a214a115223b22fb28a65..3cb3ca70bf2bffb0f5ee34ca10f58c1132a95333 100644 --- a/README.md +++ b/README.md @@ -76,3 +76,44 @@ dd if=/dev/urandom of=random_file.bin bs=896 count=12000 ```sh diff random_file.bin test.bin ``` + +## 项目结构 + +``` +apiserver/ +├── src/ # 源代码目录 +│ ├── cores/ # 核心功能实现 +│ ├── db/ # 数据库相关代码 +│ ├── middleware/ # 中间件实现 +│ ├── utils/ # 工具函数 +│ ├── lib.rs # 库入口文件 +│ └── schema.rs # 数据库模式定义 +├── examples/ # 示例代码 +│ ├── basic_example.rs # 基础示例 +│ └── send_rs422_packet.rs # RS422通信示例 +├── tests/ # 测试代码 +├── migrations/ # 数据库迁移文件 +├── docs/ # 文档 +├── images/ # 图片资源 +├── Cargo.toml # 项目配置和依赖 +├── Cargo.lock # 依赖版本锁定文件 +└── diesel.toml # Diesel ORM配置 +``` + +### 主要功能模块 + +- **cores/**: 实现核心API服务器功能 +- **db/**: 数据库操作和ORM相关代码 +- **middleware/**: 中间件实现,如认证、日志等 +- **utils/**: 通用工具函数和辅助代码 +- **examples/**: 包含各种使用示例,如HTTP、UDP、QUIC和RS422通信示例 + +### 主要依赖 + +- **actix-web**: Web框架 +- **diesel**: ORM框架 +- **tokio**: 异步运行时 +- **quiche**: QUIC协议实现 +- **serialport**: 串口通信 +- **consensus_kv**: 分布式键值存储 + diff --git a/src/cores/mod.rs b/src/cores/mod.rs index d919dba2399ddf224b781dfec661f97f4d3cbece..4e2b5bd7822b9959d22da91d96899e6ac03e3972 100644 --- a/src/cores/mod.rs +++ b/src/cores/mod.rs @@ -106,6 +106,11 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { // 启动各个server let messaging_server: Option> = Some(Box::new(servers::MessagingServer)); + + // 启动 os socket server,如果 socket 文件不存在,将跳过启动 + let os_socket_server: Option> = + Some(Box::new(servers::OSSocketServerManager)); + let actix_web_addresses = vec![ params.actix_web_tcp_address, params.actix_web_udp_address, @@ -122,6 +127,7 @@ pub async fn start_server(params: ServerStartParams) -> anyhow::Result<()> { let servers = messaging_server .into_iter() + .chain(os_socket_server.into_iter()) .chain(actix_web_servers) .collect::>(); for server in servers { diff --git a/src/cores/servers/mod.rs b/src/cores/servers/mod.rs index 29ed6405bbf8375f0cf93cf4eb7167e11e75b2d8..c05ba93b5735867430cd9173f0cd34f91700046e 100644 --- a/src/cores/servers/mod.rs +++ b/src/cores/servers/mod.rs @@ -1,12 +1,27 @@ pub mod actix_web; pub mod message; +pub mod os_socket; +use crate::cores::state::AppState; use async_trait::async_trait; pub use message::MessagingServer; -use crate::cores::state::AppState; #[async_trait] pub trait Server: Send + Sync + Unpin + 'static { async fn start(&self, app_state: AppState); } +/// OS Socket 服务器管理器 +/// 负责创建和管理 OS Socket 处理器的生命周期 +pub struct OSSocketServerManager; + +#[async_trait] +impl Server for OSSocketServerManager { + async fn start(&self, app_state: AppState) { + if let Ok(Some(handler)) = + os_socket::handler::OSSocketHandler::new(std::sync::Arc::new(app_state)) + { + handler.start().await; + } + } +} \ No newline at end of file diff --git a/src/cores/servers/os_socket/README.md b/src/cores/servers/os_socket/README.md new file mode 100644 index 0000000000000000000000000000000000000000..a5779f77db4f2ea80affd923eb3a713152f79f79 --- /dev/null +++ b/src/cores/servers/os_socket/README.md @@ -0,0 +1,93 @@ +# OS Socket 模块 + +该模块实现了与操作系统(OS)之间的 Unix Domain Socket 通信功能,用于接收和处理来自 OS 的 TLV(Tag-Length-Value)格式消息。 + +## 组件结构 + +模块包含三个主要组件: + +1. `OSSocketServerManager`(在 `servers/mod.rs` 中) + - 作为服务器管理器,实现 `Server` trait + - 负责创建和管理 `OSSocketHandler` 的生命周期 + - 在应用启动时初始化 socket 通信 + +2. `OSSocketServer`(在 `os_socket/mod.rs` 中) + - 负责底层的 socket 通信 + - 管理 socket 连接 + - 提供 TLV 消息的读写功能 + +3. `OSSocketHandler`(在 `os_socket/handler.rs` 中) + - 处理从 OS 接收到的消息 + - 负责消息的解析和转发 + - 与消息总线系统集成 + +## 组件关系 + +``` +OSSocketServerManager + │ + ├── 创建和管理 + │ + └── OSSocketHandler + │ + ├── 使用 + │ + └── OSSocketServer + │ + └── 处理底层 socket 通信 +``` + +## 功能特性 + +- 通过 Unix Domain Socket 与 OS 建立通信 +- 支持 TLV 消息格式的解析和处理 +- 自动检查 socket 文件存在性 +- 消息转发到消息总线系统 +- 异步处理机制 + +## 消息类型 + +模块支持以下 TLV 消息标签: + +| 标签值 | 描述 | 用途 | +|--------|------|------| +| 0x03030100 | 任务规划 | OS 向云平台发送任务规划数据 | +| 0x03040100 | 云平台数据 | OS 向云平台发送数据 | +| 0x03050100 | 处理数据 | OS 向云平台发送处理数据 | +| 0x05010100 | 任务参数 | OS 向云平台发送任务参数 | + +## 使用方法 + +```rust +// 创建服务器管理器 +let server_manager = OSSocketServerManager; + +// 启动服务器 +server_manager.start(app_state).await; +``` + +## 消息总线主题 + +- `task_planning`: 用于转发任务规划数据 +- `cloud_platform`: 用于转发云平台数据 +- `processing`: 用于转发处理数据 +- `task_parameters`: 用于转发任务参数数据 + +## 错误处理 + +- 自动处理 socket 连接错误 +- 消息解析错误处理 +- 消息转发错误处理 + +## 注意事项 + +1. 确保 socket 文件路径正确(默认为 `/var/run/EulixOnBoardGuardian/os_fleet.sock`) +2. 需要适当的文件系统权限 +3. 消息处理是异步的,不会阻塞主线程 + +## 依赖项 + +- tokio: 异步运行时 +- anyhow: 错误处理 +- log: 日志记录 +- feventbus: 消息总线系统 diff --git a/src/cores/servers/os_socket/handler.rs b/src/cores/servers/os_socket/handler.rs new file mode 100644 index 0000000000000000000000000000000000000000..797a08695623d858cd23c17b01b6dd8f93078e20 --- /dev/null +++ b/src/cores/servers/os_socket/handler.rs @@ -0,0 +1,159 @@ +// Copyright (c) 2025 Institute of Software, Chinese Academy of Sciences +// fleet-datamgr is licensed under Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PSL v2 for more details. + +// Author: songpenglei, songpenglei@otcaix.iscas.ac.cn +// Affiliation: Institute of Software, Chinese Academy of Sciences + +use crate::cores::servers::os_socket::{ + OSSocketServer, TLVMessage, TAG_CLOUD_PLATFORM, TAG_PROCESSING, TAG_TASK_PARAMS, + TAG_TASK_PLANNING, +}; +use crate::cores::state::AppState; +use anyhow::Result; +use feventbus::message::{Message, NativeEventAction}; +use feventbus::traits::producer::Producer; +use log::{debug, error}; +use std::sync::Arc; + +/// OS Socket 消息处理器 +/// 负责处理从 OS 接收到的 TLV 消息,并将其转发到相应的消息总线主题 +pub struct OSSocketHandler { + /// OS Socket 服务器实例 + socket_server: OSSocketServer, + /// 应用程序状态,包含消息总线客户端等 + app_state: Arc, +} + +impl OSSocketHandler { + /// 创建新的 OS Socket 处理器实例 + /// + /// # 参数 + /// - `app_state`: 应用程序状态 + /// + /// # 返回值 + /// - `Result>`: 成功返回处理器实例,如果 socket 文件不存在则返回 None + pub fn new(app_state: Arc) -> Result> { + match OSSocketServer::new()? { + Some(server) => Ok(Some(Self { + socket_server: server, + app_state, + })), + None => Ok(None), + } + } + + /// 启动处理器,开始处理来自 OS 的消息 + /// + /// 该方法会启动一个新的异步任务,持续监听和处理 socket 消息 + /// 如果 socket 服务器未初始化,则不会启动处理任务 + pub async fn start(&self) { + let server = self.socket_server.clone(); + let app_state = self.app_state.clone(); + + tokio::spawn(async move { + loop { + match server.read_data().await { + Ok(data) if !data.is_empty() => { + if let Ok(tlv) = OSSocketServer::parse_tlv(&data) { + if let Err(e) = Self::handle_tlv_message(&app_state, tlv).await { + error!("Error handling TLV message: {}", e); + } + } + } + Ok(_) => continue, + Err(e) => { + error!("Error reading from OS socket: {}", e); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + } + } + }); + } + + /// 处理 TLV 消息 + /// + /// 根据消息标签将消息转发到相应的消息总线主题: + /// - 任务规划消息 -> "task_planning" 主题 + /// - 云平台消息 -> "cloud_platform" 主题 + /// - 处理消息 -> "processing" 主题 + /// - 任务参数消息 -> "task_parameters" 主题 + /// + /// # 参数 + /// - `app_state`: 应用程序状态 + /// - `tlv`: 要处理的 TLV 消息 + /// + /// # 返回值 + /// - `Result<()>`: 成功返回空,失败返回错误 + async fn handle_tlv_message(app_state: &AppState, tlv: TLVMessage) -> Result<()> { + let message_client = app_state.message_cli.clone(); + debug!("Received TLV message with tag: 0x{:08x}, length: {}", tlv.tag, tlv.length); + + match tlv.tag { + TAG_TASK_PLANNING => { + debug!("Processing task planning message"); + let topic = "task_planning"; + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded task planning message to topic: {}", topic); + } + TAG_CLOUD_PLATFORM => { + debug!("Processing cloud platform message"); + let topic = "cloud_platform"; + // TODO: 云平台接收到自己的遥控指令后,需要将遥控指令应用到云平台配置中 + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded cloud platform message to topic: {}", topic); + } + TAG_PROCESSING => { + debug!("Processing processing message"); + let topic = "processing"; + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded processing message to topic: {}", topic); + } + TAG_TASK_PARAMS => { + debug!("Processing task parameters message"); + let topic = "task_parameters"; + Self::publish_message(&message_client, topic, tlv.value).await?; + debug!("Successfully forwarded task parameters message to topic: {}", topic); + } + _ => { + debug!("Received unknown TLV message with tag: 0x{:08x}, length: {}", tlv.tag, tlv.length); + } + } + Ok(()) + } + + /// 发布消息到消息总线 + /// + /// # 参数 + /// - `message_client`: 消息总线客户端 + /// - `topic`: 目标主题 + /// - `value`: 消息内容 + /// + /// # 返回值 + /// - `Result<()>`: 成功返回空,失败返回错误 + async fn publish_message(message_client: &Arc

, topic: &str, value: Vec) -> Result<()> { + debug!("Preparing to publish message to topic: {}, value length: {}", topic, value.len()); + let message = Message::new( + topic.to_string(), + NativeEventAction::Other, + None, + Some(serde_json::to_value(value)?), + None, + ); + + log::debug!("OSSocketHandler Publishing message to topic: {}", topic); + if let Err(e) = message_client.as_ref().publish(message).await { + log::error!("OSSocketHandler Failed to publish message to topic {}: {}", topic, e); + return Err(anyhow::anyhow!("Failed to publish message to topic {}: {}", topic, e)); + } + log::debug!("OSSocketHandler Successfully published message to topic: {}", topic); + Ok(()) + } +} + diff --git a/src/cores/servers/os_socket/mod.rs b/src/cores/servers/os_socket/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..69c171dacbd7f399fa20838f15651ead17155aec --- /dev/null +++ b/src/cores/servers/os_socket/mod.rs @@ -0,0 +1,205 @@ +// Copyright (c) 2025 Institute of Software, Chinese Academy of Sciences +// fleet-datamgr is licensed under Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PSL v2 for more details. + +// Author: songpenglei, songpenglei@otcaix.iscas.ac.cn +// Affiliation: Institute of Software, Chinese Academy of Sciences + +pub mod handler; + +use anyhow::Result; +use log::error; +use std::io::{Read, Write}; +use std::os::unix::net::UnixStream; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// Unix domain socket 路径,用于与 OS 通信 +pub const OS_SOCKET_PATH: &str = "/var/run/EulixOnBoardGuardian/os_fleet.sock"; + +/// TLV 消息标签定义 +/// 0x03030100: 任务规划(OS->云) +pub const TAG_TASK_PLANNING: u32 = 0x03030100; +/// 0x03040100: 云平台(OS->云) +pub const TAG_CLOUD_PLATFORM: u32 = 0x03040100; +/// 0x03050100: 处理(OS->云) +pub const TAG_PROCESSING: u32 = 0x03050100; +/// 0x05010100: 任务规划工参(OS->云) +pub const TAG_TASK_PARAMS: u32 = 0x05010100; + +/// TLV (Tag-Length-Value) 消息结构 +/// 用于与 OS 进行通信的消息格式 +#[derive(Debug, Clone)] +pub struct TLVMessage { + /// 消息标签,用于标识消息类型 + pub tag: u32, + /// 消息值的长度 + pub length: u32, + /// 消息的具体内容 + pub value: Vec, +} + +/// OS Socket 服务器 +/// 负责与 OS 进行 Unix Domain Socket 通信 +pub struct OSSocketServer { + /// Unix domain socket 连接,使用 Arc 实现线程安全 + socket_connection: Arc>, +} + +impl Clone for OSSocketServer { + fn clone(&self) -> Self { + Self { + socket_connection: self.socket_connection.clone(), + } + } +} + +impl OSSocketServer { + /// 创建新的 OS Socket 服务器实例 + /// + /// # 返回值 + /// - `Result>`: 成功返回服务器实例,失败返回错误 + /// - 如果 socket 文件不存在,返回 Ok(None) + pub fn new() -> Result> { + // 检查 socket 文件是否存在 + if !std::path::Path::new(OS_SOCKET_PATH).exists() { + log::warn!("OS socket file {} does not exist, skipping OS socket server", OS_SOCKET_PATH); + return Ok(None); + } + + let socket = UnixStream::connect(OS_SOCKET_PATH)?; + Ok(Some(Self { + socket_connection: Arc::new(Mutex::new(socket)), + })) + } + + /// 从 socket 读取数据 + /// + /// # 返回值 + /// - `Result>`: 成功返回读取的数据,失败返回错误 + pub async fn read_data(&self) -> Result> { + let mut buffer = vec![0u8; 4096]; + let mut socket = self.socket_connection.lock().await; + + match socket.read(&mut buffer) { + Ok(size) if size > 0 => { + Ok(buffer[..size].to_vec()) + } + Ok(_) => { + Ok(Vec::new()) + } + Err(e) => { + error!("Error reading from OS socket: {}", e); + Err(anyhow::anyhow!("Failed to read from socket: {}", e)) + } + } + } + + /// 解析 TLV 格式的消息 + /// + /// # 参数 + /// - `data`: 原始字节数据 + /// + /// # 返回值 + /// - `Result`: 成功返回解析后的消息,失败返回错误 + pub fn parse_tlv(data: &[u8]) -> Result { + if data.len() < 8 { + return Err(anyhow::anyhow!("Invalid TLV message: too short")); + } + + let tag = u32::from_be_bytes(data[0..4].try_into()?); + let length = u32::from_be_bytes(data[4..8].try_into()?); + + if data.len() < (8 + length as usize) { + return Err(anyhow::anyhow!("Invalid TLV message: incomplete value")); + } + + let value = data[8..(8 + length as usize)].to_vec(); + + Ok(TLVMessage { tag, length, value }) + } + + /// 发送 TLV 格式的消息到 OS + /// + /// # 参数 + /// - `tag`: 消息标签 + /// - `value`: 消息内容 + /// + /// # 返回值 + /// - `Result<()>`: 成功返回空,失败返回错误 + pub async fn send_tlv_message(&self, tag: u32, value: &[u8]) -> Result<()> { + let mut socket = self.socket_connection.lock().await; + + let mut message = Vec::with_capacity(8 + value.len()); + message.extend_from_slice(&tag.to_be_bytes()); + message.extend_from_slice(&(value.len() as u32).to_be_bytes()); + message.extend_from_slice(value); + + socket.write_all(&message)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_tlv_message_parsing() { + // 测试有效 TLV 消息的解析 + let tag = TAG_TASK_PLANNING; + let value = b"test message"; + let mut data = Vec::new(); + data.extend_from_slice(&tag.to_be_bytes()); + data.extend_from_slice(&(value.len() as u32).to_be_bytes()); + data.extend_from_slice(value); + + let tlv = OSSocketServer::parse_tlv(&data).unwrap(); + assert_eq!(tlv.tag, tag); + assert_eq!(tlv.length, value.len() as u32); + assert_eq!(tlv.value, value); + + // 测试消息太短的情况 + let invalid_data = vec![0u8; 4]; + assert!(OSSocketServer::parse_tlv(&invalid_data).is_err()); + + // 测试消息值不完整的情况 + let mut invalid_data = Vec::new(); + invalid_data.extend_from_slice(&tag.to_be_bytes()); + invalid_data.extend_from_slice(&(100u32).to_be_bytes()); // 长度大于实际数据 + assert!(OSSocketServer::parse_tlv(&invalid_data).is_err()); + } + + #[test] + fn test_tlv_message_construction() { + // 测试 TLV 消息的构建 + let tag = TAG_CLOUD_PLATFORM; + let value = b"cloud data"; + let mut expected = Vec::new(); + expected.extend_from_slice(&tag.to_be_bytes()); + expected.extend_from_slice(&(value.len() as u32).to_be_bytes()); + expected.extend_from_slice(value); + + let mut actual = Vec::with_capacity(8 + value.len()); + actual.extend_from_slice(&tag.to_be_bytes()); + actual.extend_from_slice(&(value.len() as u32).to_be_bytes()); + actual.extend_from_slice(value); + + assert_eq!(actual, expected); + } + + #[test] + fn test_tlv_message_tags() { + // 测试所有预定义的 TLV 标签值 + assert_eq!(TAG_TASK_PLANNING, 0x03030100); + assert_eq!(TAG_CLOUD_PLATFORM, 0x03040100); + assert_eq!(TAG_PROCESSING, 0x03050100); + assert_eq!(TAG_TASK_PARAMS, 0x05010100); + } +} diff --git a/src/cores/state.rs b/src/cores/state.rs index 8deed5a0b5a43ac5f061e40cfd9fca3f2fef590a..e05f3982b8a34ff86a2b71ceda09ba91a26b7868 100644 --- a/src/cores/state.rs +++ b/src/cores/state.rs @@ -1,8 +1,8 @@ -use std::sync::Arc; -use feventbus::impls::messaging::messaging::Messaging; use crate::cores::router::Router; use crate::db::db::DbPool; use crate::messaging::WatchDaemon; +use feventbus::impls::messaging::messaging::Messaging; +use std::sync::Arc; use super::daemons::consensus::ConsensusDaemon; @@ -15,4 +15,4 @@ pub struct AppState { pub consensus_daemon: Arc, pub cluster_id: String, pub token_secret: Vec, -} \ No newline at end of file +}