diff --git a/Cargo.toml b/Cargo.toml index dc32d8217b63133528cdea9fa21c7c3915b70188..20d784892fe36be224ac68591baf9b9d61863b5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,4 +66,5 @@ base64 = "0.22.1" bcrypt = "0.14" jsonwebtoken = "9.2" rand = "0.8" -futures-util = "0.3.31" \ No newline at end of file +futures-util = "0.3.31" +serialport = "=4.6.1" diff --git a/README.md b/README.md index f4d9b9366400f25a9f0ae89854430d459c6c7d69..0c2faa4183b12c80e07a214a115223b22fb28a65 100644 --- a/README.md +++ b/README.md @@ -48,5 +48,31 @@ python3 client.py -v --host=127.0.0.1 --port=4433 /test \ --data '{"key1": "value1", "key2": "value2"}' ``` +### Mock HTTP over RS422 +安装socat用于模拟串口 + +```sh +sudo dnf install socat +``` + +使用socat创建发送和接收串口,波特率921600bps、8位数据位、1位奇校验位、1位停止位,之后启动apiserver + +```sh +sudo socat PTY,link=/dev/RS422_send,raw,echo=0,b921600,cs8,parenb=1,cstopb=0,perm=0666 PTY,link=/dev/RS422_recv,raw,echo=0,b921600,cs8,parenb=1,cstopb=0,perm=0666 +``` + +生成约10MB随机数据,按896B对齐 + +```sh +dd if=/dev/urandom of=random_file.bin bs=896 count=12000 +``` + +运行`examples/send_rs422_packet.rs`,服务接收到数据通过接口存入`storage.replica.location/test.binapp云平台`,并在本地存为`test.bin`用于验证 + +检查数据接收是否正确 + +```sh +diff random_file.bin test.bin +``` diff --git a/examples/basic_example.rs b/examples/basic_example.rs index 59702dd863dfaf01567dbd7b07cb307b6f0eab63..3e7601631d59b6c9c91a16ac607755bf0feab703 100644 --- a/examples/basic_example.rs +++ b/examples/basic_example.rs @@ -7,6 +7,7 @@ use std::ffi::CString; const TCP_ADDRESS: &str = "0.0.0.0:38080"; const UDP_ADDRESS: &str = "0.0.0.0:38081"; const QUIC_ADDRESS: &str = "0.0.0.0:38082"; +const RS422_ADDRESS: &str = "/dev/RS422_recv"; #[actix_web::main] async fn main() { @@ -18,6 +19,7 @@ async fn main() { .actix_web_udp_address(UDP_ADDRESS.to_string()) .actix_web_tcp_address(TCP_ADDRESS.to_string()) .actix_web_quic_address(QUIC_ADDRESS.to_string()) + .actix_web_rs422_address(RS422_ADDRESS.to_string()) .build()).await; // waiting for ctrl-c signal log::info!("APIServer started, cluster_id: {}", app_state.cluster_id); diff --git a/examples/send_rs422_packet.rs b/examples/send_rs422_packet.rs new file mode 100644 index 0000000000000000000000000000000000000000..b58117f3786e1d269a00f3dced915061438116de --- /dev/null +++ b/examples/send_rs422_packet.rs @@ -0,0 +1,18 @@ +use std::fs; + +use fleet_apiserver::cores::servers::actix_web::rs422::{frame::Frame, handle_packet::DataInfo, port::send_packet}; + +fn main() { + let upload_file = fs::read("./random_file.bin").unwrap(); + let file_len = upload_file.len(); + let num_packets = file_len / 896; + for i in 0..num_packets { + let data = &upload_file[i * 896..(i + 1) * 896]; + let data_info = DataInfo::new("test.bin", num_packets, i).to_bytes(); + let mut bytes = Vec::new(); + bytes.extend_from_slice(&data_info); + bytes.extend_from_slice(data); + let packet = Frame::new(0x1234, &bytes); + send_packet("/dev/RS422_send", &packet); + } +} \ No newline at end of file diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 811b01616de6199f376ee1b18206943efc141735..21480b0c44a3a649dd90c5b00037f9f21d60d068 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -4,10 +4,13 @@ mod udp; mod quic; mod utils; +pub mod rs422; + use crate::cores::router::RouterKey; use crate::cores::servers::actix_web::quic::QuicImpl; use crate::cores::servers::actix_web::tcp::TCPImpl; use crate::cores::servers::actix_web::udp::UDPImpl; +use crate::cores::servers::actix_web::rs422::server::RS422Impl; use crate::cores::servers::Server; use crate::cores::state::AppState; use crate::utils::headers; @@ -43,6 +46,10 @@ pub fn quic(addr: &str) -> Box { Box::new(QuicImpl::new(addr)) } +pub fn rs422(path: &str) -> Box { + Box::new(RS422Impl::new(path)) +} + #[macro_export] macro_rules! init_app { ($app_state:expr) => {{ diff --git a/src/cores/servers/actix_web/rs422/buffer.rs b/src/cores/servers/actix_web/rs422/buffer.rs new file mode 100755 index 0000000000000000000000000000000000000000..bb096a2f4c65f5f56db4f1a352e224f83a6ef68a --- /dev/null +++ b/src/cores/servers/actix_web/rs422/buffer.rs @@ -0,0 +1,157 @@ +// 用于RS422串口接收数据的缓冲区,SPSC模型,无需加锁 + +pub struct RingBuffer { + buffer: Vec, // 缓冲区 + head: usize, // 头指针 + tail: usize, // 尾指针 + size: usize, // 缓冲区的大小 +} + +impl RingBuffer { + // 初始化环形缓冲区 + pub fn new(size: usize) -> Self { + RingBuffer { + buffer: vec![0u8; size + 1], + head: 0, + tail: 0, + size: size + 1, + } + } + + // 获取当前缓冲区占用的大小 + pub fn used(&self) -> usize { + (self.tail + self.size - self.head) % self.size + } + + // 获取当前缓冲区剩余的大小 + pub fn free(&self) -> usize { + self.size - self.used() - 1 + } + + // 向缓冲区写入数据 + pub fn write(&mut self, data: &[u8]) { + let data_len = data.len(); + loop { + if self.free() >= data_len { + break; + } + } + + if self.tail + data_len > self.size { + let right = self.size - self.tail; + let left = data_len - right; + self.buffer[self.tail..].copy_from_slice(&data[..right]); + self.buffer[..left].copy_from_slice(&data[right..]); + } else { + self.buffer[self.tail..self.tail + data_len].copy_from_slice(&data); + } + self.tail = (self.tail + data_len) % self.size; + } + + // 从缓冲区读取数据 + pub fn read(&mut self, len: usize) -> Vec { + let mut result = vec![0u8; len]; + loop { + if self.used() >= len { + break; + } + } + + if self.head + len > self.size { + let right = self.size - self.head; + let left = len - right; + result[..right].copy_from_slice(&self.buffer[self.head..]); + result[right..].copy_from_slice(&self.buffer[..left]); + } else { + result.copy_from_slice(&self.buffer[self.head..self.head + len]); + } + self.head = (self.head + len) % self.size; + + result + } + + // 读数据包:从头指针开始,扫描缓冲区,直到读到一个完整的包 + pub fn read_packet(&mut self) -> Vec { + fn size_in_pointers(head: usize, tail: usize, buf_size: usize) -> usize { + (tail + buf_size - head) % buf_size + } + let mut pointer = self.head; + + loop { + // Get frame header sign + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0xEB90 + { + self.head = pointer; + break; + } else { + pointer = (pointer + 1) % self.size; + } + } + } + + pointer = (pointer + 2) % self.size; + + // Get package length + let mut len = 10usize; + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + let data_len = u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]); + len += usize::from(data_len); + break; + } + } + + // 检查收到的数据长度,若长度不够数据帧长度,等待10毫秒 + if self.used() < len { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // Assume that the package is valid, check the frame tail directly + // to get the package quickly and avoid the case that there is frame + // tail in data field. + if self.used() >= len { + let tail_sign = u16::from_be_bytes([ + self.buffer[(self.head + len - 2) % self.size], + self.buffer[(self.head + len - 1) % self.size], + ]); + if tail_sign == 0x09D7 { + return self.read(len); + } + } + + // Else, get the frame tail sign byte by byte + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0xEB90 + { + // 又读到帧头,退出循环从头开始 + println!("Drop packet: get a new header before getting a tail."); + break; + } else if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0x09D7 + { + pointer = (pointer + 2) % self.size; + len = size_in_pointers(self.head, pointer, self.size); + + return self.read(len); + } else { + pointer = (pointer + 1) % self.size; + } + } + } + } + } +} diff --git a/src/cores/servers/actix_web/rs422/frame.rs b/src/cores/servers/actix_web/rs422/frame.rs new file mode 100644 index 0000000000000000000000000000000000000000..f3772910142d0032aca8361f2e8fae13fa22ee13 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/frame.rs @@ -0,0 +1,74 @@ +pub struct Frame { + pub head: u16, + pub length: u16, + pub data_type: u16, + pub data: Vec, + pub checksum: u16, + pub tail: u16, +} + +impl Frame { + pub fn new(data_type: u16, data: &[u8]) -> Self { + Frame { + head: 0xEB90, + length: data.len() as u16, + data_type, + data: data.to_vec(), + checksum: check_sum(data_type, data), + tail: 0x09D7, + } + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + let len = bytes.len(); + if len > 10 { + let frame = Frame { + head: u16::from_be_bytes([bytes[0], bytes[1]]), + length: u16::from_be_bytes([bytes[2], bytes[3]]), + data_type: u16::from_be_bytes([bytes[4], bytes[5]]), + data: bytes[6..len - 4].to_vec(), + checksum: u16::from_be_bytes([bytes[len - 4], bytes[len - 3]]), + tail: u16::from_be_bytes([bytes[len - 2], bytes[len - 1]]), + }; + Ok(frame) + } else { + Err("Frame length too short.".to_string()) + } + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + + bytes.extend_from_slice(&self.head.to_be_bytes()); + bytes.extend_from_slice(&self.length.to_be_bytes()); + bytes.extend_from_slice(&self.data_type.to_be_bytes()); + bytes.extend_from_slice(&self.data); + bytes.extend_from_slice(&self.checksum.to_be_bytes()); + bytes.extend_from_slice(&self.tail.to_be_bytes()); + + bytes + } + + pub fn check(&self) -> bool { + if (self.head == 0xEB90) + && (self.length == (self.data.len() as u16)) + && (self.checksum == check_sum(self.data_type, &self.data)) + && (self.tail == 0x09D7) + { + true + } else { + false + } + } +} + +// Compute checksum +fn check_sum(data_type: u16, data: &[u8]) -> u16 { + let mut bytes: Vec = Vec::new(); + bytes.extend_from_slice(&data_type.to_be_bytes()); + bytes.extend_from_slice(data); + + bytes + .iter() + .fold(0u16, |acc, &byte| acc.wrapping_add(byte as u16)) +} diff --git a/src/cores/servers/actix_web/rs422/handle_packet.rs b/src/cores/servers/actix_web/rs422/handle_packet.rs new file mode 100755 index 0000000000000000000000000000000000000000..534cea7075dd8126f703dfc90f963ec3c9e655f5 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/handle_packet.rs @@ -0,0 +1,286 @@ +#![allow(static_mut_refs)] + +// For sending and receiving RS422 packets + +use std::{collections::HashMap, ffi::CString}; + +use crate::cores::handlers::datamgr::{datamgr_api, DATA_PLUGIN_MANAGER}; + +use super::{buffer::RingBuffer, frame::Frame, port::open_port}; + +// 接收串口数据缓冲区 +static mut REFACTOR_BUFFERS: Vec = Vec::new(); + +// 数据帧最小长度 +const REFACTOR_FRAME_LEAST_LENGTH: usize = 915; +// 单个分片中重构数据长度 +const REFACTOR_DATA_LENGTH: usize = 896; +// 不允许覆写的重构数据基础版本文件名 +const BASE_REFACTOR_DATA_FILE: &str = "0x11-BaseSoftware-v1-11-11.bin"; + +// 重构数据信息 +pub struct DataInfo { + file_name: String, // 文件名 + total_frame: usize, // 帧总数 + frame_idx: usize, // 帧序号 +} + +impl DataInfo { + pub fn new(file_name: &str, total_frame: usize, frame_idx: usize) -> Self { + DataInfo { + file_name: file_name.to_string(), + total_frame, + frame_idx, + } + } + + fn from_bytes(data: &[u8]) -> Self { + let len = data.len(); + let name_end = len - 8; + DataInfo { + file_name: String::from_utf8_lossy(&data[..name_end]).to_string(), + total_frame: u32::from_be_bytes([ + data[name_end], + data[name_end + 1], + data[name_end + 2], + data[name_end + 3], + ]) as usize, + frame_idx: u32::from_be_bytes([ + data[name_end + 4], + data[name_end + 5], + data[name_end + 6], + data[name_end + 7], + ]) as usize, + } + } + + // 检查帧序号是否越界 + fn check_frame_idx(&self) -> bool { + self.frame_idx < self.total_frame + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + bytes.extend_from_slice(self.file_name.as_bytes()); + bytes.extend_from_slice(&(self.total_frame as u32).to_be_bytes()); + bytes.extend_from_slice(&(self.frame_idx as u32).to_be_bytes()); + + bytes + } +} + +// 重构数据拼接缓冲区 +struct RefactorData { + file_name: String, // 重构数据文件名 + total_frame: usize, // 数据帧总数 + received_frames: Vec, // 已收到的数据帧,下标为帧序号,true为已收到 + next_cons_idx: usize, // 已收到的连续数据帧的最大帧序号的下一个未收到的帧序号 + data: Vec, // 数据缓冲区,按帧序号放入 +} + +impl RefactorData { + fn new(file_name: &str, total_frame: usize) -> Self { + RefactorData { + file_name: file_name.to_string(), + total_frame, + received_frames: vec![false; total_frame + 1], + next_cons_idx: 0, + data: vec![0u8; REFACTOR_DATA_LENGTH * total_frame], + } + } + + // 重构数据是否接收完整 + fn complete(&self) -> bool { + self.total_frame == self.next_cons_idx + } + + // 插入重构数据 + fn insert(&mut self, frame_idx: usize, refactor_data: &[u8]) { + self.data[REFACTOR_DATA_LENGTH * frame_idx..REFACTOR_DATA_LENGTH * (frame_idx + 1)] + .copy_from_slice(refactor_data); + if self.received_frames[frame_idx] { + println!( + "Receive redundant refactor packet, file name: {}, frame index: {}, update data using it", + self.file_name, frame_idx + ); + } else { + self.received_frames[frame_idx] = true; + if self.next_cons_idx == frame_idx { + for i in frame_idx..self.total_frame + 1 { + if !self.received_frames[i] { + self.next_cons_idx = i; + break; + } + } + } + println!( + "Receive refactor packet, file name: {}, frame index: {}, next consecutive frame index to receive: {}", + self.file_name, frame_idx, self.next_cons_idx + ); + } + } +} + +// Get valid RS422 refactor packet data types +fn valid_refactor_data_types() -> Vec { + [0x1234, 0x5678].to_vec() +} + +fn recv_bytes_from_port(path: &str, buf_idx: usize) { + let p = open_port(path); + + let mut port = match p { + Ok(res) => res, + Err(err) => { + println!("Failed to open port {} due to {}", path, err.to_string()); + return; + } + }; + + loop { + let mut buf: Vec = vec![0; 1024]; + let res = port.read(buf.as_mut_slice()); + + match res { + Ok(size) => unsafe { + REFACTOR_BUFFERS[buf_idx].write(&buf[..size]); + }, + Err(err) => { + if err.kind() != std::io::ErrorKind::TimedOut { + println!( + "Failed to read refactor data from RS422 port {} due to {}, retry after 30s", + path, + err.to_string() + ); + std::thread::sleep(std::time::Duration::from_secs(30)); + } + } + } + } +} + +fn handle_packets(buf_idx: usize) { + let mut refactor_data_map: HashMap = HashMap::new(); + + loop { + unsafe { + let p = REFACTOR_BUFFERS[buf_idx].read_packet(); + if p.len() >= REFACTOR_FRAME_LEAST_LENGTH { + let packet = Frame::from_bytes(&p).unwrap(); + if packet.check() { + if valid_refactor_data_types().contains(&packet.data_type) { + let data_info_len = (packet.length as usize) - REFACTOR_DATA_LENGTH; + let data_info = DataInfo::from_bytes(&packet.data[..data_info_len]); + if data_info.check_frame_idx() { + if let Some(mut refactor_data) = + refactor_data_map.remove(&data_info.file_name) + { + refactor_data + .insert(data_info.frame_idx, &packet.data[data_info_len..]); + if refactor_data.complete() { + println!( + "Receive refactor data completely, file name: {}", + data_info.file_name + ); + std::thread::spawn(move || { + save_refactor_data( + &refactor_data.file_name, + &refactor_data.data, + ); + }); + } else { + refactor_data_map.insert(data_info.file_name, refactor_data); + } + } else { + let mut refactor_data = + RefactorData::new(&data_info.file_name, data_info.total_frame); + refactor_data + .insert(data_info.frame_idx, &packet.data[data_info_len..]); + if refactor_data.complete() { + println!( + "Receive refactor data completely, file name: {}", + data_info.file_name + ); + std::thread::spawn(move || { + save_refactor_data( + &refactor_data.file_name, + &refactor_data.data, + ); + }); + } else { + refactor_data_map.insert(data_info.file_name, refactor_data); + } + } + } else { + println!("Drop refactor packet: frame index out of bound"); + } + } else { + println!("Drop refactor packet: invalid data type"); + } + } else { + println!("Drop refactor packet: invalid frame received"); + } + } else { + println!("Drop refactor packet: invalid frame received length"); + } + } + } +} + +fn save_refactor_data(file_name: &str, data: &[u8]) { + if file_name != BASE_REFACTOR_DATA_FILE { + upload_data(file_name, data); + loop { + let result = std::fs::write("./".to_string() + file_name, data); + match result { + Ok(_) => { + println!("Save refactor data file {}", file_name); + return; + } + Err(err) => { + println!( + "Failed to save refactor data file {} due to {}, retry after 30s", + file_name, + err.to_string() + ); + std::thread::sleep(std::time::Duration::from_secs(30)); + } + } + } + } else { + println!("Failed to save refactor data in attempting to overwrite base file"); + } +} + +fn upload_data(file_name: &str, data: &[u8]) { + unsafe { + let data_type = CString::new("app").unwrap(); + let data_name = CString::new(file_name).unwrap(); + let to = CString::new("云平台").unwrap(); + let ret = datamgr_api::UploadData( + DATA_PLUGIN_MANAGER, + data_type.as_ptr(), + data_name.as_ptr(), + to.as_ptr(), + data.as_ptr() as *const i8, + data.len() as i32, + ); + if ret != 1 { + println!( + "Failed to upload refactor data file {}, retry after 30s", + file_name + ); + } + } +} + +pub fn run_service(path: &str, buf_size: usize) { + unsafe { + let buf_idx = REFACTOR_BUFFERS.len(); + REFACTOR_BUFFERS.push(RingBuffer::new(buf_size)); + std::thread::spawn(move || { + handle_packets(buf_idx); + }); + recv_bytes_from_port(path, buf_idx); + } +} diff --git a/src/cores/servers/actix_web/rs422/mod.rs b/src/cores/servers/actix_web/rs422/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..ef70fa5613bcf43870ec529ef46ed986b3c9e1f1 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/mod.rs @@ -0,0 +1,5 @@ +mod buffer; +pub mod frame; +pub mod port; +pub mod handle_packet; +pub mod server; \ No newline at end of file diff --git a/src/cores/servers/actix_web/rs422/port.rs b/src/cores/servers/actix_web/rs422/port.rs new file mode 100644 index 0000000000000000000000000000000000000000..12aee4cd63b34b7e24f24d04a0e05de867094eb8 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/port.rs @@ -0,0 +1,30 @@ +use super::frame::Frame; + +pub fn open_port(path: &str) -> serialport::Result> { + let port = serialport::new(path, 921600) + .timeout(std::time::Duration::from_secs(1)) + .data_bits(serialport::DataBits::Eight) + .parity(serialport::Parity::Even) + .stop_bits(serialport::StopBits::One) + .open(); + + port +} + +pub fn send_packet(path: &str, packet: &Frame) { + let p = open_port(path); + + let mut port = match p { + Ok(res) => res, + Err(err) => { + println!("Failed to open port {} due to {}", path, err.to_string()); + return; + } + }; + + let res = port.write_all(&packet.to_bytes()); + + if let Err(err) = res { + println!("Failed to send packet due to {}", err.to_string()); + } +} diff --git a/src/cores/servers/actix_web/rs422/server.rs b/src/cores/servers/actix_web/rs422/server.rs new file mode 100644 index 0000000000000000000000000000000000000000..0f8a56d1a5ec41631e205aee215840332dbc2426 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/server.rs @@ -0,0 +1,32 @@ +use log::*; + +use async_trait::async_trait; + +use crate::cores::servers::Server; +use crate::cores::state::AppState; + +use super::handle_packet::run_service; + +pub struct RS422Impl { + pub path: String, +} + +impl RS422Impl { + pub fn new(path: &str) -> Self { + RS422Impl { + path: path.to_string(), + } + } +} + +#[allow(unused)] +#[async_trait] +impl Server for RS422Impl { + async fn start(&self, app_state: AppState) { + info!("Starting RS422 mock Actix web server at {}", self.path); + let path = self.path.clone(); + actix_web::rt::task::spawn_blocking(move || { + run_service(&path, 10485760); + }); + } +} diff --git a/src/utils/test.rs b/src/utils/test.rs index 6767a2cdd51abce1179b0cbf3ba56e456585cb63..9393a31005828e48a785a5589c0f394a68f52752 100644 --- a/src/utils/test.rs +++ b/src/utils/test.rs @@ -1,13 +1,13 @@ -use std::ffi::CString; -use std::os::raw::c_void; -use std::sync::Arc; -use bon::Builder; -use feventbus::impls::messaging::messaging::Messaging; -use feventbus::traits::controller::EventBus; use crate::cores::handlers::datamgr::datamgr_api; use crate::cores::servers::{self, MessagingServer, Server}; use crate::cores::state::AppState; use crate::prepare_app_state; +use bon::Builder; +use feventbus::impls::messaging::messaging::Messaging; +use feventbus::traits::controller::EventBus; +use std::ffi::CString; +use std::os::raw::c_void; +use std::sync::Arc; #[allow(unused)] pub async fn setup_message_cli() -> Messaging { @@ -17,7 +17,8 @@ pub async fn setup_message_cli() -> Messaging { let plugin_to_load_key = CString::new("core.pluginsToLoad").unwrap(); let plugin_to_load_value = CString::new("Messaging Storage Portal").unwrap(); - let leader_mq_plugin_manager = unsafe { init_mq_plugins(&plugin_to_load_key, &plugin_to_load_value) }; + let leader_mq_plugin_manager = + unsafe { init_mq_plugins(&plugin_to_load_key, &plugin_to_load_value) }; let leader_address = CString::new("127.0.0.1").unwrap(); unsafe { @@ -44,7 +45,10 @@ pub async fn setup_message_cli() -> Messaging { messaging_client } -unsafe fn init_mq_plugins(plugin_to_load_key: &CString, plugin_to_load_value: &CString) -> *mut c_void { +unsafe fn init_mq_plugins( + plugin_to_load_key: &CString, + plugin_to_load_value: &CString, +) -> *mut c_void { let master_plugin_manager = datamgr_api::NewPluginManager(); datamgr_api::SetParameter( master_plugin_manager, @@ -56,11 +60,11 @@ unsafe fn init_mq_plugins(plugin_to_load_key: &CString, plugin_to_load_value: &C master_plugin_manager } - - #[allow(unused)] pub fn tear_down_message_cli(messaging_client: Arc) { - let plugin_manager = messaging_client.get_plugin_manager().expect("Plugin manager is not set"); + let plugin_manager = messaging_client + .get_plugin_manager() + .expect("Plugin manager is not set"); unsafe { datamgr_api::StopUdp(plugin_manager); datamgr_api::UnloadPlugins(plugin_manager); @@ -69,7 +73,13 @@ pub fn tear_down_message_cli(messaging_client: Arc) { } #[allow(unused)] -pub async fn start_test_api_server(app_state: AppState, tcp_address: String, udp_address: Option, quic_address: Option) -> AppState { +pub async fn start_test_api_server( + app_state: AppState, + tcp_address: String, + udp_address: Option, + quic_address: Option, + rs422_address: Option, +) -> AppState { // 启动watch相关事件监听协程 let app_state_watch = app_state.clone(); tokio::spawn(async move { @@ -86,23 +96,41 @@ pub async fn start_test_api_server(app_state: AppState, tcp_address: String, udp }); let actix_web_tcp_server_app_state = app_state.clone(); tokio::spawn(async move { - let _ = actix_web_tcp_server.start(actix_web_tcp_server_app_state).await; + let _ = actix_web_tcp_server + .start(actix_web_tcp_server_app_state) + .await; }); if let Some(udp_address) = udp_address { let actix_web_udp_server: Box = servers::actix_web::udp(udp_address.as_str()); let actix_web_udp_server_app_state = app_state.clone(); tokio::spawn(async move { - let _ = actix_web_udp_server.start(actix_web_udp_server_app_state).await; - }); + let _ = actix_web_udp_server + .start(actix_web_udp_server_app_state) + .await; + }); } if let Some(quic_address) = quic_address { - let actix_web_quic_server: Box = servers::actix_web::quic(quic_address.as_str()); + let actix_web_quic_server: Box = + servers::actix_web::quic(quic_address.as_str()); let actix_web_quic_server_app_state = app_state.clone(); tokio::spawn(async move { - let _ = actix_web_quic_server.start(actix_web_quic_server_app_state).await; + let _ = actix_web_quic_server + .start(actix_web_quic_server_app_state) + .await; + }); + } + + if let Some(rs422_address) = rs422_address { + let actix_web_rs422_server: Box = + servers::actix_web::rs422(rs422_address.as_str()); + let actix_web_rs422_server_app_state = app_state.clone(); + tokio::spawn(async move { + let _ = actix_web_rs422_server + .start(actix_web_rs422_server_app_state) + .await; }); } - + app_state } @@ -115,10 +143,15 @@ pub async fn setup_test_env(address: Option<&str>) -> (Arc, Arc, pub actix_web_udp_address: Option, pub actix_web_quic_address: Option, + pub actix_web_rs422_address: Option, } #[allow(unused)] pub async fn setup_full_test_env(params: TestServerStartParams) -> (Arc, Arc) { - let tcp_address = params.actix_web_tcp_address.unwrap_or("127.0.0.1:38080".to_string()); + let tcp_address = params + .actix_web_tcp_address + .unwrap_or("127.0.0.1:38080".to_string()); let udp_address = params.actix_web_udp_address; let quic_address = params.actix_web_quic_address; + let rs422_address = params.actix_web_rs422_address; // 删除数据库文件 std::fs::remove_file(DATABASE_PATH).unwrap_or_default(); let msg_cli = Arc::new(setup_message_cli().await); - let app_state = prepare_app_state(format!("{}{}", DATABASE_URL_PREFIX, DATABASE_PATH).as_str(), msg_cli.clone()).await.expect("prepare app state failed"); + let app_state = prepare_app_state( + format!("{}{}", DATABASE_URL_PREFIX, DATABASE_PATH).as_str(), + msg_cli.clone(), + ) + .await + .expect("prepare app state failed"); let app_state_moved = app_state.clone(); tokio::spawn(async move { - start_test_api_server(app_state_moved, tcp_address, udp_address, quic_address).await; + start_test_api_server( + app_state_moved, + tcp_address, + udp_address, + quic_address, + rs422_address, + ) + .await; }); (msg_cli, Arc::new(app_state)) -} \ No newline at end of file +}