diff --git a/.gitignore b/.gitignore index 5d947ca8879f8a9072fe485c566204e3c2929e80..4fffb2f89cbd8f2169ce9914bd16bd43785bb368 100644 --- a/.gitignore +++ b/.gitignore @@ -1,18 +1,2 @@ -# Build and Release Folders -bin-debug/ -bin-release/ -[Oo]bj/ -[Bb]in/ - -# Other files and folders -.settings/ - -# Executables -*.swf -*.air -*.ipa -*.apk - -# Project files, i.e. `.project`, `.actionScriptProperties` and `.flexProperties` -# should NOT be excluded as they contain compiler settings and other important -# information for Eclipse / Flash Builder. +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..0c03bc38ad00e591b24fe77b22340d6c42e017d4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,23 @@ +[package] +edition = "2021" +name = "async-task" +version = "1.0.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["single_thread", "alloc"] +alloc = ["stdlib/alloc"] +std = ["stdlib/std"] + +multiple_thread = [] +single_thread = [] + +debug = [] + +[dependencies] +cfg-if = "1.0.0" +stdlib = { path = "../stdlib", default-features = false } +[profile.dev] +panic = "abort" +[profile.release] +panic = "abort" \ No newline at end of file diff --git a/README.en.md b/README.en.md index 7a782488ba2ff6f66d8c9c7d670940957644c2e8..5b7efb0d5572df0a37ff2dd1a19c01883d1474ce 100644 --- a/README.en.md +++ b/README.en.md @@ -1,36 +1,5 @@ # async-task #### Description -异步运行状态 +A task abstraction for building executors, representing the running state of an asynchronous task. -#### Software Architecture -Software architecture description - -#### Installation - -1. xxxx -2. xxxx -3. xxxx - -#### Instructions - -1. xxxx -2. xxxx -3. xxxx - -#### Contribution - -1. Fork the repository -2. Create Feat_xxx branch -3. Commit your code -4. Create Pull Request - - -#### Gitee Feature - -1. 
You can use Readme\_XXX.md to support different languages, such as Readme\_en.md, Readme\_zh.md -2. Gitee blog [blog.gitee.com](https://blog.gitee.com) -3. Explore open source project [https://gitee.com/explore](https://gitee.com/explore) -4. The most valuable open source project [GVP](https://gitee.com/gvp) -5. The manual of Gitee [https://gitee.com/help](https://gitee.com/help) -6. The most popular members [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/README.md b/README.md index 185bc01b4ba87e15a40f16076ff503302dad4e46..cf0120048e1b02ca2e0fae455d6d0b28784776ae 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,5 @@ # async-task #### 介绍 -异步运行状态 +构建执行器的任务抽象,代表异步任务的运行状态。 -#### 软件架构 -软件架构说明 - - -#### 安装教程 - -1. xxxx -2. xxxx -3. xxxx - -#### 使用说明 - -1. xxxx -2. xxxx -3. xxxx - -#### 参与贡献 - -1. Fork 本仓库 -2. 新建 Feat_xxx 分支 -3. 提交代码 -4. 新建 Pull Request - - -#### 特技 - -1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md -2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) -3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 -4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 -5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) -6. 
Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..4c8aa113a686324b72fa7b093a505ef184d39cbd --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,12 @@ +#![cfg_attr(feature = "alloc", no_std)] + +extern crate alloc; +#[macro_use] +extern crate cfg_if; + +mod raw; +mod runnable; +mod state; +mod task; +pub use runnable::{spawn_unchecked, Runnable}; +pub use task::Task; diff --git a/src/raw.rs b/src/raw.rs new file mode 100644 index 0000000000000000000000000000000000000000..d53b6296ce6a089652e4ca6a01ddc25a8df329ae --- /dev/null +++ b/src/raw.rs @@ -0,0 +1,477 @@ +use core::{ + alloc::Layout, + cell::UnsafeCell, + future::Future, + mem::{transmute, ManuallyDrop}, + ops::DerefMut, + pin::Pin, + ptr::NonNull, + sync::atomic::{AtomicUsize, Ordering}, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, +}; +cfg_if! { + if #[cfg(feature="debug")]{ + use core::{fmt, panic::Location}; + use alloc::format; + use core::fmt::write; + use stdlib::println; + } +} + +use alloc::alloc::{alloc, dealloc}; + +use crate::{ + state::{ + AWAITER, CLOSED, COMPLETED, NOTIFYING, REFERENCE, REGISTERING, RUNNING, SCHEDULED, TASK, + }, + Runnable, +}; +#[cfg(feature = "debug")] +macro_rules! 
RawState { + ($ty:ty) => { + RawStatus<$ty> + }; +} +#[cfg(feature = "debug")] +pub(crate) struct RawStatus(T); +#[cfg(feature = "debug")] +impl RawStatus { + #[track_caller] + fn new(v: usize) -> Self { + let locate = core::panic::Location::caller(); + Self::debug(locate, v); + Self(AtomicUsize::new(v)) + } + #[track_caller] + pub(crate) fn fetch_and(&self, v: usize, ord: Ordering) -> usize { + let locate = core::panic::Location::caller(); + let rtn = self.0.fetch_and(v, ord); + Self::debug(locate, rtn & v); + rtn + } + #[track_caller] + pub(crate) fn fetch_or(&self, v: usize, ord: Ordering) -> usize { + let locate = core::panic::Location::caller(); + let rtn = self.0.fetch_or(v, ord); + Self::debug(locate, v | rtn); + rtn + } + #[track_caller] + pub(crate) fn load(&self, ord: Ordering) -> usize { + let locate = core::panic::Location::caller(); + let v = self.0.load(ord); + Self::debug(locate, v); + v + } + #[track_caller] + pub(crate) fn compare_exchange_weak( + &self, + old: usize, + new: usize, + ord1: Ordering, + ord2: Ordering, + ) -> Result { + let locate = core::panic::Location::caller(); + let v = self.0.compare_exchange_weak(old, new, ord1, ord2); + if let Ok(_) = v { + Self::debug(locate, new) + }; + v + } + #[track_caller] + fn fetch_add(&self, v: usize, ord: Ordering) -> usize { + let locate = core::panic::Location::caller(); + let rtn = self.0.fetch_add(v, ord); + Self::debug(locate, v + rtn); + rtn + } + #[track_caller] + fn fetch_sub(&self, v: usize, ord: Ordering) -> usize { + let locate = core::panic::Location::caller(); + let rtn = self.0.fetch_sub(v, ord); + Self::debug(locate, rtn - v); + rtn + } + fn debug(locate: &Location, v: usize) { + let mut s = format!("{}:{}:{}\t", locate.file(), locate.line(), v >> 8); + let mut state = v & 0xff; + while let Ok(_) = match state { + _ if state & SCHEDULED > 0 => { + state &= !SCHEDULED; + s.write_str("SCHEDULE|") + } + _ if state & RUNNING > 0 => { + state &= !RUNNING; + s.write_str("RUNNING|") + } + _ 
if state & COMPLETED > 0 => { + state &= !COMPLETED; + s.write_str("COMPLETED|") + } + _ if state & CLOSED > 0 => { + state &= !CLOSED; + s.write_str("CLOSED|") + } + _ if state & TASK > 0 => { + state &= !TASK; + s.write_str("TASK|") + } + _ if state & AWAITER > 0 => { + state &= !AWAITER; + s.write_str("AWAITER|") + } + _ if state & REGISTERING > 0 => { + state &= !REGISTERING; + s.write_str("REGISTERING|") + } + _ if state & NOTIFYING > 0 => { + state &= !NOTIFYING; + s.write_str("NOTIFYING|") + } + _ => Err(fmt::Error), + } {} + println!("{}\n\0", s); + println!(); + } +} +#[cfg(not(feature = "debug"))] +macro_rules! RawState { + ($ty:ty) => { + $ty + }; +} +pub(crate) struct RawTask { + pub(crate) state: RawState!(AtomicUsize), + pub(crate) awaiter: UnsafeCell>, + pub(crate) output: *mut (), + dynfn: NonNull, +} +trait TaskDynFn { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()>; + fn schedule(&self, runnable: Runnable); + fn drop_future(&mut self); + fn drop_output(&mut self); + fn destory(&mut self); +} +impl RawTask { + #[allow(function_item_references)] + const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + unsafe { transmute(Self::waker_clone as *const u8) }, + unsafe { transmute(Self::waker_wake as *const u8) }, + unsafe { transmute(Self::waker_wake_by_ref as *const u8) }, + unsafe { transmute(Self::drop_ref as *const u8) }, + ); + /// Allocate a memory context for a task once with the specified `future` and `schedule`. 
+ /// + /// Assignment assumes that the initial state is already scheduled for execution and that there is a `Task` to receive the return value + pub(crate) fn new(future: F, schedule: S) -> NonNull + where + F: Future, + S: Fn(Runnable), + { + let ly = Layout::new::>(); + unsafe { + let mem = alloc(ly) as *mut TaskMemMap; + mem.write(TaskMemMap { + inner: RawTask { + state: ::new(TASK | SCHEDULED | REFERENCE), + awaiter: UnsafeCell::new(None), + output: (&(*mem).fo.output as *const _ as *mut ()), + dynfn: NonNull::new_unchecked(transmute(mem as *mut dyn TaskDynFn)), + }, + ly, + sch: schedule, + fo: FO { + future: ManuallyDrop::new(future), + }, + }); + return NonNull::new_unchecked(&mut (*mem).inner); + } + + union FO { + future: ManuallyDrop, + output: ManuallyDrop, + } + struct TaskMemMap { + inner: RawTask, + ly: Layout, + sch: S, + fo: FO, + } + impl, T, S: Fn(Runnable)> TaskDynFn for TaskMemMap { + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let future = unsafe { Pin::new_unchecked(self.fo.future.deref_mut()) }; + if let Poll::Ready(rtn) = future.poll(cx) { + self.drop_future(); + self.fo.output = ManuallyDrop::new(rtn); + Poll::Ready(()) + } else { + Poll::Pending + } + } + fn schedule(&self, runnable: Runnable) { + (&self.sch)(runnable); + } + fn drop_future(&mut self) { + unsafe { ManuallyDrop::drop(&mut self.fo.future) }; + } + fn drop_output(&mut self) { + unsafe { ManuallyDrop::drop(&mut self.fo.output) }; + } + fn destory(&mut self) { + unsafe { + (&mut self.sch as *mut S).drop_in_place(); + dealloc(self as *const _ as *mut u8, self.ly); + }; + } + } + } + + /// Clone a Waker and add a reference to it. + pub(crate) fn waker_clone(&self) -> RawWaker { + self.state.fetch_add(REFERENCE, Ordering::Relaxed); + RawWaker::new(self as *const _ as *const (), &Self::RAW_WAKER_VTABLE) + } + /// Delete a Waker, if the reference count is 0 then delete the task. 
+ pub(crate) fn drop_ref(&mut self) { + let new = self.state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { + self.destroy(); + } + } + /// Wake-up task, here is unified use Ref wake-up + /// + /// Note: References need to be decremented after waking up. + fn waker_wake(&mut self) { + self.waker_wake_by_ref(); + self.drop_ref(); + } + /// Task wake-up logic + /// + /// Awakening of a task means simply scheduling the task to an executor. + pub(crate) fn waker_wake_by_ref(&mut self) { + let mut state = self.state.load(Ordering::Acquire); + + loop { + // If the task has been completed, or has been Closed, it is not allowed to be awakened again. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + // If the task has already woken up, simply wake up again and exit + // The task will be automatically scheduled to run after it finishes running. + if state & SCHEDULED != 0 { + match self.state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } else { + // If the task is neither scheduled, nor Closed, nor completed, the task status is converted to SCHEDULED. + match self.state.compare_exchange_weak( + state, + state | SCHEDULED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not running, it can now be actually scheduled for execution. + // Otherwise, the task will continue to be scheduled for execution after running. + if state & RUNNING == 0 { + self.schedule() + }; + break; + } + Err(s) => state = s, + } + } + } + } + + pub(crate) unsafe fn run(&mut self) -> bool { + let mut state = self.state.load(Ordering::Acquire); + loop { + // The RUNNING state of the task switch. 
+ let new = (state & !SCHEDULED) | RUNNING; + match self + .state + .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => { + state = new; + break; + } + Err(s) => state = s, + } + } + // Start the real execution of the Future + let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new( + self as *const _ as *const (), + &Self::RAW_WAKER_VTABLE, + ))); + let cx = &mut Context::from_waker(&waker); + let poll = self.dynfn.as_mut().poll(cx); + match poll { + Poll::Ready(_) => { + loop { + // Task switches to COMPLETED state + let new = if state & TASK == 0 { + (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED + } else { + (state & !RUNNING & !SCHEDULED) | COMPLETED + }; + match self.state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + if state & TASK == 0 || state & CLOSED != 0 { + self.drop_output(); + } + if state & AWAITER != 0 { + self.notify(); + } + self.drop_ref(); + break; + } + Err(s) => { + state = s; + } + } + } + } + + Poll::Pending => loop { + let new = if state & CLOSED != 0 { + state & !RUNNING & !SCHEDULED + } else { + state & !RUNNING + }; + match self.state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + if state & CLOSED != 0 { + if state & AWAITER != 0 { + self.notify(); + } + self.drop_future(); + self.drop_ref(); + } else if state & SCHEDULED != 0 { + self.schedule(); + return true; + } + break; + } + Err(s) => state = s, + } + }, + } + false + } + pub(crate) fn schedule(&self) { + unsafe { + let runnable = Runnable { + raw: NonNull::new_unchecked(self as *const _ as *mut Self), + }; + self.dynfn.as_ref().schedule(runnable); + } + } + #[inline] + pub(crate) fn drop_output(&mut self) { + unsafe { self.dynfn.as_mut().drop_output() }; + } + #[inline] + pub(crate) fn drop_future(&mut self) { + unsafe { self.dynfn.as_mut().drop_future() }; + } + #[inline] + pub(crate) fn destroy(&mut self) { + unsafe { 
self.dynfn.as_mut().destory() }; + } + + pub(crate) fn notify(&self) { + if let Some(w) = self.take() { + w.wake(); + } + } + + pub(crate) fn take(&self) -> Option { + let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel); + // If there is already a thread acquiring or registering, it will return None. + // Then, the registered thread will wake up once. + if state & (NOTIFYING | REGISTERING) == 0 { + let waker = unsafe { (*self.awaiter.get()).take() }; + + self.state + .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release); + return waker; + }; + None + } + pub(crate) fn register(&self, waker: &Waker) { + let mut state = self.state.load(Ordering::Acquire); + loop { + // Registration cannot be in multiple threads, so State should never be REGISTERING + debug_assert!(state & REGISTERING == 0); + // If there is already a thread to apply for notification, there is no need to register for direct notification and exit + if state & NOTIFYING != 0 { + waker.wake_by_ref(); + return; + } + // Change to registering state, and should never be in notification state at this time. + match self.state.compare_exchange_weak( + state, + state | REGISTERING, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + state |= REGISTERING; + break; + } + Err(s) => state = s, + } + } + // It is now safe to get the waker in your hands. + unsafe { + (*self.awaiter.get()) = Some(waker.clone()); + } + + let mut waker = None; + + loop { + if state & NOTIFYING != 0 { + // Judging again, if a thread has already applied for notification, the waker is taken out, and then notified. 
+ if let Some(w) = unsafe { (*self.awaiter.get()).take() } { + waker = Some(w); + } + } + let new = if waker.is_none() { + (state & !NOTIFYING & !REGISTERING) | AWAITER + } else { + state & !NOTIFYING & !REGISTERING & !AWAITER + }; + + match self + .state + .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => break, + Err(s) => state = s, + } + } + + if let Some(w) = waker { + w.wake(); + } + } +} diff --git a/src/runnable.rs b/src/runnable.rs new file mode 100644 index 0000000000000000000000000000000000000000..bdaecfc8847511a375ae492e83c47566c4349d9f --- /dev/null +++ b/src/runnable.rs @@ -0,0 +1,101 @@ +use crate::raw::RawTask; +use crate::state::{AWAITER, CLOSED, COMPLETED, SCHEDULED}; +use crate::Task; +use alloc::boxed::Box; +use core::future::Future; +use core::marker::PhantomData; +use core::mem::{self, forget}; +use core::ptr::NonNull; +use core::sync::atomic::Ordering; +use core::task::Waker; +#[derive(Debug)] +pub struct Runnable { + pub(crate) raw: NonNull, +} +unsafe impl Send for Runnable {} +unsafe impl Sync for Runnable {} + +#[cfg(feature = "std")] +impl std::panic::UnwindSafe for Runnable {} +#[cfg(feature = "std")] +impl std::panic::RefUnwindSafe for Runnable {} + +impl Runnable { + pub fn schedule(self) { + let raw = unsafe { self.raw.as_ref() }; + forget(self); + raw.schedule(); + } + + pub fn run(self) -> bool { + let mut raw = self.raw; + forget(self); + unsafe { raw.as_mut().run() } + } + + pub fn waker(&self) -> Waker { + unsafe { + let raw_waker = self.raw.as_ref().waker_clone(); + Waker::from_raw(raw_waker) + } + } +} +impl Drop for Runnable { + fn drop(&mut self) { + let raw = unsafe { self.raw.as_mut() }; + + let mut state = raw.state.load(Ordering::Acquire); + + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // Mark the task as closed. 
+ match raw.state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } + + // Drop the future. + raw.drop_future(); + + // Mark the task as unscheduled. + let state = raw.state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // Notify the awaiter that the future has been dropped. + if state & AWAITER != 0 { + raw.notify(); + } + + // Drop the task reference. + raw.drop_ref(); + } +} + +pub unsafe fn spawn_unchecked(future: F, schedule: S) -> (Runnable, Task) +where + F: Future, + S: Fn(Runnable), +{ + let raw = if mem::size_of::() >= 2048 { + let future = Box::pin(future); + RawTask::new::<_, F::Output, S>(future, schedule) + } else { + RawTask::new::(future, schedule) + }; + + let runnable = Runnable { raw }; + let task = Task { + raw, + _marker: PhantomData, + }; + (runnable, task) +} diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000000000000000000000000000000000000..2bd403eac213d4dd679c76f0ed904fd19794d879 --- /dev/null +++ b/src/state.rs @@ -0,0 +1,9 @@ +pub(crate) const SCHEDULED: usize = 1 << 0; +pub(crate) const RUNNING: usize = 1 << 1; +pub(crate) const COMPLETED: usize = 1 << 2; +pub(crate) const CLOSED: usize = 1 << 3; +pub(crate) const TASK: usize = 1 << 4; +pub(crate) const AWAITER: usize = 1 << 5; +pub(crate) const REGISTERING: usize = 1 << 6; +pub(crate) const NOTIFYING: usize = 1 << 7; +pub(crate) const REFERENCE: usize = 1 << 8; diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000000000000000000000000000000000000..d3ecf265dee38a572c9490c1c5c154230757d2d6 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,170 @@ +use core::future::Future; +use core::marker::{PhantomData, Unpin}; +use core::mem::forget; +use core::pin::Pin; +use core::ptr::NonNull; +use core::sync::atomic::Ordering; +use core::task::{Context, Poll}; + +use crate::raw::RawTask; +use crate::state::*; + +pub struct Task { + pub(crate) raw: NonNull, + + 
pub(crate) _marker: PhantomData, +} + +unsafe impl Send for Task {} +unsafe impl Sync for Task {} + +impl Unpin for Task {} + +impl Task { + // Let the Task separate from the RawTask, and the RawTask runs freely until it ends. + #[inline] + pub fn detach(mut self) -> Option { + let rtn = unsafe { self.set_detached() }; + forget(self); + rtn + } + // Cancels the task, and returns the result if the task has completed, otherwise returns None. + pub async fn cancel(mut self) -> Option { + // Create Waiting to wait for the end of the task. + struct Waiting<'a, T>(&'a mut Task); + impl<'a, T> Future for Waiting<'a, T> { + type Output = Option; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_task(cx) + } + } + // cancel task + self.set_canceled(); + // wait for the task to end + let rtn = Waiting(&mut self).await; + // separation from task + self.detach(); + rtn + } + // Cancel task execution (may only be called once, because either cancel or drop will take ownership) + fn set_canceled(&mut self) { + let raw = unsafe { self.raw.as_mut() }; + let mut state = raw.state.load(Ordering::Acquire); + loop { + // Cancellation is not required if the task has already been completed or closed. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + match raw.state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is in the Pendding state, notify the waiter + // Drop Future + // Decrease the reference count. At this time, TASK is definitely not separated from the task, so it will not really destroy the task. 
+ // When the task is separated, it will actually destroy the task + // Here the notified person can only be other tasks + if state & (SCHEDULED | RUNNING) == 0 { + // raw.notify(None); + raw.drop_future(); + raw.drop_ref(); + } + } + Err(s) => state = s, + } + } + } + /// Separating Task from Task + unsafe fn set_detached(&mut self) -> Option { + let raw = self.raw.as_mut(); + let mut output = None; + // In most cases, it is separated when creating a task, just simply delete the task associated tag + if let Err(mut state) = raw.state.compare_exchange_weak( + SCHEDULED | TASK | REFERENCE, + SCHEDULED | REFERENCE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + // Remove task associated tags + state = raw.state.fetch_and(!TASK, Ordering::AcqRel); + // If the pre-removal task was completed, but not CLOSED + // Then you need to extract the execution result of the task + if state & (COMPLETED | CLOSED) == COMPLETED { + output = Some((raw.output as *mut T).read()); + } + // If the task reference is already zero when detached, the task needs to be destroyed. + if state & !(REFERENCE - 1) == 0 { + raw.destroy(); + } + } + output + } + + fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll> { + unsafe { + let raw = self.raw.as_mut(); + let mut state = raw.state.load(Ordering::Acquire); + + // If the task is canceled, register itself to the task and wait for the task cancellation to complete. + // The task execution result is not obtained at this time. + if state & CLOSED != 0 { + // If the task is scheduled or running, we need to wait until its future is abandoned. + if state & (SCHEDULED | RUNNING) != 0 { + // Replace the waker with the one associated with the current task. + raw.register(cx.waker()); + + // Reload state after registration. Probably changed before registration, so we need to check. + state = raw.state.load(Ordering::Acquire); + + // If the task is still scheduled or running, we need to wait because its future hasn't given up. 
+ if state & (SCHEDULED | RUNNING) != 0 { + return Poll::Pending; + } else { + let _rtn = raw.take(); + } + } + return Poll::Ready(None); + } + // If the task is not completed, register the current task. + if state & COMPLETED == 0 { + // Replace the waker with the one associated with the current task. + raw.register(cx.waker()); + // Reload state after registration. Tasks may be completed or closed before registration, so we need to check that. + state = raw.state.load(Ordering::Acquire); + // If the task is still not completed, we are blocked. + if state & COMPLETED == 0 { + return Poll::Pending; + } else { + raw.take(); + } + } + raw.state.fetch_or(CLOSED, Ordering::AcqRel); + // Get the output from the task. + let output = raw.output as *mut T; + return Poll::Ready(Some(output.read())); + } + } +} + +impl Drop for Task { + fn drop(&mut self) { + unsafe { + self.set_canceled(); + self.set_detached(); + } + } +} + +impl Future for Task { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.poll_task(cx) { + Poll::Ready(t) => Poll::Ready(t.expect("task has failed")), + Poll::Pending => Poll::Pending, + } + } +}