From 93e65a253132bc564f53d4362eb55791e04ee670 Mon Sep 17 00:00:00 2001 From: wangziyue Date: Mon, 10 Mar 2025 10:16:27 +0800 Subject: [PATCH 1/5] Simple RS422 handler --- Cargo.toml | 1 + src/utils/mod.rs | 2 + src/utils/rs422_buffer.rs | 152 +++++++++++++++++++++++++++ src/utils/rs422_packet.rs | 213 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 368 insertions(+) create mode 100644 src/utils/rs422_buffer.rs create mode 100644 src/utils/rs422_packet.rs diff --git a/Cargo.toml b/Cargo.toml index 3a7fd22..6a592bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,3 +54,4 @@ tokio-stream = "0.1.17" enum-as-inner = "0.6.1" bon = "3.3.2" derive_more = { version = "2.0.1", features = ["full"] } +serialport = "4.6.1" diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 83db0bf..e14082c 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,5 +1,7 @@ pub mod headers; mod uuid; +mod rs422_buffer; +mod rs422_packet; #[cfg(feature = "test")] pub mod test; diff --git a/src/utils/rs422_buffer.rs b/src/utils/rs422_buffer.rs new file mode 100644 index 0000000..42410b4 --- /dev/null +++ b/src/utils/rs422_buffer.rs @@ -0,0 +1,152 @@ +// 用于RS422串口接收数据的缓冲区,SPSC模型,无需加锁 + +const BUFFER_SIZE: usize = 1024; + +pub struct RingBuffer { + buffer: [u8; BUFFER_SIZE + 1], // 使用堆分配的缓冲区 + head: usize, // 头指针 + tail: usize, // 尾指针 + size: usize, // 缓冲区的大小 +} + +impl RingBuffer { + // 初始化环形缓冲区 + pub const fn new() -> Self { + RingBuffer { + buffer: [0; BUFFER_SIZE + 1], + head: 0, + tail: 0, + size: BUFFER_SIZE + 1, + } + } + + // 获取当前缓冲区占用的大小 + pub fn used(&self) -> usize { + (self.tail - self.head + self.size) % self.size + } + + // 获取当前缓冲区剩余的大小 + pub fn free(&self) -> usize { + self.size - self.used() - 1 + } + + // 向缓冲区写入数据 + pub fn write(&mut self, data: &[u8]) { + let data_len = data.len(); + loop { + if self.free() >= data_len { + break; + } + } + + if self.tail + data_len > self.size { + let right = self.size - self.tail; + let left = data_len - right; + self.buffer[self.tail..].copy_from_slice(&data[..right]); + self.buffer[..left].copy_from_slice(&data[right..]); + } else { + self.buffer[self.tail..self.tail + data_len].copy_from_slice(&data); + } + self.tail = (self.tail + data_len) % self.size; + } + + // 从缓冲区读取数据 + pub fn read(&mut self, len: usize) -> Vec { + let mut result = vec![0u8; len]; + loop { + if self.used() >= len { + break; + } + } + + if self.head + len > self.size { + let right = self.size - self.head; + let left = len - right; + result[..right].copy_from_slice(&self.buffer[self.head..]); + result[right..].copy_from_slice(&self.buffer[..left]); + } else { + result.copy_from_slice(&self.buffer[self.head..self.head + len]); + } + self.head = (self.head + len) % self.size; + + result + } + + // 读数据包:从头指针开始,扫描缓冲区,直到读到一个完整的包 + pub fn read_packet(&mut self) -> Vec { + fn size_in_pointers(head: usize, tail: usize, buf_size: usize) -> usize { + (tail - head + buf_size) % buf_size + } + let mut pointer = self.head; + + // Get frame header sign + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0xEB90 + { + self.head = pointer; + break; + } else { + pointer = (pointer + 1) % self.size; + } + } + } + + pointer = (pointer + 2) % self.size; + + // Get package length + let mut len = 10usize; + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + let data_len = u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]); + len += usize::from(data_len); + break; + } + } + + // Assume that the package is valid, check the frame tail directly + // to get the package quickly. + if self.used() >= len { + let tail_sign = u16::from_be_bytes([ + self.buffer[(self.head + len - 2) % self.size], + self.buffer[(self.head + len - 1) % self.size], + ]); + if tail_sign == 0x09D7 { + return self.read(len); + } + } + + // Else, get the frame tail sign byte by byte + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0xEB90 + { + self.head = pointer; + pointer = (pointer + 2) % self.size; + println!("Drop packet: get a new header before getting a tail."); + } else if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0x09D7 + { + pointer = (pointer + 2) % self.size; + break; + } else { + pointer = (pointer + 1) % self.size; + } + } + } + len = size_in_pointers(self.head, pointer, self.size); + + self.read(len) + } +} diff --git a/src/utils/rs422_packet.rs b/src/utils/rs422_packet.rs new file mode 100644 index 0000000..d5f2674 --- /dev/null +++ b/src/utils/rs422_packet.rs @@ -0,0 +1,213 @@ +#![allow(static_mut_refs)] + +// For sending and receiving RS422 packets + +use crate::utils::rs422_buffer::RingBuffer; + +static mut BUFFER: RingBuffer = RingBuffer::new(); + +// Compute checksum +fn check_sum(data_type: u16, data: &[u8]) -> u16 { + let mut bytes: Vec = Vec::new(); + bytes.extend_from_slice(&data_type.to_be_bytes()); + bytes.extend_from_slice(data); + + bytes + .iter() + .fold(0u16, |acc, &byte| acc.wrapping_add(byte as u16)) +} + +// Get valid RS422 packet data types +fn get_data_types() -> Vec { + [0x0001, 0x1234, 0x5678].to_vec() +} + +fn get_telemetry_control_dtypes() -> Vec { + [0x0001].to_vec() +} + +fn get_update_dtypes() -> Vec { + [0x1234, 0x5678].to_vec() +} + +pub struct Frame { + head: u16, + length: u16, + data_type: u16, + data: Vec, + checksum: u16, + tail: u16, +} + +impl Frame { + pub fn new(data_type: u16, data: &[u8]) -> Frame { + Frame { + head: 0xEB90, + length: data.len() as u16, + data_type, + data: data.to_vec(), + checksum: check_sum(data_type, data), + tail: 0x09D7, + } + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + let len = bytes.len(); + if len > 10 { + let frame = Frame { + head: u16::from_be_bytes([bytes[0], bytes[1]]), + length: u16::from_be_bytes([bytes[2], bytes[3]]), + data_type: u16::from_be_bytes([bytes[4], bytes[5]]), + data: bytes[6..len - 4].to_vec(), + checksum: u16::from_be_bytes([bytes[len - 4], bytes[len - 3]]), + tail: u16::from_be_bytes([bytes[len - 2], bytes[len - 1]]), + }; + Ok(frame) + } else { + Err("Frame length too short.".to_string()) + } + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + + bytes.extend_from_slice(&self.head.to_be_bytes()); + bytes.extend_from_slice(&self.length.to_be_bytes()); + bytes.extend_from_slice(&self.data_type.to_be_bytes()); + bytes.extend_from_slice(&self.data); + bytes.extend_from_slice(&self.checksum.to_be_bytes()); + bytes.extend_from_slice(&self.tail.to_be_bytes()); + + bytes + } + + pub fn check(&self) -> bool { + if (self.head == 0xEB90) + && (self.length == (self.data.len() as u16)) + && get_data_types().contains(&self.data_type) + && (self.checksum == check_sum(self.data_type, &self.data)) + && (self.tail == 0x09D7) + { + true + } else { + false + } + } +} + +fn open_port(path: &str) -> serialport::Result> { + let port = serialport::new(path, 921600) + .timeout(std::time::Duration::from_secs(1)) + .data_bits(serialport::DataBits::Eight) + .parity(serialport::Parity::Even) + .stop_bits(serialport::StopBits::One) + .open(); + + port +} + +pub fn send_packet(path: &str, packet: &Frame) { + let p = open_port(path); + + let mut port = match p { + Ok(res) => res, + Err(err) => { + println!("Failed to open port {} due to {:?}", path, err.kind()); + return; + } + }; + + let res = port.write_all(&packet.to_bytes()); + + if let Err(err) = res { + println!("Failed to send packet due to {:?}", err.kind()); + } +} + +pub fn recv_from_port(path: &str) { + let p = open_port(path); + + let mut port = match p { + Ok(res) => res, + Err(err) => { + println!("Failed to open port {} due to {:?}", path, err.kind()); + return; + } + }; + + loop { + let mut buf: Vec = vec![0; 1024]; + let res = port.read(buf.as_mut_slice()); + + match res { + Ok(size) => unsafe { + BUFFER.write(&buf[..size]); + }, + Err(err) => { + if err.kind() == std::io::ErrorKind::TimedOut { + println!("RS422 port {} received nothing after 1s", path); + } else { + println!("Failed to read RS422 port {}", path); + return; + } + } + } + } +} + +pub fn handle_packets() { + // Whether last packet is update data packet + let mut last_update_packet_dtype = 0x1234; + let mut last_update_packet_full = false; + let mut total_update_packets = 2; + + let mut update_packets = Vec::new(); + + loop { + unsafe { + let p = Frame::from_bytes(&BUFFER.read_packet()); + if let Ok(packet) = p { + if packet.check() { + if get_update_dtypes().contains(&packet.data_type) { + if last_update_packet_full { + if packet.data_type == last_update_packet_dtype { + println!("Redundant packet, drop"); + continue; + } else { + last_update_packet_dtype = packet.data_type; + last_update_packet_full = false; + update_packets.clear(); + let id = u32::from_be_bytes([ + packet.data[0], + packet.data[1], + packet.data[2], + packet.data[3], + ]); + update_packets.push((id, packet)); + } + } else { + let id = u32::from_be_bytes([ + packet.data[0], + packet.data[1], + packet.data[2], + packet.data[3], + ]); + update_packets.push((id, packet)); + } + if update_packets.len() == total_update_packets { + last_update_packet_full = true; + update_packets.sort_by_key(|p| p.0); + println!("Get full update data"); + } + } else { + println!("Get telemetry/control packet"); + } + } else { + println!("Receive invalid packet, drop"); + } + } else { + println!("Receive invalid packet, drop"); + } + } + } +} -- Gitee From 681a8d6ef845026e4286ee8d2d554c5b2269e276 Mon Sep 17 00:00:00 2001 From: wangziyue Date: Tue, 1 Apr 2025 00:58:59 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BB=8Eutils=E4=B8=AD=E7=A7=BB=E9=99=A4RS?= =?UTF-8?q?422=E4=B8=B2=E5=8F=A3=E5=8A=9F=E8=83=BD,=E5=BE=85=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E6=95=B4=E5=90=88=E5=85=A5servers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/utils/mod.rs | 2 - src/utils/rs422_buffer.rs | 152 --------------------------- src/utils/rs422_packet.rs | 213 -------------------------------------- 3 files changed, 367 deletions(-) delete mode 100644 src/utils/rs422_buffer.rs delete mode 100644 src/utils/rs422_packet.rs diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 909dc04..e2c0fe2 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,7 +1,5 @@ pub mod headers; mod uuid; -mod rs422_buffer; -mod rs422_packet; #[cfg(feature = "test")] pub mod test; pub mod password; diff --git a/src/utils/rs422_buffer.rs b/src/utils/rs422_buffer.rs deleted file mode 100644 index 42410b4..0000000 --- a/src/utils/rs422_buffer.rs +++ /dev/null @@ -1,152 +0,0 @@ -// 用于RS422串口接收数据的缓冲区,SPSC模型,无需加锁 - -const BUFFER_SIZE: usize = 1024; - -pub struct RingBuffer { - buffer: [u8; BUFFER_SIZE + 1], // 使用堆分配的缓冲区 - head: usize, // 头指针 - tail: usize, // 尾指针 - size: usize, // 缓冲区的大小 -} - -impl RingBuffer { - // 初始化环形缓冲区 - pub const fn new() -> Self { - RingBuffer { - buffer: [0; BUFFER_SIZE + 1], - head: 0, - tail: 0, - size: BUFFER_SIZE + 1, - } - } - - // 获取当前缓冲区占用的大小 - pub fn used(&self) -> usize { - (self.tail - self.head + self.size) % self.size - } - - // 获取当前缓冲区剩余的大小 - pub fn free(&self) -> usize { - self.size - self.used() - 1 - } - - // 向缓冲区写入数据 - pub fn write(&mut self, data: &[u8]) { - let data_len = data.len(); - loop { - if self.free() >= data_len { - break; - } - } - - if self.tail + data_len > self.size { - let right = self.size - self.tail; - let left = data_len - right; - self.buffer[self.tail..].copy_from_slice(&data[..right]); - self.buffer[..left].copy_from_slice(&data[right..]); - } else { - self.buffer[self.tail..self.tail + data_len].copy_from_slice(&data); - } - self.tail = (self.tail + data_len) % self.size; - } - - // 从缓冲区读取数据 - pub fn read(&mut self, len: usize) -> Vec { - let mut result = vec![0u8; len]; - loop { - if self.used() >= len { - break; - } - } - - if self.head + len > self.size { - let right = self.size - self.head; - let left = len - right; - result[..right].copy_from_slice(&self.buffer[self.head..]); - result[right..].copy_from_slice(&self.buffer[..left]); - } else { - result.copy_from_slice(&self.buffer[self.head..self.head + len]); - } - self.head = (self.head + len) % self.size; - - result - } - - // 读数据包:从头指针开始,扫描缓冲区,直到读到一个完整的包 - pub fn read_packet(&mut self) -> Vec { - fn size_in_pointers(head: usize, tail: usize, buf_size: usize) -> usize { - (tail - head + buf_size) % buf_size - } - let mut pointer = self.head; - - // Get frame header sign - loop { - if size_in_pointers(pointer, self.tail, self.size) >= 2 { - if u16::from_be_bytes([ - self.buffer[pointer], - self.buffer[(pointer + 1) % self.size], - ]) == 0xEB90 - { - self.head = pointer; - break; - } else { - pointer = (pointer + 1) % self.size; - } - } - } - - pointer = (pointer + 2) % self.size; - - // Get package length - let mut len = 10usize; - loop { - if size_in_pointers(pointer, self.tail, self.size) >= 2 { - let data_len = u16::from_be_bytes([ - self.buffer[pointer], - self.buffer[(pointer + 1) % self.size], - ]); - len += usize::from(data_len); - break; - } - } - - // Assume that the package is valid, check the frame tail directly - // to get the package quickly. - if self.used() >= len { - let tail_sign = u16::from_be_bytes([ - self.buffer[(self.head + len - 2) % self.size], - self.buffer[(self.head + len - 1) % self.size], - ]); - if tail_sign == 0x09D7 { - return self.read(len); - } - } - - // Else, get the frame tail sign byte by byte - loop { - if size_in_pointers(pointer, self.tail, self.size) >= 2 { - if u16::from_be_bytes([ - self.buffer[pointer], - self.buffer[(pointer + 1) % self.size], - ]) == 0xEB90 - { - self.head = pointer; - pointer = (pointer + 2) % self.size; - println!("Drop packet: get a new header before getting a tail."); - } else if u16::from_be_bytes([ - self.buffer[pointer], - self.buffer[(pointer + 1) % self.size], - ]) == 0x09D7 - { - pointer = (pointer + 2) % self.size; - break; - } else { - pointer = (pointer + 1) % self.size; - } - } - } - len = size_in_pointers(self.head, pointer, self.size); - - self.read(len) - } -} diff --git a/src/utils/rs422_packet.rs b/src/utils/rs422_packet.rs deleted file mode 100644 index d5f2674..0000000 --- a/src/utils/rs422_packet.rs +++ /dev/null @@ -1,213 +0,0 @@ -#![allow(static_mut_refs)] - -// For sending and receiving RS422 packets - -use crate::utils::rs422_buffer::RingBuffer; - -static mut BUFFER: RingBuffer = RingBuffer::new(); - -// Compute checksum -fn check_sum(data_type: u16, data: &[u8]) -> u16 { - let mut bytes: Vec = Vec::new(); - bytes.extend_from_slice(&data_type.to_be_bytes()); - bytes.extend_from_slice(data); - - bytes - .iter() - .fold(0u16, |acc, &byte| acc.wrapping_add(byte as u16)) -} - -// Get valid RS422 packet data types -fn get_data_types() -> Vec { - [0x0001, 0x1234, 0x5678].to_vec() -} - -fn get_telemetry_control_dtypes() -> Vec { - [0x0001].to_vec() -} - -fn get_update_dtypes() -> Vec { - [0x1234, 0x5678].to_vec() -} - -pub struct Frame { - head: u16, - length: u16, - data_type: u16, - data: Vec, - checksum: u16, - tail: u16, -} - -impl Frame { - pub fn new(data_type: u16, data: &[u8]) -> Frame { - Frame { - head: 0xEB90, - length: data.len() as u16, - data_type, - data: data.to_vec(), - checksum: check_sum(data_type, data), - tail: 0x09D7, - } - } - - pub fn from_bytes(bytes: &[u8]) -> Result { - let len = bytes.len(); - if len > 10 { - let frame = Frame { - head: u16::from_be_bytes([bytes[0], bytes[1]]), - length: u16::from_be_bytes([bytes[2], bytes[3]]), - data_type: u16::from_be_bytes([bytes[4], bytes[5]]), - data: bytes[6..len - 4].to_vec(), - checksum: u16::from_be_bytes([bytes[len - 4], bytes[len - 3]]), - tail: u16::from_be_bytes([bytes[len - 2], bytes[len - 1]]), - }; - Ok(frame) - } else { - Err("Frame length too short.".to_string()) - } - } - - pub fn to_bytes(&self) -> Vec { - let mut bytes = Vec::new(); - - bytes.extend_from_slice(&self.head.to_be_bytes()); - bytes.extend_from_slice(&self.length.to_be_bytes()); - bytes.extend_from_slice(&self.data_type.to_be_bytes()); - bytes.extend_from_slice(&self.data); - bytes.extend_from_slice(&self.checksum.to_be_bytes()); - bytes.extend_from_slice(&self.tail.to_be_bytes()); - - bytes - } - - pub fn check(&self) -> bool { - if (self.head == 0xEB90) - && (self.length == (self.data.len() as u16)) - && get_data_types().contains(&self.data_type) - && (self.checksum == check_sum(self.data_type, &self.data)) - && (self.tail == 0x09D7) - { - true - } else { - false - } - } -} - -fn open_port(path: &str) -> serialport::Result> { - let port = serialport::new(path, 921600) - .timeout(std::time::Duration::from_secs(1)) - .data_bits(serialport::DataBits::Eight) - .parity(serialport::Parity::Even) - .stop_bits(serialport::StopBits::One) - .open(); - - port -} - -pub fn send_packet(path: &str, packet: &Frame) { - let p = open_port(path); - - let mut port = match p { - Ok(res) => res, - Err(err) => { - println!("Failed to open port {} due to {:?}", path, err.kind()); - return; - } - }; - - let res = port.write_all(&packet.to_bytes()); - - if let Err(err) = res { - println!("Failed to send packet due to {:?}", err.kind()); - } -} - -pub fn recv_from_port(path: &str) { - let p = open_port(path); - - let mut port = match p { - Ok(res) => res, - Err(err) => { - println!("Failed to open port {} due to {:?}", path, err.kind()); - return; - } - }; - - loop { - let mut buf: Vec = vec![0; 1024]; - let res = port.read(buf.as_mut_slice()); - - match res { - Ok(size) => unsafe { - BUFFER.write(&buf[..size]); - }, - Err(err) => { - if err.kind() == std::io::ErrorKind::TimedOut { - println!("RS422 port {} received nothing after 1s", path); - } else { - println!("Failed to read RS422 port {}", path); - return; - } - } - } - } -} - -pub fn handle_packets() { - // Whether last packet is update data packet - let mut last_update_packet_dtype = 0x1234; - let mut last_update_packet_full = false; - let mut total_update_packets = 2; - - let mut update_packets = Vec::new(); - - loop { - unsafe { - let p = Frame::from_bytes(&BUFFER.read_packet()); - if let Ok(packet) = p { - if packet.check() { - if get_update_dtypes().contains(&packet.data_type) { - if last_update_packet_full { - if packet.data_type == last_update_packet_dtype { - println!("Redundant packet, drop"); - continue; - } else { - last_update_packet_dtype = packet.data_type; - last_update_packet_full = false; - update_packets.clear(); - let id = u32::from_be_bytes([ - packet.data[0], - packet.data[1], - packet.data[2], - packet.data[3], - ]); - update_packets.push((id, packet)); - } - } else { - let id = u32::from_be_bytes([ - packet.data[0], - packet.data[1], - packet.data[2], - packet.data[3], - ]); - update_packets.push((id, packet)); - } - if update_packets.len() == total_update_packets { - last_update_packet_full = true; - update_packets.sort_by_key(|p| p.0); - println!("Get full update data"); - } - } else { - println!("Get telemetry/control packet"); - } - } else { - println!("Receive invalid packet, drop"); - } - } else { - println!("Receive invalid packet, drop"); - } - } - } -} -- Gitee From a7e96e4b3a26f4a06db6c752c5a62d9d27ae8fe8 Mon Sep 17 00:00:00 2001 From: wangziyue Date: Wed, 2 Apr 2025 01:17:56 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=AE=8C=E6=88=90RS422=E4=B8=B2=E5=8F=A3?= =?UTF-8?q?=E6=8E=A5=E6=94=B6=E9=87=8D=E6=9E=84=E6=95=B0=E6=8D=AE=E5=B9=B6?= =?UTF-8?q?=E5=AD=98=E5=85=A5=E7=A1=AC=E7=9B=98=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/cores/servers/actix_web/mod.rs | 7 + src/cores/servers/actix_web/rs422/buffer.rs | 157 +++++++++++ src/cores/servers/actix_web/rs422/frame.rs | 74 ++++++ .../servers/actix_web/rs422/handle_packet.rs | 244 ++++++++++++++++++ src/cores/servers/actix_web/rs422/mod.rs | 5 + src/cores/servers/actix_web/rs422/port.rs | 30 +++ src/cores/servers/actix_web/rs422/server.rs | 29 +++ 7 files changed, 546 insertions(+) create mode 100755 src/cores/servers/actix_web/rs422/buffer.rs create mode 100644 src/cores/servers/actix_web/rs422/frame.rs create mode 100755 src/cores/servers/actix_web/rs422/handle_packet.rs create mode 100644 src/cores/servers/actix_web/rs422/mod.rs create mode 100644 src/cores/servers/actix_web/rs422/port.rs create mode 100644 src/cores/servers/actix_web/rs422/server.rs diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 811b016..2369283 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -4,10 +4,13 @@ mod udp; mod quic; mod utils; +mod rs422; + use crate::cores::router::RouterKey; use crate::cores::servers::actix_web::quic::QuicImpl; use crate::cores::servers::actix_web::tcp::TCPImpl; use crate::cores::servers::actix_web::udp::UDPImpl; +use crate::cores::servers::actix_web::rs422::server::RS422Impl; use crate::cores::servers::Server; use crate::cores::state::AppState; use crate::utils::headers; @@ -43,6 +46,10 @@ pub fn quic(addr: &str) -> Box { Box::new(QuicImpl::new(addr)) } +pub fn rs422(path: &str) -> Box { + Box::new(RS422Impl::new(path)) +} + #[macro_export] macro_rules! init_app { ($app_state:expr) => {{ diff --git a/src/cores/servers/actix_web/rs422/buffer.rs b/src/cores/servers/actix_web/rs422/buffer.rs new file mode 100755 index 0000000..bb096a2 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/buffer.rs @@ -0,0 +1,157 @@ +// 用于RS422串口接收数据的缓冲区,SPSC模型,无需加锁 + +pub struct RingBuffer { + buffer: Vec, // 缓冲区 + head: usize, // 头指针 + tail: usize, // 尾指针 + size: usize, // 缓冲区的大小 +} + +impl RingBuffer { + // 初始化环形缓冲区 + pub fn new(size: usize) -> Self { + RingBuffer { + buffer: vec![0u8; size + 1], + head: 0, + tail: 0, + size: size + 1, + } + } + + // 获取当前缓冲区占用的大小 + pub fn used(&self) -> usize { + (self.tail + self.size - self.head) % self.size + } + + // 获取当前缓冲区剩余的大小 + pub fn free(&self) -> usize { + self.size - self.used() - 1 + } + + // 向缓冲区写入数据 + pub fn write(&mut self, data: &[u8]) { + let data_len = data.len(); + loop { + if self.free() >= data_len { + break; + } + } + + if self.tail + data_len > self.size { + let right = self.size - self.tail; + let left = data_len - right; + self.buffer[self.tail..].copy_from_slice(&data[..right]); + self.buffer[..left].copy_from_slice(&data[right..]); + } else { + self.buffer[self.tail..self.tail + data_len].copy_from_slice(&data); + } + self.tail = (self.tail + data_len) % self.size; + } + + // 从缓冲区读取数据 + pub fn read(&mut self, len: usize) -> Vec { + let mut result = vec![0u8; len]; + loop { + if self.used() >= len { + break; + } + } + + if self.head + len > self.size { + let right = self.size - self.head; + let left = len - right; + result[..right].copy_from_slice(&self.buffer[self.head..]); + result[right..].copy_from_slice(&self.buffer[..left]); + } else { + result.copy_from_slice(&self.buffer[self.head..self.head + len]); + } + self.head = (self.head + len) % self.size; + + result + } + + // 读数据包:从头指针开始,扫描缓冲区,直到读到一个完整的包 + pub fn read_packet(&mut self) -> Vec { + fn size_in_pointers(head: usize, tail: usize, buf_size: usize) -> usize { + (tail + buf_size - head) % buf_size + } + let mut pointer = self.head; + + loop { + // Get frame header sign + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0xEB90 + { + self.head = pointer; + break; + } else { + pointer = (pointer + 1) % self.size; + } + } + } + + pointer = (pointer + 2) % self.size; + + // Get package length + let mut len = 10usize; + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + let data_len = u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]); + len += usize::from(data_len); + break; + } + } + + // 检查收到的数据长度,若长度不够数据帧长度,等待10毫秒 + if self.used() < len { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + + // Assume that the package is valid, check the frame tail directly + // to get the package quickly and avoid the case that there is frame + // tail in data field. + if self.used() >= len { + let tail_sign = u16::from_be_bytes([ + self.buffer[(self.head + len - 2) % self.size], + self.buffer[(self.head + len - 1) % self.size], + ]); + if tail_sign == 0x09D7 { + return self.read(len); + } + } + + // Else, get the frame tail sign byte by byte + loop { + if size_in_pointers(pointer, self.tail, self.size) >= 2 { + if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0xEB90 + { + // 又读到帧头,退出循环从头开始 + println!("Drop packet: get a new header before getting a tail."); + break; + } else if u16::from_be_bytes([ + self.buffer[pointer], + self.buffer[(pointer + 1) % self.size], + ]) == 0x09D7 + { + pointer = (pointer + 2) % self.size; + len = size_in_pointers(self.head, pointer, self.size); + + return self.read(len); + } else { + pointer = (pointer + 1) % self.size; + } + } + } + } + } +} diff --git a/src/cores/servers/actix_web/rs422/frame.rs b/src/cores/servers/actix_web/rs422/frame.rs new file mode 100644 index 0000000..f377291 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/frame.rs @@ -0,0 +1,74 @@ +pub struct Frame { + pub head: u16, + pub length: u16, + pub data_type: u16, + pub data: Vec, + pub checksum: u16, + pub tail: u16, +} + +impl Frame { + pub fn new(data_type: u16, data: &[u8]) -> Self { + Frame { + head: 0xEB90, + length: data.len() as u16, + data_type, + data: data.to_vec(), + checksum: check_sum(data_type, data), + tail: 0x09D7, + } + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + let len = bytes.len(); + if len > 10 { + let frame = Frame { + head: u16::from_be_bytes([bytes[0], bytes[1]]), + length: u16::from_be_bytes([bytes[2], bytes[3]]), + data_type: u16::from_be_bytes([bytes[4], bytes[5]]), + data: bytes[6..len - 4].to_vec(), + checksum: u16::from_be_bytes([bytes[len - 4], bytes[len - 3]]), + tail: u16::from_be_bytes([bytes[len - 2], bytes[len - 1]]), + }; + Ok(frame) + } else { + Err("Frame length too short.".to_string()) + } + } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + + bytes.extend_from_slice(&self.head.to_be_bytes()); + bytes.extend_from_slice(&self.length.to_be_bytes()); + bytes.extend_from_slice(&self.data_type.to_be_bytes()); + bytes.extend_from_slice(&self.data); + bytes.extend_from_slice(&self.checksum.to_be_bytes()); + bytes.extend_from_slice(&self.tail.to_be_bytes()); + + bytes + } + + pub fn check(&self) -> bool { + if (self.head == 0xEB90) + && (self.length == (self.data.len() as u16)) + && (self.checksum == check_sum(self.data_type, &self.data)) + && (self.tail == 0x09D7) + { + true + } else { + false + } + } +} + +// Compute checksum +fn check_sum(data_type: u16, data: &[u8]) -> u16 { + let mut bytes: Vec = Vec::new(); + bytes.extend_from_slice(&data_type.to_be_bytes()); + bytes.extend_from_slice(data); + + bytes + .iter() + .fold(0u16, |acc, &byte| acc.wrapping_add(byte as u16)) +} diff --git a/src/cores/servers/actix_web/rs422/handle_packet.rs b/src/cores/servers/actix_web/rs422/handle_packet.rs new file mode 100755 index 0000000..441167d --- /dev/null +++ b/src/cores/servers/actix_web/rs422/handle_packet.rs @@ -0,0 +1,244 @@ +#![allow(static_mut_refs)] + +// For sending and receiving RS422 packets + +use std::collections::HashMap; + +use super::{buffer::RingBuffer, frame::Frame, port::open_port}; + +// 接收串口数据缓冲区 +static mut REFACTOR_BUFFERS: Vec = Vec::new(); + +// 数据帧最小长度 +const REFACTOR_FRAME_LEAST_LENGTH: usize = 915; +// 单个分片中重构数据长度 +const REFACTOR_DATA_LENGTH: usize = 896; +// 不允许覆写的重构数据基础版本文件名 +const BASE_REFACTOR_DATA_FILE: &str = "0x11-BaseSoftware-v1-11-11.bin"; + +// 重构数据信息 +struct DataInfo { + file_name: String, // 文件名 + total_frame: usize, // 帧总数 + frame_idx: usize, // 帧序号 +} + +impl DataInfo { + fn from_bytes(data: &[u8]) -> Self { + let len = data.len(); + let name_end = len - 8; + DataInfo { + file_name: String::from_utf8_lossy(&data[..name_end]).to_string(), + total_frame: u32::from_be_bytes([ + data[name_end], + data[name_end + 1], + data[name_end + 2], + data[name_end + 3], + ]) as usize, + frame_idx: u32::from_be_bytes([ + data[name_end + 4], + data[name_end + 5], + data[name_end + 6], + data[name_end + 7], + ]) as usize, + } + } + + // 检查帧序号是否越界 + fn check_frame_idx(&self) -> bool { + self.frame_idx < self.total_frame + } +} + +// 重构数据拼接缓冲区 +struct RefactorData { + file_name: String, // 重构数据文件名 + total_frame: usize, // 数据帧总数 + received_frames: Vec, // 已收到的数据帧,下标为帧序号,true为已收到 + next_cons_idx: usize, // 已收到的连续数据帧的最大帧序号的下一个未收到的帧序号 + data: Vec, // 数据缓冲区,按帧序号放入 +} + +impl RefactorData { + fn new(file_name: &str, total_frame: usize) -> Self { + RefactorData { + file_name: file_name.to_string(), + total_frame, + received_frames: vec![false; total_frame + 1], + next_cons_idx: 0, + data: vec![0u8; REFACTOR_DATA_LENGTH * total_frame], + } + } + + // 重构数据是否接收完整 + fn complete(&self) -> bool { + self.total_frame == self.next_cons_idx + } + + // 插入重构数据 + fn insert(&mut self, frame_idx: usize, refactor_data: &[u8]) { + self.data[REFACTOR_DATA_LENGTH * frame_idx..REFACTOR_DATA_LENGTH * (frame_idx + 1)] + .copy_from_slice(refactor_data); + if self.received_frames[frame_idx] { + println!( + "Receive redundant refactor packet, file name: {}, frame index: {}, update data using it", + self.file_name, frame_idx + ); + } else { + self.received_frames[frame_idx] = true; + if self.next_cons_idx == frame_idx { + for i in frame_idx..self.total_frame + 1 { + if !self.received_frames[i] { + self.next_cons_idx = i; + break; + } + } + } + println!( + "Receive refactor packet, file name: {}, frame index: {}, next consecutive frame index to receive: {}", + self.file_name, frame_idx, self.next_cons_idx + ); + } + } +} + +// Get valid RS422 refactor packet data types +fn valid_refactor_data_types() -> Vec { + [0x1234, 0x5678].to_vec() +} + +fn recv_bytes_from_port(path: &str, buf_idx: usize) { + let p = open_port(path); + + let mut port = match p { + Ok(res) => res, + Err(err) => { + println!("Failed to open port {} due to {:?}", path, err.to_string()); + return; + } + }; + + loop { + let mut buf: Vec = vec![0; 1024]; + let res = port.read(buf.as_mut_slice()); + + match res { + Ok(size) => unsafe { + REFACTOR_BUFFERS[buf_idx].write(&buf[..size]); + }, + Err(err) => { + if err.kind() != std::io::ErrorKind::TimedOut { + println!( + "Failed to read refactor data from RS422 port {} due to {}, retry after 30s", + path, + err.to_string() + ); + std::thread::sleep(std::time::Duration::from_secs(30)); + } + } + } + } +} + +fn handle_packets(buf_idx: usize) { + let mut refactor_data_map: HashMap = HashMap::new(); + + loop { + unsafe { + let p = REFACTOR_BUFFERS[buf_idx].read_packet(); + if p.len() >= REFACTOR_FRAME_LEAST_LENGTH { + let packet = Frame::from_bytes(&p).unwrap(); + if packet.check() { + if valid_refactor_data_types().contains(&packet.data_type) { + let data_info_len = (packet.length as usize) - REFACTOR_DATA_LENGTH; + let data_info = DataInfo::from_bytes(&packet.data[..data_info_len]); + if data_info.check_frame_idx() { + if let Some(mut refactor_data) = + refactor_data_map.remove(&data_info.file_name) + { + refactor_data + .insert(data_info.frame_idx, &packet.data[data_info_len..]); + if refactor_data.complete() { + println!( + "Receive refactor data completely, file name: {}", + data_info.file_name + ); + std::thread::spawn(move || { + save_refactor_data( + &refactor_data.file_name, + &refactor_data.data, + ); + }); + } else { + refactor_data_map.insert(data_info.file_name, refactor_data); + } + } else { + let mut refactor_data = + RefactorData::new(&data_info.file_name, data_info.total_frame); + refactor_data + .insert(data_info.frame_idx, &packet.data[data_info_len..]); + if refactor_data.complete() { + println!( + "Receive refactor data completely, file name: {}", + data_info.file_name + ); + std::thread::spawn(move || { + save_refactor_data( + &refactor_data.file_name, + &refactor_data.data, + ); + }); + } else { + refactor_data_map.insert(data_info.file_name, refactor_data); + } + } + } else { + println!("Drop refactor packet: frame index out of bound"); + } + } else { + println!("Drop refactor packet: invalid data type"); + } + } else { + println!("Drop refactor packet: invalid frame received"); + } + } else { + println!("Drop refactor packet: invalid frame received length"); + } + } + } +} + +fn save_refactor_data(file_name: &str, data: &[u8]) { + if file_name != BASE_REFACTOR_DATA_FILE { + loop { + let result = std::fs::write("/gxccgkk/rjcg/rjb/ypt/".to_string() + file_name, data); + match result { + Ok(_) => { + println!("Save refactor data file {}", file_name); + return; + } + Err(err) => { + println!( + "Failed to save refactor data file {} due to {:?}, retry after 30s", + file_name, + err.to_string() + ); + std::thread::sleep(std::time::Duration::from_secs(30)); + } + } + } + } else { + println!("Failed to save refactor data in attempting to overwrite base file"); + } +} + +pub fn run_service(path: &str, buf_size: usize) { + unsafe { + let buf_idx = REFACTOR_BUFFERS.len(); + REFACTOR_BUFFERS.push(RingBuffer::new(buf_size)); + std::thread::spawn(move || { + handle_packets(buf_idx); + }); + recv_bytes_from_port(path, buf_idx); + } +} diff --git a/src/cores/servers/actix_web/rs422/mod.rs b/src/cores/servers/actix_web/rs422/mod.rs new file mode 100644 index 0000000..5789e45 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/mod.rs @@ -0,0 +1,5 @@ +mod buffer; +mod frame; +mod port; +mod handle_packet; +pub mod server; \ No newline at end of file diff --git a/src/cores/servers/actix_web/rs422/port.rs b/src/cores/servers/actix_web/rs422/port.rs new file mode 100644 index 0000000..6a3cc68 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/port.rs @@ -0,0 +1,30 @@ +use super::frame::Frame; + +pub fn open_port(path: &str) -> serialport::Result> { + let port = serialport::new(path, 921600) + .timeout(std::time::Duration::from_secs(1)) + .data_bits(serialport::DataBits::Eight) + .parity(serialport::Parity::Even) + .stop_bits(serialport::StopBits::One) + .open(); + + port +} + +pub fn send_packet(path: &str, packet: &Frame) { + let p = open_port(path); + + let mut port = match p { + Ok(res) => res, + Err(err) => { + println!("Failed to open port {} due to {:?}", path, err.to_string()); + return; + } + }; + + let res = port.write_all(&packet.to_bytes()); + + if let Err(err) = res { + println!("Failed to send packet due to {:?}", err.to_string()); + } +} diff --git a/src/cores/servers/actix_web/rs422/server.rs b/src/cores/servers/actix_web/rs422/server.rs new file mode 100644 index 0000000..63cefa3 --- /dev/null +++ b/src/cores/servers/actix_web/rs422/server.rs @@ -0,0 +1,29 @@ +use log::*; + +use crate::cores::servers::Server; +use crate::cores::state::AppState; + +use super::handle_packet::run_service; + +pub struct RS422Impl { + pub path: String, +} + +impl RS422Impl { + pub fn new(path: &str) -> Self { + RS422Impl { + path: path.to_string(), + } + } +} + +#[async_trait] +impl Server for RS422Impl { + async fn start(&self, app_state: AppState) { + info!("Starting RS422 mock Actix web server at {}", self.path); + let path = self.path.clone(); + actix_web::rt::task::spawn_blocking(move || { + run_service(&path, 10485760); + }); + } +} -- Gitee From 3aa8430ae8ce217ba26a563903f16ade0c84ecc1 Mon Sep 17 00:00:00 2001 From: wangziyue Date: Sun, 13 Apr 2025 15:49:50 +0800 Subject: [PATCH 4/5] =?UTF-8?q?RS422=E6=9C=8D=E5=8A=A1=E9=9A=8Fapiserver?= =?UTF-8?q?=E5=90=AF=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 12 +++ examples/basic_example.rs | 2 + src/cores/servers/actix_web/rs422/frame.rs | 1 + .../servers/actix_web/rs422/handle_packet.rs | 4 +- src/cores/servers/actix_web/rs422/port.rs | 5 +- src/cores/servers/actix_web/rs422/server.rs | 3 + src/utils/test.rs | 97 ++++++++++++++----- 7 files changed, 96 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index f4d9b93..a9a6ada 100644 --- a/README.md +++ b/README.md @@ -48,5 +48,17 @@ python3 client.py -v --host=127.0.0.1 --port=4433 /test \ --data '{"key1": "value1", "key2": "value2"}' ``` +### Mock HTTP over RS422 +安装socat用于模拟串口 + +```sh +sudo dnf install socat +``` + +使用socat创建发送和接收串口,波特率921600bps、8位数据位、1位奇校验位、1位停止位 + +```sh +sudo socat PTY,link=/dev/RS422_send,raw,echo=0,b921600,cs8,parenb=1,cstopb=0 PTY,link=/dev/RS422_recv,raw,echo=0,b921600,cs8,parenb=1,cstopb=0 +``` diff --git a/examples/basic_example.rs b/examples/basic_example.rs index 59702dd..3e76016 100644 --- a/examples/basic_example.rs +++ b/examples/basic_example.rs @@ -7,6 +7,7 @@ use std::ffi::CString; const TCP_ADDRESS: &str = "0.0.0.0:38080"; const UDP_ADDRESS: &str = "0.0.0.0:38081"; const QUIC_ADDRESS: &str = "0.0.0.0:38082"; +const RS422_ADDRESS: &str = "/dev/RS422_recv"; #[actix_web::main] async fn main() { @@ -18,6 +19,7 @@ async fn main() { .actix_web_udp_address(UDP_ADDRESS.to_string()) .actix_web_tcp_address(TCP_ADDRESS.to_string()) .actix_web_quic_address(QUIC_ADDRESS.to_string()) + .actix_web_rs422_address(RS422_ADDRESS.to_string()) .build()).await; // waiting for ctrl-c signal log::info!("APIServer started, cluster_id: {}", app_state.cluster_id); diff --git a/src/cores/servers/actix_web/rs422/frame.rs b/src/cores/servers/actix_web/rs422/frame.rs index f377291..21a4d72 100644 --- a/src/cores/servers/actix_web/rs422/frame.rs +++ b/src/cores/servers/actix_web/rs422/frame.rs @@ -7,6 +7,7 @@ pub struct Frame { pub tail: u16, } +#[allow(unused)] impl Frame { pub fn new(data_type: u16, data: &[u8]) -> Self { Frame { diff --git a/src/cores/servers/actix_web/rs422/handle_packet.rs b/src/cores/servers/actix_web/rs422/handle_packet.rs index 441167d..8e669fa 100755 --- a/src/cores/servers/actix_web/rs422/handle_packet.rs +++ b/src/cores/servers/actix_web/rs422/handle_packet.rs @@ -113,7 +113,7 @@ fn recv_bytes_from_port(path: &str, buf_idx: usize) { let mut port = match p { Ok(res) => res, Err(err) => { - println!("Failed to open port {} due to {:?}", path, err.to_string()); + println!("Failed to open port {} due to {}", path, err.to_string()); return; } }; @@ -219,7 +219,7 @@ fn save_refactor_data(file_name: &str, data: &[u8]) { } Err(err) => { println!( - "Failed to save refactor data file {} due to {:?}, retry after 30s", + "Failed to save refactor data file {} due to {}, retry after 30s", file_name, err.to_string() ); diff --git a/src/cores/servers/actix_web/rs422/port.rs b/src/cores/servers/actix_web/rs422/port.rs index 6a3cc68..5f59a46 100644 --- a/src/cores/servers/actix_web/rs422/port.rs +++ b/src/cores/servers/actix_web/rs422/port.rs @@ -11,13 +11,14 @@ pub fn open_port(path: &str) -> serialport::Result res, Err(err) => { - println!("Failed to open port {} due to {:?}", path, err.to_string()); + println!("Failed to open port {} due to {}", path, err.to_string()); return; } }; @@ -25,6 +26,6 @@ pub fn send_packet(path: &str, packet: &Frame) { let res = port.write_all(&packet.to_bytes()); if let Err(err) = res { - println!("Failed to send packet due to {:?}", err.to_string()); + println!("Failed to send packet due to {}", err.to_string()); } } diff --git a/src/cores/servers/actix_web/rs422/server.rs b/src/cores/servers/actix_web/rs422/server.rs index 63cefa3..0f8a56d 100644 --- a/src/cores/servers/actix_web/rs422/server.rs +++ b/src/cores/servers/actix_web/rs422/server.rs @@ -1,5 +1,7 @@ use log::*; +use async_trait::async_trait; + use crate::cores::servers::Server; use crate::cores::state::AppState; @@ -17,6 +19,7 @@ impl RS422Impl { } } +#[allow(unused)] #[async_trait] impl Server for RS422Impl { async fn start(&self, app_state: AppState) { diff --git a/src/utils/test.rs b/src/utils/test.rs index 6767a2c..9393a31 100644 --- a/src/utils/test.rs +++ b/src/utils/test.rs @@ -1,13 +1,13 @@ -use std::ffi::CString; -use std::os::raw::c_void; -use std::sync::Arc; -use bon::Builder; -use feventbus::impls::messaging::messaging::Messaging; -use feventbus::traits::controller::EventBus; use crate::cores::handlers::datamgr::datamgr_api; use crate::cores::servers::{self, MessagingServer, Server}; use crate::cores::state::AppState; use crate::prepare_app_state; +use bon::Builder; +use feventbus::impls::messaging::messaging::Messaging; +use feventbus::traits::controller::EventBus; +use std::ffi::CString; +use std::os::raw::c_void; +use std::sync::Arc; #[allow(unused)] pub async fn setup_message_cli() -> Messaging { @@ -17,7 +17,8 @@ pub async fn setup_message_cli() -> Messaging { let plugin_to_load_key = CString::new("core.pluginsToLoad").unwrap(); let plugin_to_load_value = CString::new("Messaging Storage Portal").unwrap(); - let leader_mq_plugin_manager = unsafe { init_mq_plugins(&plugin_to_load_key, &plugin_to_load_value) }; + let leader_mq_plugin_manager = + unsafe { init_mq_plugins(&plugin_to_load_key, &plugin_to_load_value) }; let leader_address = CString::new("127.0.0.1").unwrap(); unsafe { @@ -44,7 +45,10 @@ pub async fn setup_message_cli() -> Messaging { messaging_client } -unsafe fn init_mq_plugins(plugin_to_load_key: &CString, plugin_to_load_value: &CString) -> *mut c_void { +unsafe fn init_mq_plugins( + plugin_to_load_key: &CString, + plugin_to_load_value: &CString, +) -> *mut c_void { let master_plugin_manager = datamgr_api::NewPluginManager(); datamgr_api::SetParameter( master_plugin_manager, @@ -56,11 +60,11 @@ unsafe fn init_mq_plugins(plugin_to_load_key: &CString, plugin_to_load_value: &C master_plugin_manager } - - #[allow(unused)] pub fn tear_down_message_cli(messaging_client: Arc) { - let plugin_manager = messaging_client.get_plugin_manager().expect("Plugin manager is not set"); + let plugin_manager = messaging_client + .get_plugin_manager() + .expect("Plugin manager is not set"); unsafe { datamgr_api::StopUdp(plugin_manager); datamgr_api::UnloadPlugins(plugin_manager); @@ -69,7 +73,13 @@ pub fn tear_down_message_cli(messaging_client: Arc) { } #[allow(unused)] -pub async fn start_test_api_server(app_state: AppState, tcp_address: String, udp_address: Option, quic_address: Option) -> AppState { +pub async fn start_test_api_server( + app_state: AppState, + tcp_address: String, + udp_address: Option, + quic_address: Option, + rs422_address: Option, +) -> AppState { // 启动watch相关事件监听协程 let app_state_watch = app_state.clone(); tokio::spawn(async move { @@ -86,23 +96,41 @@ pub async fn start_test_api_server(app_state: AppState, tcp_address: String, udp }); let actix_web_tcp_server_app_state = app_state.clone(); tokio::spawn(async move { - let _ = actix_web_tcp_server.start(actix_web_tcp_server_app_state).await; + let _ = actix_web_tcp_server + .start(actix_web_tcp_server_app_state) + .await; }); if let Some(udp_address) = udp_address { let actix_web_udp_server: Box = servers::actix_web::udp(udp_address.as_str()); let actix_web_udp_server_app_state = app_state.clone(); tokio::spawn(async move { - let _ = actix_web_udp_server.start(actix_web_udp_server_app_state).await; - }); + let _ = actix_web_udp_server + .start(actix_web_udp_server_app_state) + .await; + }); } if let Some(quic_address) = quic_address { - let actix_web_quic_server: Box = servers::actix_web::quic(quic_address.as_str()); + let actix_web_quic_server: Box = + servers::actix_web::quic(quic_address.as_str()); let actix_web_quic_server_app_state = app_state.clone(); tokio::spawn(async move { - let _ = actix_web_quic_server.start(actix_web_quic_server_app_state).await; + let _ = actix_web_quic_server + .start(actix_web_quic_server_app_state) + .await; + }); + } + + if let Some(rs422_address) = rs422_address { + let actix_web_rs422_server: Box = + servers::actix_web::rs422(rs422_address.as_str()); + let actix_web_rs422_server_app_state = app_state.clone(); + tokio::spawn(async move { + let _ = actix_web_rs422_server + .start(actix_web_rs422_server_app_state) + .await; }); } - + app_state } @@ -115,10 +143,15 @@ pub async fn setup_test_env(address: Option<&str>) -> (Arc, Arc, pub actix_web_udp_address: Option, pub actix_web_quic_address: Option, + pub actix_web_rs422_address: Option, } #[allow(unused)] pub async fn setup_full_test_env(params: TestServerStartParams) -> (Arc, Arc) { - let tcp_address = params.actix_web_tcp_address.unwrap_or("127.0.0.1:38080".to_string()); + let tcp_address = params + .actix_web_tcp_address + .unwrap_or("127.0.0.1:38080".to_string()); let udp_address = params.actix_web_udp_address; let quic_address = params.actix_web_quic_address; + let rs422_address = params.actix_web_rs422_address; // 删除数据库文件 std::fs::remove_file(DATABASE_PATH).unwrap_or_default(); let msg_cli = Arc::new(setup_message_cli().await); - let app_state = prepare_app_state(format!("{}{}", DATABASE_URL_PREFIX, DATABASE_PATH).as_str(), msg_cli.clone()).await.expect("prepare app state failed"); + let app_state = prepare_app_state( + format!("{}{}", DATABASE_URL_PREFIX, DATABASE_PATH).as_str(), + msg_cli.clone(), + ) + .await + .expect("prepare app state failed"); let app_state_moved = app_state.clone(); tokio::spawn(async move { - start_test_api_server(app_state_moved, tcp_address, udp_address, quic_address).await; + start_test_api_server( + app_state_moved, + tcp_address, + udp_address, + quic_address, + rs422_address, + ) + .await; }); (msg_cli, Arc::new(app_state)) -} \ No newline at end of file +} -- Gitee From 6588ec4d6674c50bd53fb41098cdeaa137c3509f Mon Sep 17 00:00:00 2001 From: wangziyue Date: Sun, 13 Apr 2025 21:08:54 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E5=AE=8C=E6=88=90RS422=E9=80=9A=E8=BF=87da?= =?UTF-8?q?tamgr=E4=B8=8A=E4=BC=A0=E9=87=8D=E6=9E=84=E6=95=B0=E6=8D=AE,?= =?UTF-8?q?=E7=BB=99=E5=87=BA=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 2 +- README.md | 18 ++++++- examples/send_rs422_packet.rs | 18 +++++++ src/cores/servers/actix_web/mod.rs | 2 +- src/cores/servers/actix_web/rs422/frame.rs | 1 - .../servers/actix_web/rs422/handle_packet.rs | 48 +++++++++++++++++-- src/cores/servers/actix_web/rs422/mod.rs | 6 +-- src/cores/servers/actix_web/rs422/port.rs | 1 - 8 files changed, 84 insertions(+), 12 deletions(-) create mode 100644 examples/send_rs422_packet.rs diff --git a/Cargo.toml b/Cargo.toml index 8866019..20d7848 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,4 +67,4 @@ bcrypt = "0.14" jsonwebtoken = "9.2" rand = "0.8" futures-util = "0.3.31" -serialport = "4.6.1" +serialport = "=4.6.1" diff --git a/README.md b/README.md index a9a6ada..0c2faa4 100644 --- a/README.md +++ b/README.md @@ -57,8 +57,22 @@ python3 client.py -v --host=127.0.0.1 --port=4433 /test \ sudo dnf install socat ``` -使用socat创建发送和接收串口,波特率921600bps、8位数据位、1位奇校验位、1位停止位 +使用socat创建发送和接收串口,波特率921600bps、8位数据位、1位奇校验位、1位停止位,之后启动apiserver ```sh -sudo socat PTY,link=/dev/RS422_send,raw,echo=0,b921600,cs8,parenb=1,cstopb=0 PTY,link=/dev/RS422_recv,raw,echo=0,b921600,cs8,parenb=1,cstopb=0 +sudo socat PTY,link=/dev/RS422_send,raw,echo=0,b921600,cs8,parenb=1,cstopb=0,perm=0666 PTY,link=/dev/RS422_recv,raw,echo=0,b921600,cs8,parenb=1,cstopb=0,perm=0666 +``` + +生成约10MB随机数据,按896B对齐 + +```sh +dd if=/dev/urandom of=random_file.bin bs=896 count=12000 +``` + +运行`examples/send_rs422_packet.rs`,服务接收到数据通过接口存入`storage.replica.location/test.binapp云平台`,并在本地存为`test.bin`用于验证 + +检查数据接收是否正确 + +```sh +diff random_file.bin test.bin ``` diff --git a/examples/send_rs422_packet.rs b/examples/send_rs422_packet.rs new file mode 100644 index 0000000..b58117f --- /dev/null +++ b/examples/send_rs422_packet.rs @@ -0,0 +1,18 @@ +use std::fs; + +use fleet_apiserver::cores::servers::actix_web::rs422::{frame::Frame, handle_packet::DataInfo, port::send_packet}; + +fn main() { + let upload_file = fs::read("./random_file.bin").unwrap(); + let file_len = upload_file.len(); + let num_packets = file_len / 896; + for i in 0..num_packets { + let data = &upload_file[i * 896..(i + 1) * 896]; + let data_info = DataInfo::new("test.bin", num_packets, i).to_bytes(); + let mut bytes = Vec::new(); + bytes.extend_from_slice(&data_info); + bytes.extend_from_slice(data); + let packet = Frame::new(0x1234, &bytes); + send_packet("/dev/RS422_send", &packet); + } +} \ No newline at end of file diff --git a/src/cores/servers/actix_web/mod.rs b/src/cores/servers/actix_web/mod.rs index 2369283..21480b0 100644 --- a/src/cores/servers/actix_web/mod.rs +++ b/src/cores/servers/actix_web/mod.rs @@ -4,7 +4,7 @@ mod udp; mod quic; mod utils; -mod rs422; +pub mod rs422; use crate::cores::router::RouterKey; use crate::cores::servers::actix_web::quic::QuicImpl; diff --git a/src/cores/servers/actix_web/rs422/frame.rs b/src/cores/servers/actix_web/rs422/frame.rs index 21a4d72..f377291 100644 --- a/src/cores/servers/actix_web/rs422/frame.rs +++ b/src/cores/servers/actix_web/rs422/frame.rs @@ -7,7 +7,6 @@ pub struct Frame { pub tail: u16, } -#[allow(unused)] impl Frame { pub fn new(data_type: u16, data: &[u8]) -> Self { Frame { diff --git a/src/cores/servers/actix_web/rs422/handle_packet.rs b/src/cores/servers/actix_web/rs422/handle_packet.rs index 8e669fa..534cea7 100755 --- a/src/cores/servers/actix_web/rs422/handle_packet.rs +++ b/src/cores/servers/actix_web/rs422/handle_packet.rs @@ -2,7 +2,9 @@ // For sending and receiving RS422 packets -use std::collections::HashMap; +use std::{collections::HashMap, ffi::CString}; + +use crate::cores::handlers::datamgr::{datamgr_api, DATA_PLUGIN_MANAGER}; use super::{buffer::RingBuffer, frame::Frame, port::open_port}; @@ -17,13 +19,21 @@ const REFACTOR_DATA_LENGTH: usize = 896; const BASE_REFACTOR_DATA_FILE: &str = "0x11-BaseSoftware-v1-11-11.bin"; // 重构数据信息 -struct DataInfo { +pub struct DataInfo { file_name: String, // 文件名 total_frame: usize, // 帧总数 frame_idx: usize, // 帧序号 } impl DataInfo { + pub fn new(file_name: &str, total_frame: usize, frame_idx: usize) -> Self { + DataInfo { + file_name: file_name.to_string(), + total_frame, + frame_idx, + } + } + fn from_bytes(data: &[u8]) -> Self { let len = data.len(); let name_end = len - 8; @@ -48,6 +58,15 @@ impl DataInfo { fn check_frame_idx(&self) -> bool { self.frame_idx < self.total_frame } + + pub fn to_bytes(&self) -> Vec { + let mut bytes = Vec::new(); + bytes.extend_from_slice(self.file_name.as_bytes()); + bytes.extend_from_slice(&(self.total_frame as u32).to_be_bytes()); + bytes.extend_from_slice(&(self.frame_idx as u32).to_be_bytes()); + + bytes + } } // 重构数据拼接缓冲区 @@ -210,8 +229,9 @@ fn handle_packets(buf_idx: usize) { fn save_refactor_data(file_name: &str, data: &[u8]) { if file_name != BASE_REFACTOR_DATA_FILE { + upload_data(file_name, data); loop { - let result = std::fs::write("/gxccgkk/rjcg/rjb/ypt/".to_string() + file_name, data); + let result = std::fs::write("./".to_string() + file_name, data); match result { Ok(_) => { println!("Save refactor data file {}", file_name); @@ -232,6 +252,28 @@ fn save_refactor_data(file_name: &str, data: &[u8]) { } } +fn upload_data(file_name: &str, data: &[u8]) { + unsafe { + let data_type = CString::new("app").unwrap(); + let data_name = CString::new(file_name).unwrap(); + let to = CString::new("云平台").unwrap(); + let ret = datamgr_api::UploadData( + DATA_PLUGIN_MANAGER, + data_type.as_ptr(), + data_name.as_ptr(), + to.as_ptr(), + data.as_ptr() as *const i8, + data.len() as i32, + ); + if ret != 1 { + println!( + "Failed to upload refactor data file {}, retry after 30s", + file_name + ); + } + } +} + pub fn run_service(path: &str, buf_size: usize) { unsafe { let buf_idx = REFACTOR_BUFFERS.len(); diff --git a/src/cores/servers/actix_web/rs422/mod.rs b/src/cores/servers/actix_web/rs422/mod.rs index 5789e45..ef70fa5 100644 --- a/src/cores/servers/actix_web/rs422/mod.rs +++ b/src/cores/servers/actix_web/rs422/mod.rs @@ -1,5 +1,5 @@ mod buffer; -mod frame; -mod port; -mod handle_packet; +pub mod frame; +pub mod port; +pub mod handle_packet; pub mod server; \ No newline at end of file diff --git a/src/cores/servers/actix_web/rs422/port.rs b/src/cores/servers/actix_web/rs422/port.rs index 5f59a46..12aee4c 100644 --- a/src/cores/servers/actix_web/rs422/port.rs +++ b/src/cores/servers/actix_web/rs422/port.rs @@ -11,7 +11,6 @@ pub fn open_port(path: &str) -> serialport::Result