diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..2ebc5ea07bf1313e647d81885a3d71adbbfce4bd --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..d779d34229ae06a70b507bad63a74ac0144fba03 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,51 @@ +[package] +name = "async-io" +version = "0.1.0" +edition = "2021" +[lib] +crate-type = ["rlib"] +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["alloc", "single_thread"] +alloc = ["async-executor/alloc", "async-task/alloc"] +std = ["stdlib/std", "async-executor/std", "async-task/std"] + +single_thread = ["async-executor/signle_thread", "async-task/signle_thread"] +multiple_thread = [ + "async-executor/multiple_thread", + "async-task/multiple_thread", +] +[target.'cfg(unix)'.dependencies] +libc = { version = "0.2.126", default-features = false } + +[dependencies] +stdlib = { path = "../stdlib", default-features = false } +async-executor = { path = "../async-executor", default-features = false } +async-task = { path = "../async-task", default-features = false } +waker-fn = "1.1.0" + +[dev-dependencies] +cfg-if = "1.0.0" +hyper = { version = "0.14.19", default-features = false, features = [ + "client", + "http1", + "server", + "stream", +] } +tokio = "1.19.2" +[profile.release] +opt-level = 2 +debug = false +rpath = false +lto = true +debug-assertions = false +codegen-units = 256 +panic = 'abort' +incremental = false +overflow-checks = false + +[profile.dev] +panic = 'abort' + +[build] +target = "thumbv7em-none-eabihf" \ No newline at end of file diff --git a/README.en.md b/README.en.md index ca57d540f98fff4bb5bf68cf8aa7806ced2e2b6a..31f328aa333ec3bfd036666952e9253cb2c918c7 100644 --- a/README.en.md +++ b/README.en.md @@ -1,36 +1,5 @@ # async-io #### Description -异步io +Asynchronous io is internally implemented using the asynchronous model `EPOLL/IOCP` provided by the operating system. -#### Software Architecture -Software architecture description - -#### Installation - -1. xxxx -2. xxxx -3. xxxx - -#### Instructions - -1. xxxx -2. xxxx -3. xxxx - -#### Contribution - -1. Fork the repository -2. Create Feat_xxx branch -3. Commit your code -4. Create Pull Request - - -#### Gitee Feature - -1. You can use Readme\_XXX.md to support different languages, such as Readme\_en.md, Readme\_zh.md -2. Gitee blog [blog.gitee.com](https://blog.gitee.com) -3. Explore open source project [https://gitee.com/explore](https://gitee.com/explore) -4. The most valuable open source project [GVP](https://gitee.com/gvp) -5. The manual of Gitee [https://gitee.com/help](https://gitee.com/help) -6. The most popular members [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/README.md b/README.md index 7b7cf25c85f13c38a91df7848efc20fa6c6229e3..65bef98529fff826d6ce686b0407e332234e23da 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,5 @@ # async-io #### 介绍 -异步io +异步io,内部使用操作系统提供的异步模型`EPOLL/IOCP`进行实现。 -#### 软件架构 -软件架构说明 - - -#### 安装教程 - -1. xxxx -2. xxxx -3. xxxx - -#### 使用说明 - -1. xxxx -2. xxxx -3. xxxx - -#### 参与贡献 - -1. Fork 本仓库 -2. 新建 Feat_xxx 分支 -3. 提交代码 -4. 新建 Pull Request - - -#### 特技 - -1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md -2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) -3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 -4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 -5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) -6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/examples/hyper-server.rs b/examples/hyper-server.rs new file mode 100644 index 0000000000000000000000000000000000000000..1224d245bbfce92952d1fb35e61401041d29fc19 --- /dev/null +++ b/examples/hyper-server.rs @@ -0,0 +1,130 @@ +fn main() {} +// #![cfg_attr( +// feature = "alloc", +// no_std, +// no_main, +// feature(default_alloc_error_handler) +// )] +// #![feature(read_buf)] +// extern crate cfg_if; +// use async_io::{block_on, spawn, Shutdown, Socket}; +// use core::future::Future; +// use core::pin::Pin; +// use core::result::Result; +// use core::str::from_utf8_unchecked; +// use core::task::{self, Context, Poll}; +// use hyper::server::{accept::Accept, Server}; +// use hyper::service::{make_service_fn, service_fn}; +// use stdlib::anyhow::AnyHow; +// use stdlib::anyhow::Context as ctx; + +// use hyper::{Body, Request, Response, StatusCode}; +// use std::io::{self, Read}; +// struct AsyncListener(Socket); +// use async_io::pin; +// impl Accept for AsyncListener { +// type Conn = Stream; +// type Error = stdlib::io::Error; +// fn poll_accept( +// self: core::pin::Pin<&mut Self>, +// cx: &mut task::Context<'_>, +// ) -> core::task::Poll>> { +// let fu = self.0.accept(); +// pin!(fu); +// let rtn = core::future::Future::poll(fu, cx); +// rtn.map(|op| Some(op.map(|op| Stream(op)))) +// } +// } +// #[derive(Clone)] +// struct Executor; +// impl hyper::rt::Executor for Executor { +// fn execute(&self, fut: F) { +// spawn(async { drop(fut.await) }).detach(); +// } +// } +// struct Stream(Socket); +// impl tokio::io::AsyncRead for Stream { +// fn poll_read( +// self: core::pin::Pin<&mut Self>, +// cx: &mut task::Context<'_>, +// buf: &mut tokio::io::ReadBuf<'_>, +// ) -> task::Poll> { +// let f = self.0.recv(buf.initialize_unfilled()); +// pin!(f); +// Future::poll(f, cx).map(|f| { +// f.map(|size| { +// buf.set_filled(size); +// }) +// .map_err(|e| e.into_raw()) +// }) +// } +// } +// fn main() { +// let rtn: Result<(), AnyHow> = block_on(async { +// let socket = Socket::bind([0, 0, 0, 0], 8080)?; +// Server::builder(AsyncListener(socket)) +// .executor(Executor) +// .serve(make_service_fn(move |_| async { +// Ok::<_, stdlib::io::Error>(service_fn(move |req| serve(req))) +// })) +// .await?; +// Ok(()) +// }); +// if let Err(e) = rtn { +// println!("{}", e); +// } +// } +// fn get_file_path<'a>(path: &'a str) -> &'a str { +// let a = path.split("?").by_ref().next().unwrap(); +// a +// } +// use std::fs::File; +// use std::io::Write; +// async fn serve(req: Request) -> io::Result> { +// let mut buf = [0u8; 256]; +// let uri = req.uri().path(); +// let uri = if uri == "/" { "/index.html" } else { uri }; +// let len = { +// let len = buf.len(); +// let mut p = &mut buf[..]; +// unsafe { write!(p, "/home/shuaihu/www{}", uri).unwrap_unchecked() }; +// len - p.len() +// }; +// let path: &str = unsafe { from_utf8_unchecked(&buf[..len]) }; +// let path = get_file_path(path); +// if let Ok(mut file) = File::open(path) { +// let mut buf = Vec::new(); +// file.read_to_end(&mut buf).unwrap(); +// Ok(Response::new(Body::from(buf))) +// } else { +// println!("{} file not found!", path); +// let res = unsafe { +// Response::builder() +// .status(StatusCode::NOT_FOUND) +// .body(Body::empty()) +// .unwrap_unchecked() +// }; +// Ok(res) +// } +// } +// #[allow(unused_variables, unused_mut)] +// impl tokio::io::AsyncWrite for Stream { +// fn poll_write( +// mut self: Pin<&mut Self>, +// cx: &mut Context<'_>, +// buf: &[u8], +// ) -> Poll> { +// let fu = self.0.send(buf); +// pin!(fu); +// Future::poll(fu, cx).map(|f| f.map_err(|op| op.into_raw())) +// } + +// fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// Poll::Ready(Ok(())) +// } + +// fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// unsafe { self.0.shutdown(Shutdown::Write).unwrap_unchecked() }; +// Poll::Ready(Ok(())) +// } +// } diff --git a/examples/tcpip.rs b/examples/tcpip.rs new file mode 100644 index 0000000000000000000000000000000000000000..64885468781b03b9797567023dee34cd92b3d8e2 --- /dev/null +++ b/examples/tcpip.rs @@ -0,0 +1,54 @@ +#![cfg_attr( + feature = "alloc", + no_std, + no_main, + feature(default_alloc_error_handler) +)] +#[macro_use] +extern crate cfg_if; +use async_io::{block_on, spawn, Socket}; +use stdlib::io::Result; + +cfg_if! { + if #[cfg(feature="alloc")]{ + #[no_mangle] + extern "C" fn main() -> i32 { + block_on(async { + let sock = Socket::bind([127, 0, 0, 1], 3356).unwrap(); + loop { + let abc = sock.accept().await.unwrap(); + spawn(service(abc)).detach(); + } + }); + 0 + } + + #[cfg(not(test))] + #[panic_handler] + pub fn painc_handler(_info: &core::panic::PanicInfo<'_>) -> ! { + unsafe { libc::abort() } + } + }else{ + fn main() { + block_on(async { + let sock = Socket::bind([127, 0, 0, 1], 3356).unwrap(); + loop { + let abc = sock.accept().await.unwrap(); + spawn(service(abc)).detach(); + } + }); + } + } +} + +async fn service(sock: Socket) -> Result<()> { + let mut buf = [0; 1024]; + loop { + let a = sock.recv(&mut buf).await?; + if a == 0 { + break; + }; + sock.send(&buf[0..a]).await?; + } + Ok(()) +} diff --git a/src/drive_s.rs b/src/drive_s.rs new file mode 100644 index 0000000000000000000000000000000000000000..c006262d632e8ef49c509d7130bfd4273e8f6036 --- /dev/null +++ b/src/drive_s.rs @@ -0,0 +1,33 @@ +use core::lazy::Lazy; +use core::{ + future::Future, + task::{Context, Poll}, +}; + +use async_executor::LocalExecutor; +use async_task::Task; +use waker_fn::waker_fn; + +use crate::reactor::Reactor; +pub fn get_executor() -> &'static LocalExecutor { + static mut GLOBAL: Lazy = Lazy::new(|| LocalExecutor::new()); + unsafe { &GLOBAL } +} +pub fn block_on(future: impl Future) -> T { + let exec = get_executor(); + let main = get_executor().spawn(future); + let waker = waker_fn(|| {}); + let cx = &mut Context::from_waker(&waker); + pin!(main); + loop { + exec.run(); + if let Poll::Ready(t) = main.as_mut().poll(cx) { + return t; + } + Reactor::get().react().ok(); + } +} +pub fn spawn(future: impl Future) -> Task { + let exec = get_executor(); + exec.spawn(future) +} diff --git a/src/driver.rs b/src/driver.rs new file mode 100644 index 0000000000000000000000000000000000000000..ddcb23d0f96f374906688400a4399856fe25783b --- /dev/null +++ b/src/driver.rs @@ -0,0 +1,16 @@ +use crate::reactor::Reactor; +use core::future::Future; +use core::pin::Pin; +use core::task::Context; + +pub fn block_on(mut future: impl Future) -> () { + let wk = waker_fn::waker_fn(move || {}); + let cx = &mut Context::from_waker(&wk); + loop { + let res = Future::poll(unsafe { Pin::new_unchecked(&mut future) }, cx); + if res.is_ready() { + break; + } + let _res = Reactor::get().react(); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..57dbd1f77a97e7295fd0081a01f7889dfa508883 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,27 @@ +#![cfg_attr(feature = "alloc", no_std, feature(prelude_import))] +#![feature(once_cell)] +#[allow(unused_imports)] +#[macro_use] +extern crate alloc; +#[macro_export] +macro_rules! pin { + ($($x:ident),* $(,)?) => { + $( + let mut $x = $x; + #[allow(unused_mut)] + let mut $x = unsafe { + core::pin::Pin::new_unchecked(&mut $x) + }; + )* + } +} +#[cfg(feature = "single_thread")] +#[path = "drive_s.rs"] +mod drive_s; +mod net; + +#[allow(dead_code)] +mod reactor; + +pub use drive_s::{block_on, spawn}; +pub use net::*; diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..6f3cd7a3b9f0d50e8f742f23c0323e34f34c18c0 --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,4 @@ +#[allow(dead_code)] +#[allow(dead_code)] +mod sys; +pub use sys::*; diff --git a/src/net/sys/mod.rs b/src/net/sys/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..d1cce2d59fd153921eeff2a172d4702104f3cdbf --- /dev/null +++ b/src/net/sys/mod.rs @@ -0,0 +1,3 @@ +mod unix; + +pub use unix::*; diff --git a/src/net/sys/unix/epoll.rs b/src/net/sys/unix/epoll.rs new file mode 100644 index 0000000000000000000000000000000000000000..8672a33bfdc945d6204614bc25bcab000179180f --- /dev/null +++ b/src/net/sys/unix/epoll.rs @@ -0,0 +1,89 @@ +use libc::c_int; + +use super::{SocketInner, READ, WRITE}; + +use super::RawFd; +use core::lazy::Lazy; +use core::mem::zeroed; +use core::ops::Deref; +use core::task::Waker; +use stdlib::collections::Obj; +use stdlib::io::{self, Result}; +pub(crate) struct Poller { + ep: RawFd, +} +const READ_INTEREST: u32 = (libc::EPOLLIN | libc::EPOLLRDHUP) as u32; +const WRITE_INTEREST: u32 = (libc::EPOLLOUT) as u32; +impl Poller { + pub fn get() -> &'static Poller { + static mut POLLER: Lazy = Lazy::new(|| Poller::new().expect("Epoll create error.")); + unsafe { &POLLER } + } + pub(crate) fn new() -> Result { + let ep = syscall!(epoll_create1(libc::EPOLL_CLOEXEC))?; + Ok(Self { ep }) + } + pub(super) fn add(&self, socket: &Obj) -> Result { + self.ctl(libc::EPOLL_CTL_ADD, socket.fd, Some(socket)) + } + pub(super) fn del(&self, socket: &Obj) -> Result { + self.ctl(libc::EPOLL_CTL_DEL, socket.fd, None) + } + pub(super) fn modify(&self, socket: &Obj) -> Result { + self.ctl(libc::EPOLL_CTL_MOD, socket.fd, Some(socket)) + } + fn ctl(&self, op: c_int, fd: RawFd, socket: Option<&Obj>) -> Result { + let mut ev = socket.map(|sock| { + let mut flags = libc::EPOLLONESHOT as u32; + { + let lk = sock.wakers.lock(); + if lk[READ].is_some() { + flags |= READ_INTEREST; + } + if lk[WRITE].is_some() { + flags |= WRITE_INTEREST; + } + } + libc::epoll_event { + events: flags, + u64: Deref::deref(sock) as *const _ as u64, + } + }); + syscall!(epoll_ctl( + self.ep, + op, + fd, + ev.as_mut() + .map(|ev| { ev as *mut _ }) + .unwrap_or(core::ptr::null_mut()) + )) + } + + pub fn wait(&self, wakers: &mut [Option]) -> Result { + let mut events: [libc::epoll_event; 512] = unsafe { zeroed() }; + let len = syscall!(epoll_wait( + self.ep, + events.as_mut_ptr(), + events.len() as i32, + -1 + ))?; + + if wakers.len() < 1024 { + return Err(io::Error::from_raw_os_error(libc::EINVAL)); + } + let mut i = 0usize; + for ev in &events[0..len as usize] { + let inner = &unsafe { &*(ev.u64 as *const SocketInner) }; + let mut lk = inner.wakers.lock(); + if ev.events & READ_INTEREST > 0 && lk[READ].is_some() { + wakers[i].replace(unsafe { lk[READ].take().unwrap_unchecked() }); + i += 1; + } + if ev.events & WRITE_INTEREST > 0 && lk[WRITE].is_some() { + wakers[i].replace(unsafe { lk[WRITE].take().unwrap_unchecked() }); + i += 1; + } + } + Ok(i) + } +} diff --git a/src/net/sys/unix/mod.rs b/src/net/sys/unix/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..eb06be0bc5ac6f3c06dddb94c14bc5186ce44fbf --- /dev/null +++ b/src/net/sys/unix/mod.rs @@ -0,0 +1,17 @@ +macro_rules! syscall { + ($fn:ident $args:tt) => {{ + let res = unsafe { libc::$fn $args }; + if res == -1 { + Err(stdlib::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} +const READ: usize = 0; +const WRITE: usize = 1; +pub(crate) type RawFd = libc::c_int; +mod epoll; +mod socket; +pub(crate) use epoll::*; +pub use socket::*; diff --git a/src/net/sys/unix/socket.rs b/src/net/sys/unix/socket.rs new file mode 100644 index 0000000000000000000000000000000000000000..3e8c202d5c8b009f362ba1ffc3aecd5d6d584c7f --- /dev/null +++ b/src/net/sys/unix/socket.rs @@ -0,0 +1,220 @@ +use core::{ + future::Future, + mem::{size_of, zeroed, MaybeUninit}, + pin::Pin, + task::{Context, Poll, Waker}, +}; +use stdlib::{ + collections::{Obj, ObjPool}, + io::Result, + sync::Spinlock, +}; + +use libc::c_int; + +use super::{Poller, RawFd, READ, WRITE}; +pub(super) struct SocketInner { + pub(crate) fd: RawFd, + pub addr: [u8; 4], + pub port: u16, + pub(super) wakers: Spinlock<[Option; 2]>, +} +#[repr(transparent)] +pub struct Socket { + pub(super) inner: Obj<'static, SocketInner>, +} +pub enum Shutdown { + Read, + Write, + Both, +} +impl Socket { + pub const ADDR_SIZE: u32 = size_of::() as u32; + fn get_pool() -> &'static ObjPool { + static P: ObjPool = ObjPool::new(); + &P + } + fn new(fam: c_int, ty: c_int, addr: [u8; 4], port: u16) -> Result { + let fd = syscall!(socket( + fam, + ty | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, + 0 + ))?; + Self::from_raw(fd, addr, port) + } + pub fn from_raw(fd: RawFd, addr: [u8; 4], port: u16) -> Result { + let poller = Poller::get(); + syscall!(setsockopt( + fd, + libc::SOL_SOCKET, + libc::SO_REUSEADDR, + &1 as *const _ as *const _, + core::mem::size_of::() as libc::socklen_t, + ))?; + let rtn = Self { + inner: Self::get_pool().get(SocketInner { + fd, + addr, + port, + wakers: Spinlock::new([None, None]), + }), + }; + + poller.add(&rtn.inner)?; + Ok(rtn) + } + pub fn bind(addr: [u8; 4], port: u16) -> Result { + let socket = Self::new(libc::AF_INET, libc::SOCK_STREAM, addr, port)?; + let sock_addr = libc::sockaddr_in { + sin_addr: libc::in_addr { + s_addr: u32::from_be_bytes(addr).to_be(), + }, + sin_port: port.to_be(), + sin_family: libc::AF_INET as u16, + sin_zero: unsafe { zeroed() }, + }; + syscall!(bind( + socket.inner.fd, + &sock_addr as *const _ as *const libc::sockaddr, + Self::ADDR_SIZE, + ))?; + syscall!(listen(socket.inner.fd, 128))?; + Ok(socket) + } + pub fn shutdown(&self, how: Shutdown) -> Result<()> { + let how = match how { + Shutdown::Read => libc::SHUT_RD, + Shutdown::Write => libc::SHUT_WR, + Shutdown::Both => libc::SHUT_RDWR, + }; + syscall!(shutdown(self.inner.fd, how)).map(|_| ()) + } + pub fn accept(&self) -> AcceptFuture { + AcceptFuture { inner: &self.inner } + } + pub fn recv<'a>(&'a self, buf: &'a mut [u8]) -> RecvFuture<'a> { + RecvFuture { + inner: &self.inner, + buf, + flag: 0, + } + } + pub fn peek<'a>(&'a self, buf: &'a mut [u8]) -> RecvFuture<'a> { + RecvFuture { + inner: &self.inner, + buf, + flag: libc::MSG_PEEK, + } + } + pub fn send<'a>(&'a self, buf: &'a [u8]) -> SendFuture { + SendFuture { + inner: &self.inner, + buf, + flag: libc::MSG_PEEK, + } + } +} +impl Drop for Socket { + fn drop(&mut self) { + unsafe { libc::close(self.inner.fd) }; + let poller = Poller::get(); + let _ = poller.del(&self.inner); + } +} +trait AsyncSocket { + fn poll_set(&self, dir: usize, e: i32, cx: &mut Context<'_>) -> Poll>; +} +impl AsyncSocket for Obj<'static, SocketInner> { + fn poll_set(&self, dir: usize, e: i32, cx: &mut Context<'_>) -> Poll> { + if e == libc::EWOULDBLOCK { + { + let mut lk = self.wakers.lock(); + if let Some(wk) = lk[dir].take() { + if wk.will_wake(cx.waker()) { + lk[dir] = Some(wk); + return Poll::Pending; + } + lk[dir] = Some(cx.waker().clone()); + drop(lk); + wk.wake(); + } else { + lk[dir] = Some(cx.waker().clone()); + } + } + Poller::get().modify(self)?; + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } +} +pub struct AcceptFuture<'a> { + inner: &'a Obj<'static, SocketInner>, +} +impl<'a> Future for AcceptFuture<'a> { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> core::task::Poll { + let mut sockaddr: libc::sockaddr_in = unsafe { MaybeUninit::zeroed().assume_init() }; + let mut len: u32 = Socket::ADDR_SIZE; + match syscall!(accept4( + self.inner.fd, + &mut sockaddr as *mut _ as *mut _, + &mut len, + libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC, + )) { + Ok(fd) => { + let addr = sockaddr.sin_addr.s_addr.to_le_bytes(); + let port = sockaddr.sin_port.to_be(); + Poll::Ready(Socket::from_raw(fd, addr, port)) + } + Err(e) => self + .inner + .poll_set(READ, unsafe { e.raw_os_error().unwrap_unchecked() }, cx) + .map(|_| Err(e)), + } + } +} +pub struct RecvFuture<'a> { + inner: &'a Obj<'static, SocketInner>, + buf: &'a mut [u8], + flag: c_int, +} +impl<'a> Future for RecvFuture<'a> { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match syscall!(recv( + self.inner.fd, + self.buf.as_ptr() as *mut _, + self.buf.len(), + 0 + )) { + Ok(size) => Poll::Ready(Ok(size as usize)), + Err(e) => self + .inner + .poll_set(READ, unsafe { e.raw_os_error().unwrap_unchecked() }, cx) + .map(|_| Err(e)), + } + } +} +pub struct SendFuture<'a> { + inner: &'a Obj<'static, SocketInner>, + buf: &'a [u8], + flag: c_int, +} +impl<'a> Future for SendFuture<'a> { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match syscall!(send( + self.inner.fd, + self.buf.as_ptr() as *mut _, + self.buf.len(), + libc::MSG_NOSIGNAL + )) { + Ok(size) => Poll::Ready(Ok(size as usize)), + Err(e) => self + .inner + .poll_set(WRITE, unsafe { e.raw_os_error().unwrap_unchecked() }, cx) + .map(|_| Err(e)), + } + } +} diff --git a/src/reactor.rs b/src/reactor.rs new file mode 100644 index 0000000000000000000000000000000000000000..51a0b7efabc1971b99260b2a95fceadd05dbb3e5 --- /dev/null +++ b/src/reactor.rs @@ -0,0 +1,33 @@ +use core::{mem::MaybeUninit, task::Waker}; +use stdlib::{io::Result, lazy::SyncLazy}; + +use crate::net::Poller; +pub struct Reactor { + poll: &'static Poller, +} +trait Source {} +unsafe impl Sync for Reactor {} +unsafe impl Send for Reactor {} + +impl Reactor { + pub fn get() -> &'static Reactor { + static REACTOR: SyncLazy = SyncLazy::::new(|| Reactor { + poll: Poller::get(), + }); + &REACTOR + } + pub fn react(&self) -> Result<()> { + let mut wks: [Option; 1024] = unsafe { MaybeUninit::zeroed().assume_init() }; + let len = match self.poll.wait(&mut wks) { + Ok(v) => v, + Err(err) if unsafe { err.raw_os_error().unwrap_unchecked() } == libc::EINTR => { + return Ok(()) + } + Err(err) => return Err(err), + }; + for wk in &mut wks[0..len] { + unsafe { wk.take().unwrap_unchecked().wake() }; + } + Ok(()) + } +}