From 21eb3b3a7d3db3218fcae850ad18129f1c17413c Mon Sep 17 00:00:00 2001 From: hans <837713748@qq.com> Date: Mon, 1 Jan 2024 16:48:49 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85socketd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 10 +- src/lib.rs | 82 ++++++++++ src/socketd/mod.rs | 39 ++++- src/socketd/transport/core/channel.rs | 78 ++++++++++ src/socketd/transport/core/codec.rs | 37 +++++ src/socketd/transport/core/config.rs | 212 ++++++++++++++++++++++++++ src/socketd/transport/core/error.rs | 39 +++++ src/socketd/transport/core/mod.rs | 13 +- 8 files changed, 506 insertions(+), 4 deletions(-) create mode 100644 src/socketd/transport/core/channel.rs create mode 100644 src/socketd/transport/core/codec.rs create mode 100644 src/socketd/transport/core/config.rs create mode 100644 src/socketd/transport/core/error.rs diff --git a/Cargo.toml b/Cargo.toml index d4252ac..8a34d21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,4 +10,12 @@ base64 = "0.21.5" jni = "0.21.1" serde_json = "1.0.108" serde = { version = "1.0", features = ["derive"] } -libloading = "0.8.1" \ No newline at end of file +libloading = "0.8.1" +# 异步trait支持 +async-trait = "0.1.75" +# 0拷贝bytes库 +bytes = { version = "1", features = ["serde"] } +# 异步运行时 +tokio = { version = "1.35.1", features = ["full"] } +# 日志标准库 +log = "0.4.20" diff --git a/src/lib.rs b/src/lib.rs index 3363dcd..757b251 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,8 @@ pub fn add(left: usize, right: usize) -> usize { #[cfg(test)] mod tests { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; use super::*; #[test] @@ -15,4 +17,84 @@ mod tests { let result = add(2, 2); assert_eq!(result, 4); } + + #[test] + fn test () { + let url="http://127.0.0.1"; + let a =url.find("://").unwrap(); + println!("{a}"); + let schema=&url[..a]; + println!("{schema}"); + } + + #[test] + fn test_tcp() { + let mut stream = TcpStream::connect("127.0.0.1:8602").unwrap(); + println!("123"); + let mut input = "abc".to_string(); + //定义一个String类型的输入 + // io::stdin().read_line(&mut input).expect("Failed to read!"); + //从标准输入读入一行,读入input里面,如果有问题的话,提示“读取失败” + stream.write(input.as_bytes()).expect("Failed to write!"); + //把input读取的内容,转换成bytes后,写到stream流里面去,如果写入失败,提示“写入失败” + + } + #[tokio::test] + async fn test_tcpc() { + let mut stream = tokio::net::TcpStream::connect("127.0.0.1:8602").await.unwrap(); + let mut sid="00000000000"; + let mut u32_bytes:u32 =32/8; + let mut evetb="tcp://127.0.0.1:8602/?u=a&p=2"; + let mut meta="SocketD=1.0"; + let mut data=""; + println!("data.length:{0}",data.as_bytes().len()); + println!("\n.length:{0}","\n".as_bytes().len()); + let mut frame_size =u32_bytes+u32_bytes+ + sid.as_bytes().len() as u32+ + evetb.as_bytes().len() as u32+ + meta.as_bytes().len() as u32+ + data.as_bytes().len() as u32 +("\n".as_bytes().len() * 3)as u32; + let mut buf:Vec=Vec::new(); + for i in frame_size.to_be_bytes() { + buf.push(i); + } + for i in 10_u32.to_be_bytes() { + buf.push(i); + } + for i in sid.as_bytes() { + buf.push(*i); + } + for x in "\n".as_bytes() { + buf.push(*x); + } + for x in evetb.as_bytes() { + buf.push(*x); + } + for x in "\n".as_bytes() { + buf.push(*x); + } + for x in meta.as_bytes() { + buf.push(*x); + } + for x in "\n".as_bytes() { + buf.push(*x); + } + for x in data.as_bytes() { + buf.push(*x); + } + println!("分割线-------------------------"); + println!("{:?}",&buf); + println!("{:?}",&buf.len()); + let s=String::from_utf8(buf.clone()).unwrap(); + println!("{:?}",s); + // io::stdout().write(&buf[0..buf.len()]).unwrap(); + println!("分割线-------------------------"); + stream.write(&buf).await.expect("TODO: panic message"); + loop { + let mut buf:Vec = Vec::new(); + let b=stream.read(buf.as_mut()).await; + let a= String::from_utf8(buf.clone()); + println!("{:?}",a); + } + } } diff --git a/src/socketd/mod.rs b/src/socketd/mod.rs index 703733b..4e6949d 100644 --- a/src/socketd/mod.rs +++ b/src/socketd/mod.rs @@ -1 +1,38 @@ -pub mod transport; \ No newline at end of file +use crate::socketd::transport::core::config::{ClientConfig, Config}; + +pub mod transport; + + + + +pub enum ClientType {} + +pub struct SocketD; + +impl SocketD { + /** + * 创建客户端(支持 url 自动识别) + * 一些结构不一致需要修改一下 + * @param serverUrl 服务器地址 + */ + // pub fn create_client(server_url: &str) ->Box { + // let config=ClientConfig::new_server_url(server_url); + // let schema=config.get_schema(); + // let client_model:ClientModel=schema.into(); + // let mut client=client_model.to_client(); + // client.set_config(config); + // Box::new(client) + // } +} + +#[cfg(test)] +mod tests { + use crate::socketd::SocketD; + const SERVER_URL: &str = "sd:tcp://127.0.0.1:8623"; + + #[tokio::test] + async fn test_create_client() { + let mut client =SocketD::create_client(SERVER_URL); + let session=client.open().await; + } +} \ No newline at end of file diff --git a/src/socketd/transport/core/channel.rs b/src/socketd/transport/core/channel.rs new file mode 100644 index 0000000..46ed6f5 --- /dev/null +++ b/src/socketd/transport/core/channel.rs @@ -0,0 +1,78 @@ +use async_trait::async_trait; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use tokio::io::AsyncReadExt; +use tokio::net::TcpStream; +use crate::socketd::transport::core::codec::Coder; +use crate::socketd::transport::core::config::Config; +use crate::socketd::transport::core::error::SocketDError; +use crate::socketd::transport::core::Frame; + +pub trait Channel {} + + +pub struct ChannelDefault {} + +/** + * 通道助理 + * + * @return 通道 + */ +#[async_trait] +pub trait ChannelAssistant { + + /** + * 读取 + * + * @param target 目标 + * @param frame 帧 + */ + async fn read_socket(&self, target: &T) -> Result; +} + + +pub struct DefaultChannelAssistant { + config: Box, +} + +impl DefaultChannelAssistant { + fn init(config: &impl Config)->Self{ + DefaultChannelAssistant { + config: Box::new(config), + } + } +} + +#[async_trait] +impl ChannelAssistant for DefaultChannelAssistant { + + + async fn read_socket(&self, target: &mut TcpStream) -> Result { + let _ = target.readable().await; + let mut buf = BytesMut::with_capacity(4); + let _ = match target.try_read(&mut buf) { + Ok(0) => Err("read 0 bytes"), + Ok(n) => Some(n), + Err(err) => err + }?; + let len = buf.get_u32(); + if len == 0 { + Err("read 0 bytes") + } + let mut byte_buff = BytesMut::with_capacity(len as usize); + byte_buff.put_u32(len); + let buff_size = self.config.get_read_buffer_size() as usize; + let mut read_buff = BytesMut::with_capacity(buff_size); + loop { + if read_buff.capacity().eq(&0) { + read_buff.clear(); + } + if target.try_read(read_buff.into())? > 0 { + byte_buff.put(&buf); + } else { + break; + } + } + read_buff.clear(); + Coder::read(byte_buff) + } +} \ No newline at end of file diff --git a/src/socketd/transport/core/codec.rs b/src/socketd/transport/core/codec.rs new file mode 100644 index 0000000..00b2516 --- /dev/null +++ b/src/socketd/transport/core/codec.rs @@ -0,0 +1,37 @@ +use std::ops::Add; +use bytes::BytesMut; +use crate::socketd::transport::core::error::SocketDError; +use crate::socketd::transport::core::Frame; + +pub struct Coder{ + +} + +impl Coder{ + /// 编码器 + /// encoder + /// + /// 用于将Tcp流转化为Frame + /// Used to convert Tcp stream to Frame + /// + /// # Examples + /// ``` + /// use bytes::BytesMut; + /// let read = Coder::read(BytesMut::new()); + /// assert!(read.is_err()); + /// ``` + pub fn read(mut source:BytesMut) -> Result{ + let frame_size =source.first().unwrap_or_else(0); + if frame_size > (source.len().add(*4)){ + Err(SocketDError::Channel("Illegal frame stack")) + } + let flag=source.get(2); + if frame_size==8{ + //len[int] + flag[int] + // Fream结构与我的不一致需要重新改一下 + // Ok(Frame::new(flag.into(),MessageDefault::default())) + }else{ + + } + } +} \ No newline at end of file diff --git a/src/socketd/transport/core/config.rs b/src/socketd/transport/core/config.rs new file mode 100644 index 0000000..1ca5a77 --- /dev/null +++ b/src/socketd/transport/core/config.rs @@ -0,0 +1,212 @@ +use crate::socketd::transport::core::MAX_SIZE_DATA; + +pub trait Config { + /** + * 是否客户端模式 + */ + fn client_mode(&self) -> bool; + + /** + * 获取流管理器 + */ + fn get_stream_manger(&self) -> bool; + + /** + * 获取角色名 + */ + fn get_role_name(&self) -> String; + + /** + * 获取字符集 + */ + fn get_charset(&self) -> &str; + + /** + * 获取模式 + */ + fn get_schema(&self) -> &str; + + /** + * 获取地址 + */ + fn get_address(&self) -> &str; + + /** + * 获取写缓冲大小 + */ + fn get_pwrite_buffer_size(&self) -> u32; + + /** + * 获取读缓冲大小 + */ + fn get_read_buffer_size(&self) -> u32; + /** + * 获取连接超时时间 + */ + fn get_connect_timeout(&self) -> u64; +} + +/** + * Id 生成器 + */ +pub trait IdGenerator { + /** + * 生成 + */ + fn generate() -> String; +} + +#[derive(Debug, Clone)] +pub struct ClientConfig { + /// 是否客户端模式 + client_mode: bool, + /// 流管理器 + stream_manger: bool, + // /// 编解码器 + // codec:bool, + // /// id生成器 + // id_generator: dyn IdGenerator, + // //分片处理 + // fragment_handler:String, + // //分片大小 + fragment_size: u16, + // //ssl 上下文 + // ssl_context:String, + // + // //字符集 + // charset:String, + // + // //内核线程数 + // core_threads:u32, + // //最大线程数 + // max_threads:u32, + // + /// 读缓冲大小 + read_buffer_size: u32, + /// 写缓冲大小 + pwrite_buffer_size: u32, + // + // //连接空闲超时 + idle_timeout: u32, + // //请求默认超时 + request_timeout: u32, + // //消息流超时(从发起到应答结束) + // stream_timeout:u64, + // //最大udp包大小 + // max_udp_size:u32, + // //通讯架构(tcp, ws, udp) + // schema:String, + // //连接地址 + // link_url:String, + // url:String, + // uri:String, + // port:u16, + // + /// 心跳间隔(毫秒) + heartbeat_interval: u64, + /// 连接越时(毫秒) + connect_timeout: u64, + /// 是否自动重链 + auto_reconnect: bool, + /** 通讯架构 */ + schema: String, + /** 连接地址 */ + link_url: String, + /** 连接地址 */ + url: String, + /** 协议地址 */ + address: String, + +} + +impl ClientConfig { + pub fn new_server_url(server_url: &str) -> Self { + // 校验合法性 + let sd_idx = server_url.find("sd").expect("No socketd client providers were found."); + let idx = match server_url.to_string().find("://") { + None => { panic!("The serverUrl invalid: {0}", server_url) } + Some(idx) => { idx } + }; + if idx < 2 { + panic!("The serverUrl invalid: {0}", server_url); + } + ClientConfig { + client_mode: true, + stream_manger: false, + fragment_size: MAX_SIZE_DATA, + read_buffer_size: 512, + pwrite_buffer_size: 512, + idle_timeout: 60_000,//60秒(心跳默认为20秒) + request_timeout: 10_000,//10秒(默认与连接超时同) + heartbeat_interval: 20_000, + connect_timeout: 10_000, + auto_reconnect: false, + schema: server_url[sd_idx + 3..idx].to_string(), + link_url: server_url.to_string(), + url: server_url[sd_idx + 3..].to_string(), + address: server_url[idx + 3..].to_string(), + } + } +} + +impl Default for ClientConfig { + fn default() -> Self { + ClientConfig { + client_mode: true, + stream_manger: false, + fragment_size: 0, + read_buffer_size: 0, + pwrite_buffer_size: 0, + idle_timeout: 0, + request_timeout: 0, + heartbeat_interval: 0, + connect_timeout: 0, + auto_reconnect: false, + schema: "".to_string(), + link_url: "".to_string(), + url: "".to_string(), + address: "".to_string(), + } + } +} + + +impl Config for ClientConfig { + fn client_mode(&self) -> bool { + self.client_mode + } + + fn get_stream_manger(&self) -> bool { + self.stream_manger + } + + fn get_role_name(&self) -> String { + match self.client_mode() { + true => { String::from("Client") } + false => { String::from("Server") } + } + } + + fn get_charset(&self) -> &str { + "demo" + } + + fn get_schema(&self) -> &str { + &self.schema + } + + fn get_address(&self) -> &str { + &self.address + } + fn get_pwrite_buffer_size(&self) -> u32 { + self.pwrite_buffer_size + } + + fn get_read_buffer_size(&self) -> u32 { + self.read_buffer_size + } + + fn get_connect_timeout(&self) -> u64 { + self.connect_timeout + } +} diff --git a/src/socketd/transport/core/error.rs b/src/socketd/transport/core/error.rs new file mode 100644 index 0000000..8e42d3f --- /dev/null +++ b/src/socketd/transport/core/error.rs @@ -0,0 +1,39 @@ +use std::borrow::Cow; +use std::error::Error; +use std::fmt; +use std::io::ErrorKind; + + +/** + * socketD Error 用于处理SocketD中的所有异常 + */ +pub enum SocketDError{ + Connect(&'static str), + Channel(&'static str), + IO(&'static str), +} + +impl SocketDError { + pub fn as_str(&self) -> &str { + match self { + SocketDError::Connect(s) => format!("connect error:{0}", s), + SocketDError::Channel(s) => format!("channel error:{0}", s), + SocketDError::IO(s) => format!("Io error:{0}", s), + }.as_str() + } + +} + +impl fmt::Display for SocketDError { + /// Shows a human-readable description of the `ErrorKind`. + /// + /// This is similar to `impl Display for Error`, but doesn't require first converting to Error. + /// + /// # Examples + /// ``` + /// assert_eq!("connect error:", SocketDError::Connect); + /// ``` + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.write_str(self.as_str()) + } +} diff --git a/src/socketd/transport/core/mod.rs b/src/socketd/transport/core/mod.rs index 496cd3a..124392c 100644 --- a/src/socketd/transport/core/mod.rs +++ b/src/socketd/transport/core/mod.rs @@ -1,2 +1,11 @@ -mod domain; -pub use domain::*; \ No newline at end of file +pub mod domain; +pub mod codec; +pub mod error; +pub mod channel; +pub mod config; + +pub use domain::*; + +const MAX_SIZE_DATA:u16=1024 * 1024 * 16; + +const MAX_SIZE_META_STRING: u32 =4069; -- Gitee