From 333bd0e8e03fc8e06d6137ff81dbbfa1b5b1e562 Mon Sep 17 00:00:00 2001 From: overweight Date: Fri, 7 Jan 2022 18:01:22 +0800 Subject: [PATCH] add events module --- src/event/src/event/events.rs | 99 +++++++++++++++++++++++++++++++++++ src/event/src/event/mod.rs | 66 +++++++++++++++++++++++ src/event/src/event/source.rs | 77 +++++++++++++++++++++++++++ src/event/src/lib.rs | 18 +------ src/event/src/poll/epoll.rs | 30 +++++++---- src/event/src/poll/mod.rs | 15 +++--- 6 files changed, 271 insertions(+), 34 deletions(-) create mode 100644 src/event/src/event/events.rs create mode 100644 src/event/src/event/mod.rs create mode 100644 src/event/src/event/source.rs diff --git a/src/event/src/event/events.rs b/src/event/src/event/events.rs new file mode 100644 index 00000000..567f83fd --- /dev/null +++ b/src/event/src/event/events.rs @@ -0,0 +1,99 @@ +use super::Source; +use crate::poll::Poll; +use std::{ + collections::{BinaryHeap, HashMap}, + rc::Rc, cell::RefCell, +}; + +pub struct Events { + poller: Rc>, + exit: bool, + sources: HashMap>, + pending: BinaryHeap>, +} + +impl Events { + pub fn new() -> Self { + Self { + poller: Rc::new(RefCell::new(Poll::new().unwrap())), + exit: false, + sources: HashMap::new(), + pending: BinaryHeap::new(), + } + } + + pub fn add_source(&mut self, s: Rc) { + s.register(self.poller.clone()); + self.sources.insert(s.token(), s); + } + + /// Wait for the event event through poller + /// And add the corresponding events to the pengding queue + pub fn wait(&mut self, timeout: i32) -> bool { + let events = { + loop { + let result = self.poller.borrow().poll(timeout); + + match result { + Ok(events) => break events, + Err(_err) => return false, + }; + } + }; + + for event in events.iter() { + #[allow(unaligned_references)] + let token = &event.u64; + let s = self.sources.get(&token).unwrap(); + self.pending.push(s.clone()); + } + + true + } + + /// Wait for the event event through poller + /// And add the corresponding events to the pengding queue + pub fn prepare(&mut self) -> bool { + false + } + + /// Fetch the highest priority event processing on the pending queue + fn dispatch(&mut self) { + if let Some(first) = self.pending.peek() { + first.as_ref().dispatch(); + self.pending.pop(); + } + } + + /// Scheduling once, processing an event + pub fn run(&mut self, timeout: i32) { + if self.prepare() || self.pending.is_empty() { + self.wait(timeout); + } + self.dispatch(); + } + + /// Process the event in a loop until exiting actively + pub fn rloop(&mut self) { + loop { + if self.exit == true { + break; + } + self.run(-1i32); + } + } + + pub fn exit(&mut self) { + self.exit = true; + } + + pub fn now() { + todo!(); + } +} + +impl Default for Events { + fn default() -> Self { + Self::new() + } +} diff --git a/src/event/src/event/mod.rs b/src/event/src/event/mod.rs new file mode 100644 index 00000000..f2fefe0f --- /dev/null +++ b/src/event/src/event/mod.rs @@ -0,0 +1,66 @@ +pub mod events; +pub mod source; + +pub use events::Events; +pub use source::Source; + +#[cfg(test)] +mod test { + use super::Source; + use std::{ + net::{TcpListener, TcpStream}, + os::unix::io::{AsRawFd, RawFd}, + rc::Rc, + }; + + #[derive(Debug)] + struct Io { + t: TcpStream, + d: &'static str, + } + + impl Io { + fn new(s: &'static str) -> Io { + Io { + t: TcpStream::connect(s).unwrap(), + d: s, + } + } + } + + impl Source for Io { + fn fd(&self) -> RawFd { + self.t.as_raw_fd() + } + + fn description(&self) -> &'static str { + self.d + } + } + + #[test] + fn test_run() { + use super::Events; + use std::thread; + use std::time::Duration; + + thread::spawn(move || { + let listener = TcpListener::bind("0.0.0.0:9097").unwrap(); + loop { + let (_stream, addr) = listener.accept().unwrap(); + println!("Accepted a new connection: {}", addr); + } + }); + + thread::sleep(Duration::from_millis(100)); + let mut e = Events::new(); + let s: Rc = Rc::new(Io::new("0.0.0.0:9097")); + let s2: Rc = Rc::new(Io::new("127.0.0.1:9097")); + e.add_source(s.clone()); + e.add_source(s2.clone()); + e.run(100); + e.run(100); + e.run(100); + // e.rloop(); + } +} diff --git a/src/event/src/event/source.rs b/src/event/src/event/source.rs new file mode 100644 index 00000000..3f192b7b --- /dev/null +++ b/src/event/src/event/source.rs @@ -0,0 +1,77 @@ +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + os::unix::io::RawFd, cell::RefCell, rc::Rc, +}; + +use crate::poll::Poll; + +pub trait Source { + fn fd(&self) -> RawFd; + + fn epoll_event(&self) -> libc::epoll_event { + libc::epoll_event { + events: (libc::EPOLLIN | libc::EPOLLONESHOT) as u32, + u64: self.token(), + } + } + + fn description(&self) -> &'static str; + + fn token(&self) -> u64 { + let mut s = DefaultHasher::new(); + self.description().hash(&mut s); + s.finish() + } + + fn priority(&self) -> i8 { + 0i8 + } + + fn dispatch(&self) { + println!("Dispatching!"); + } + + fn register(&self, poller: Rc>) { + let _ = poller.borrow_mut().register(self.fd(), &mut self.epoll_event()); + } + + fn reregister(&self, poller: Rc>) { + let _ = poller.borrow_mut().reregister(self.fd(), &mut self.epoll_event()); + } + + fn deregister(&self, poller: Rc>) { + let _ = poller.borrow_mut().unregister(self.fd()); + } +} + +// for HashSet +impl Hash for dyn Source { + fn hash(&self, state: &mut H) + where + H: Hasher, + { + self.token().hash(state); + } +} + +impl PartialEq for dyn Source { + fn eq(&self, other: &dyn Source) -> bool { + self.token() == other.token() + } +} + +impl Eq for dyn Source {} + +// for BinaryHeap +impl Ord for dyn Source { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.priority().cmp(&other.priority()).reverse() + } +} + +impl PartialOrd for dyn Source { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.priority().cmp(&other.priority()).reverse()) + } +} diff --git a/src/event/src/lib.rs b/src/event/src/lib.rs index 0c047083..e96af2b4 100644 --- a/src/event/src/lib.rs +++ b/src/event/src/lib.rs @@ -1,23 +1,9 @@ -//pub mod events; +pub mod event; pub mod poll; -//pub mod sources; - -#[allow(unused_macros)] -#[macro_export] -macro_rules! syscall { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ - let res = unsafe { libc::$fn($($arg, )*) }; - if res == -1 { - Err(std::io::Error::last_os_error()) - } else { - Ok(res) - } - }}; -} +// pub mod sources; #[cfg(test)] mod test { - #[cfg(unix)] #[test] fn build() {} } diff --git a/src/event/src/poll/epoll.rs b/src/event/src/poll/epoll.rs index 8811efe9..24754648 100644 --- a/src/event/src/poll/epoll.rs +++ b/src/event/src/poll/epoll.rs @@ -3,7 +3,18 @@ use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; use std::{io, ptr}; -use crate::syscall; +#[allow(unused_macros)] +#[macro_export] +macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} const LOWEST_FD: libc::c_int = 3; @@ -28,19 +39,18 @@ impl Epoll { }) } - pub(crate) fn poll( - &mut self, - timeout: Option, - ) -> io::Result> { + pub(crate) fn poll(&self, timeout: i32) -> io::Result> { let mut events = Vec::::with_capacity(self.n_sources.load(Relaxed)); - let _n_ready = syscall!(epoll_wait( + events.clear(); + let n_ready = syscall!(epoll_wait( self.epoll_fd, events.as_mut_ptr(), events.capacity() as i32, - timeout - .map(|to| to.as_millis() as libc::c_int) - .unwrap_or(-1), + timeout, )); + unsafe { + events.set_len(n_ready.unwrap() as usize); + } Ok(events) } @@ -76,13 +86,11 @@ mod test { use super::Epoll; use libc::EPOLLIN; - #[cfg(unix)] #[test] fn epoll_new() { let _ = Epoll::new(); } - #[cfg(unix)] #[test] fn epoll_add() { let mut poll = Epoll::new().unwrap(); diff --git a/src/event/src/poll/mod.rs b/src/event/src/poll/mod.rs index 00125ca3..0d9eb224 100644 --- a/src/event/src/poll/mod.rs +++ b/src/event/src/poll/mod.rs @@ -1,19 +1,20 @@ -mod epoll; - -use epoll::Epoll; use libc::epoll_event; +use std::io; use std::os::unix::{io::AsRawFd, io::RawFd}; -use std::{io, time}; + +pub mod epoll; +#[cfg(unix)] +use epoll::Epoll as Poller; #[derive(Debug, Default)] pub struct Poll { - poller: Epoll, + poller: Poller, } impl Poll { pub fn new() -> io::Result { Ok(Poll { - poller: Epoll::new()?, + poller: Poller::new()?, }) } @@ -23,7 +24,7 @@ impl Poll { }) } - pub fn poll(&mut self, timeout: Option) -> io::Result> { + pub fn poll(&self, timeout: i32) -> io::Result> { self.poller.poll(timeout) } -- Gitee