From 5fcc81d5b95d6ed0c103f19f4a7b76308cf4a41c Mon Sep 17 00:00:00 2001 From: songpenglei Date: Tue, 27 May 2025 11:19:46 +0800 Subject: [PATCH 1/2] Update README with usage examples for basic and publisher scenarios --- crates/os_socket_comms/Cargo.toml | 2 + crates/os_socket_comms/README_zh.md | 150 ++++++++++++++++- .../examples/simple_publisher.rs | 157 ++++++++++++++++++ .../os_socket_comms/examples/simple_sender.rs | 115 +------------ crates/os_socket_comms/src/handler.rs | 36 ++-- 5 files changed, 322 insertions(+), 138 deletions(-) create mode 100644 crates/os_socket_comms/examples/simple_publisher.rs diff --git a/crates/os_socket_comms/Cargo.toml b/crates/os_socket_comms/Cargo.toml index 82b8c05..cc982c2 100644 --- a/crates/os_socket_comms/Cargo.toml +++ b/crates/os_socket_comms/Cargo.toml @@ -11,6 +11,8 @@ log = { workspace = true } tokio = { workspace = true } serde_json = { workspace = true } chrono = { workspace = true } +reqwest = { version = "0.11", features = ["json"] } +clap = { version = "4.4", features = ["derive"] } [dev-dependencies] bytes = "1.0" \ No newline at end of file diff --git a/crates/os_socket_comms/README_zh.md b/crates/os_socket_comms/README_zh.md index 4cf02d2..16b61c2 100644 --- a/crates/os_socket_comms/README_zh.md +++ b/crates/os_socket_comms/README_zh.md @@ -63,6 +63,8 @@ ## 使用示例 +### 基本使用示例 + ```rust use os_socket_comms::{OSSocketHandler, OSSocketServer}; use feventbus::Arc; // 假设 feventbus 用于生产者和消费者 @@ -125,19 +127,159 @@ async fn main() -> anyhow::Result<()> { 如果某个缓存文件不存在或读取失败,则跳过该特定源在该周期的遥测发送。 +### 发布者示例 + +该包还提供了一个发布者示例,演示如何连接到主节点并发布遥测数据: + +```rust +use feventbus::{ + impls::messaging::{datamgr_api, messaging::Messaging}, + message::{Message, NativeEventAction}, + traits::{controller::EventBus, producer::Producer}, +}; +use std::sync::Arc; +use tokio::time::Duration; +use serde_json::{json, Value}; +use clap::Parser; + +const TELEMETRY_PROCESSING_TOPIC: &str = "telemetry_processing"; + +/// Simple publisher example for os_socket_comms +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Master node IP address + #[arg(short, long, default_value = "192.168.122.238")] + master: String, + + /// Worker node IP address + #[arg(short, long, default_value = "192.168.122.91")] + worker: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Parse command line arguments + let args = Args::parse(); + println!("Using master node: {}", args.master); + println!("Using worker node: {}", args.worker); + + // 初始化消息客户端 + let mut message_bus_instance = Messaging::new().await?; + + // 设置插件管理器 + let plugin_key = CString::new("core.pluginsToLoad").unwrap(); + let plugin_value = CString::new("Messaging Storage Portal").unwrap(); + + let worker_mq_plugin_manager = unsafe { + init_mq_plugins(&plugin_key, &plugin_value) + }; + + // 获取工作节点和主节点地址 + let worker_cstr = CString::new(args.worker.as_str()).unwrap(); + let leader_cstr = CString::new(args.master.as_str()).unwrap(); + + // 从主节点获取节点信息 + let (node_id, udp_port) = get_node_info(args.master.as_str()).await?; + + // 连接工作节点到主节点 + unsafe { + datamgr_api::StartUdp(worker_mq_plugin_manager, worker_cstr.as_ptr()); + tokio::time::sleep(Duration::from_secs(1)).await; + datamgr_api::Join( + worker_mq_plugin_manager, + node_id_cstr.as_ptr(), + leader_cstr.as_ptr(), + udp_port.parse().unwrap(), + ); + } + + // 等待连接建立 + tokio::time::sleep(Duration::from_secs(1)).await; + + message_bus_instance.set_plugin_manager(worker_mq_plugin_manager); + let message_bus = Arc::new(message_bus_instance); + + // 开始定期发布消息 + let bus_for_periodic_sender = message_bus.clone(); + tokio::spawn(async move { + let mut counter: u64 = 0; + loop { + counter += 1; + let periodic_payload = json!({ + "source": "periodic_sender", + "count": counter, + "timestamp": chrono::Utc::now().to_rfc3339() + }); + + let event_bus_message = Message::new( + TELEMETRY_PROCESSING_TOPIC.to_string(), + NativeEventAction::Other, + None, + Some(periodic_payload.clone()), + None, + ); + + if let Err(e) = bus_for_periodic_sender.publish(event_bus_message).await { + eprintln!("发布消息失败: {}", e); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + // 保持主线程活动 + tokio::signal::ctrl_c().await?; + println!("正在关闭..."); + Ok(()) +} + +运行发布者示例时,可以通过命令行参数指定主节点和工作节点的 IP 地址: + +```sh +# 使用默认 IP 地址 +cargo run --example simple_publisher + +# 指定主节点和工作节点的 IP 地址 +cargo run --example simple_publisher -- --master 192.168.1.100 --worker 192.168.1.101 + +# 使用短参数形式 +cargo run --example simple_publisher -- -m 192.168.1.100 -w 192.168.1.101 +``` + ## 构建和测试 -构建包: +构建包 simple sender ```sh cargo build ``` -运行测试: +构建包 simple publisher +```sh +cargo build +``` + +运行单元测试: ```sh cargo test ``` 测试包括对 `tlv.rs` 中的 TLV 解析逻辑和 `lib.rs` 中使用的 TLV 编码逻辑的检查。 +运行集成测试,需要包含两个节点或者docker容器,其中一个为 master 节点,另一个为 worker 节点 +1. 在 master 节点运行数据模拟发送端,在 apiserver 工程目录下运行 simple sender +```sh +sudo ./target/debug/examples/simple_sender +``` + +2. 在 master 节点运行 fleet 工程下的主进程, +```sh +sudo ./target/debug/fleet start core +``` + +3. 在 work 节点运行 apiserver 工程下的 simple publisher 测试遥控数据功能,需要将 simple publisher 发送到worker节点,使用如下命令运行 +```sh +sudo ./simple_publisher -m 192.168.122.238 -w 192.168.122.91 +``` + ## 依赖项 主要依赖项包括: @@ -147,6 +289,10 @@ cargo test * `feventbus`: 用于消息总线集成 (特别是 `Producer` 和 `Consumer` 特性)。 * `serde_json`: `OSSocketHandler` 用它来序列化/反序列化事件总线的消息负载。 * `std::fs`: 用于将遥测数据缓存到磁盘的文件操作。 +* `chrono`: 用于时间戳处理。 +* `reqwest`: 用于 HTTP 请求,用于获取节点信息。 +* `async-trait`: 用于异步特性实现。 +* `clap`: 用于命令行参数解析。 ## 许可证 diff --git a/crates/os_socket_comms/examples/simple_publisher.rs b/crates/os_socket_comms/examples/simple_publisher.rs new file mode 100644 index 0000000..b2b187b --- /dev/null +++ b/crates/os_socket_comms/examples/simple_publisher.rs @@ -0,0 +1,157 @@ +use std::ffi::CString; +use std::os::raw::c_void; +use feventbus::{ + impls::messaging::{datamgr_api, messaging::Messaging}, + message::{Message, NativeEventAction}, + traits::{controller::EventBus, producer::Producer}, +}; +use std::sync::Arc; +use tokio::time::Duration; +use serde_json::{json, Value}; +use std::error::Error; +use anyhow::Result; +use clap::Parser; + +const TELEMETRY_PROCESSING_TOPIC: &str = "telemetry_processing"; + +/// Simple publisher example for os_socket_comms +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Master node IP address + #[arg(short, long, default_value = "192.168.122.238")] + master: String, + + /// Worker node IP address + #[arg(short, long, default_value = "192.168.122.91")] + worker: String, +} + +async fn get_node_info(api_server: &str) -> Result<(String, String)> { + let url = format!("http://{}:8080/cluster/eventbus/info", api_server); + let response: Value = reqwest::get(&url) + .await? + .json() + .await?; + + let node_id = response["data"]["node_id"] + .as_str() + .unwrap_or("") + .to_string(); + let udp_port = response["data"]["udp_port"] + .as_str() + .unwrap_or("") + .to_string(); + + Ok((node_id, udp_port)) +} + +unsafe fn init_mq_plugins( + plugin_to_load_key: &CString, + plugin_to_load_value: &CString, +) -> *mut c_void { + let master_plugin_manager = datamgr_api::NewPluginManager(); + datamgr_api::SetParameter( + master_plugin_manager, + plugin_to_load_key.as_ptr(), + plugin_to_load_value.as_ptr(), + ); + + datamgr_api::LoadPlugins(master_plugin_manager); + master_plugin_manager +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Parse command line arguments + let args = Args::parse(); + println!("Using master node: {}", args.master); + println!("Using worker node: {}", args.worker); + + // Initialize messaging client + let mut message_bus_instance = Messaging::new().await?; + + // Setup plugin manager + let plugin_key = CString::new("core.pluginsToLoad").unwrap(); + let plugin_value = CString::new("Messaging Storage Portal").unwrap(); + + let worker_mq_plugin_manager = unsafe { + init_mq_plugins(&plugin_key, &plugin_value) + }; + + // Get worker and leader addresses + let worker_cstr = CString::new(args.worker.clone()).unwrap(); + let leader_cstr = CString::new(args.master.clone()).unwrap(); + + // Get node info from master + let (node_id, udp_port) = get_node_info(&args.master).await?; + println!("Retrieved node info - node_id: {}, udp_port: {}", node_id, udp_port); + + let node_id_cstr = CString::new(node_id).unwrap(); + + // Connect worker to master + unsafe { + datamgr_api::StartUdp(worker_mq_plugin_manager, worker_cstr.as_ptr()); + tokio::time::sleep(Duration::from_secs(1)).await; + datamgr_api::Join( + worker_mq_plugin_manager, + node_id_cstr.as_ptr(), + leader_cstr.as_ptr(), + udp_port.parse().unwrap(), + ); + } + + // Wait for connection to establish + tokio::time::sleep(Duration::from_secs(1)).await; + + message_bus_instance.set_plugin_manager(worker_mq_plugin_manager); + let message_bus = Arc::new(message_bus_instance); + + println!( + "Connected to master at {} as worker {}. Will publish to topic: '{}'", + args.master, + args.worker, + TELEMETRY_PROCESSING_TOPIC + ); + + // Start periodic message publishing + let bus_for_periodic_sender = message_bus.clone(); + tokio::spawn(async move { + let mut counter: u64 = 0; + loop { + counter += 1; + let periodic_payload = json!({ + "source": "periodic_sender", + "count": counter, + "timestamp": chrono::Utc::now().to_rfc3339() + }); + + let event_bus_message = Message::new( + TELEMETRY_PROCESSING_TOPIC.to_string(), + NativeEventAction::Other, + None, + Some(periodic_payload.clone()), + None, + ); + + println!( + "Publishing message to topic '{}': {:?}", + TELEMETRY_PROCESSING_TOPIC, + periodic_payload + ); + + if let Err(e) = bus_for_periodic_sender.publish(event_bus_message).await { + eprintln!( + "Failed to publish message to topic '{}': {}", + TELEMETRY_PROCESSING_TOPIC, e + ); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + }); + + // Keep the main thread alive + tokio::signal::ctrl_c().await?; + println!("Shutting down..."); + Ok(()) +} diff --git a/crates/os_socket_comms/examples/simple_sender.rs b/crates/os_socket_comms/examples/simple_sender.rs index 73d0327..6d52e46 100644 --- a/crates/os_socket_comms/examples/simple_sender.rs +++ b/crates/os_socket_comms/examples/simple_sender.rs @@ -1,19 +1,10 @@ -use feventbus::impls::messaging::datamgr_api; -use feventbus::traits::controller::EventBus; use tokio::net::{UnixListener}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::error::Error; -use std::ffi::CString; use std::time::Duration; use bytes::Buf; use std::fs; -use std::sync::Arc; -use feventbus::impls::messaging::messaging::Messaging; -use feventbus::traits::producer::Producer; -use feventbus::message::{Message, NativeEventAction}; use serde_json; -use serde_json::json; -use std::os::raw::c_void; // This should match the definition in your os_socket_comms::lib.rs const OS_SOCKET_PATH: &str = "/var/run/EulixOnBoardGuardian/os_fleet.sock"; @@ -81,86 +72,9 @@ fn parse_tlv_from_slice(data: &mut &[u8]) -> Result { } -unsafe fn init_mq_plugins( - plugin_to_load_key: &CString, - plugin_to_load_value: &CString, -) -> *mut c_void { - let master_plugin_manager = datamgr_api::NewPluginManager(); - datamgr_api::SetParameter( - master_plugin_manager, - plugin_to_load_key.as_ptr(), - plugin_to_load_value.as_ptr(), - ); - - datamgr_api::LoadPlugins(master_plugin_manager); - master_plugin_manager -} - - #[tokio::main] async fn main() -> Result<(), Box> { // Setup feventbus producer - let mut message_bus_instance = Messaging::new().await?; - - let plugin_to_load_key = CString::new("tests.pluginsToLoad").unwrap(); - let plugin_to_load_value = CString::new("test tlv Storage").unwrap(); - - let plugin_manager_ptr: *mut std::os::raw::c_void = unsafe { - init_mq_plugins(&plugin_to_load_key, &plugin_to_load_value) - }; - - message_bus_instance.set_plugin_manager(plugin_manager_ptr); - - let message_bus = Arc::new(message_bus_instance); - println!( - "Messaging instance created and plugin manager (placeholder) set. Will publish to topic: '{}'", - TELEMETRY_PROCESSING_TOPIC - ); - - // Clone message_bus for the periodic sender task - let bus_for_periodic_sender = message_bus.clone(); - tokio::spawn(async move { - let mut counter: u64 = 0; - loop { - counter += 1; - let periodic_payload = json!({ - "source": "periodic_sender", - "count": counter, - "timestamp": chrono::Utc::now().to_rfc3339() - }); - - let event_bus_message = Message::new( - TELEMETRY_PROCESSING_TOPIC.to_string(), - NativeEventAction::Other, - None, // metadata - Some(periodic_payload.clone()), // body - None, // created_at - ); - - println!( - "Periodic sender: Attempting to send message to topic '{}': {:?}", - TELEMETRY_PROCESSING_TOPIC, - periodic_payload - ); - - match bus_for_periodic_sender.publish(event_bus_message).await { - Ok(_) => { - println!( - "Periodic sender: Successfully published message to topic '{}'.", - TELEMETRY_PROCESSING_TOPIC - ); - } - Err(e) => { - eprintln!( - "Periodic sender: Failed to publish message to topic '{}': {}", - TELEMETRY_PROCESSING_TOPIC, e - ); - } - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - }); - // Attempt to remove the socket file if it already exists // This is important for idempotency, as bind will fail if the file exists. match fs::remove_file(OS_SOCKET_PATH) { @@ -183,7 +97,6 @@ async fn main() -> Result<(), Box> { match listener.accept().await { Ok((mut stream, client_addr)) => { println!("Accepted connection from: {:?}", client_addr); - let bus_clone_for_handler = message_bus.clone(); // Clone Arc for use in this handler scope // Attempt to read a message println!("Attempting to read message from client..."); @@ -211,28 +124,7 @@ async fn main() -> Result<(), Box> { // Construct the feventbus::message::Message let json_payload = serde_json::to_value(tlv_message.value.clone())?; - let event_bus_message = Message::new( - TELEMETRY_PROCESSING_TOPIC.to_string(), - NativeEventAction::Other, // Using Other as a default - None, // metadata - Some(json_payload), // body - None, // created_at or other optional fields - ); - match bus_clone_for_handler.publish(event_bus_message).await { - Ok(_) => { - println!( - "Successfully published payload to topic '{}'.", - TELEMETRY_PROCESSING_TOPIC - ); - } - Err(e) => { - eprintln!( - "Failed to publish payload to topic '{}': {}", - TELEMETRY_PROCESSING_TOPIC, e - ); - } - } // Optionally, send a response back to the client here // For example: @@ -275,8 +167,5 @@ async fn main() -> Result<(), Box> { // Optionally, add a small delay here or more robust error handling if accept fails repeatedly } } - } // End of the loop, will never be reached in this simple form - // To make it exit gracefully, you'd need a signal handler (e.g., Ctrl+C) - // println!("Server shutting down."); // This line is now unreachable - // Ok(()) -} \ No newline at end of file + } +} \ No newline at end of file diff --git a/crates/os_socket_comms/src/handler.rs b/crates/os_socket_comms/src/handler.rs index 240663a..a53f6c3 100644 --- a/crates/os_socket_comms/src/handler.rs +++ b/crates/os_socket_comms/src/handler.rs @@ -185,7 +185,7 @@ impl OSSocketHandler

let cache_file_path_for_handler = cache_file_path; let fut = async move { - debug!( + info!( "遥测订阅者收到主题 '{}' 的消息: {:?}", topic_name_for_handler, message ); @@ -247,28 +247,18 @@ impl OSSocketHandler

pinned_fut }); - // Loop to continuously attempt subscription - loop { - match self - .message_producer - .as_ref() - .subscribe(topic_name, message_handler.clone()) - .await - { - // Cloned message_handler for retries - Ok(_) => { - info!("OSSocketHandler: 成功订阅主题 '{}' (缓存到 {}) 以获取遥测数据。该订阅将保持活动状态。", topic_name, cache_file_path); - // The subscription is now active. feventbus will call the handler for new messages. - // This task can now block indefinitely, or if subscribe() internally manages the lifetime, - // it could even complete. For robust explicit blocking: - std::future::pending::<()>().await; // Keeps this spawned task alive indefinitely after successful subscription. - // This break will not be reached if std::future::pending is used. - break; - } - Err(e) => { - error!("OSSocketHandler: 订阅主题 '{}' (缓存到 {}) 以获取遥测数据失败: {}. 将在 5 秒后重试。", topic_name, cache_file_path, e); - tokio::time::sleep(Duration::from_secs(5)).await; - } + match self + .message_producer + .as_ref() + .subscribe(topic_name, message_handler.clone()) + .await + { + // Cloned message_handler for retries + Ok(_) => { + info!("OSSocketHandler: 成功订阅主题 '{}' (缓存到 {}) 以获取遥测数据。该订阅将保持活动状态。", topic_name, cache_file_path); + } + Err(e) => { + error!("OSSocketHandler: 订阅主题 '{}' (缓存到 {}) 以获取遥测数据失败: {}. 将在 5 秒后重试。", topic_name, cache_file_path, e); } } } -- Gitee From 54544fdf2b5801471ebde480ac1f662ba46f3437 Mon Sep 17 00:00:00 2001 From: songpenglei Date: Tue, 27 May 2025 11:25:19 +0800 Subject: [PATCH 2/2] Update README_zh.md with specific build commands for simple_sender and simple_publisher examples --- crates/os_socket_comms/README_zh.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/os_socket_comms/README_zh.md b/crates/os_socket_comms/README_zh.md index 16b61c2..2c69ccd 100644 --- a/crates/os_socket_comms/README_zh.md +++ b/crates/os_socket_comms/README_zh.md @@ -250,12 +250,12 @@ cargo run --example simple_publisher -- -m 192.168.1.100 -w 192.168.1.101 构建包 simple sender ```sh -cargo build +cargo build --example simple_sender -p os_socket_comms ``` 构建包 simple publisher ```sh -cargo build +cargo build --example simple_publisher -p os_socket_comms ``` 运行单元测试: -- Gitee