From 275ae95c37d7346fb8498e04bc4ffc34612c082b Mon Sep 17 00:00:00 2001 From: fqwert Date: Mon, 23 Oct 2023 16:25:17 +0800 Subject: [PATCH] task_local! Signed-off-by: fqwert --- ylong_runtime/src/lib.rs | 1 + ylong_runtime/src/task_local.rs | 341 ++++++++++++++++++++++++++++++ ylong_runtime/tests/task_local.rs | 148 +++++++++++++ 3 files changed, 490 insertions(+) create mode 100644 ylong_runtime/src/task_local.rs create mode 100644 ylong_runtime/tests/task_local.rs diff --git a/ylong_runtime/src/lib.rs b/ylong_runtime/src/lib.rs index 07310ef..9d744fd 100644 --- a/ylong_runtime/src/lib.rs +++ b/ylong_runtime/src/lib.rs @@ -56,6 +56,7 @@ pub(crate) mod spawn; #[cfg(feature = "sync")] pub mod sync; pub mod task; +pub mod task_local; cfg_time! { pub mod time; diff --git a/ylong_runtime/src/task_local.rs b/ylong_runtime/src/task_local.rs new file mode 100644 index 0000000..bbe2906 --- /dev/null +++ b/ylong_runtime/src/task_local.rs @@ -0,0 +1,341 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Task local storage + +use std::cell::RefCell; +use std::future::Future; +use std::marker::PhantomPinned; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +mod task_local_macro { + /// Declare a new task local storage key of type + /// [`crate::task_local::LocalKey`]. + /// + /// # Syntax + /// The macro wraps any number of static declarations and makes them task + /// local. Publicity and attributes for each static are allowed. + /// **Unlike thread_local, task_local does not specify the initial value.** + /// Example: + /// + /// ``` + /// use std::cell::RefCell; + /// ylong_runtime::task_local! { + /// pub static FOO: RefCell; + /// + /// static BAR: RefCell; + /// } + /// ``` + /// + /// See [`LocalKey` documentation][`crate::task_local::LocalKey`] for more + /// information. + /// + /// [`crate::task_local::LocalKey`]: crate::task_local::LocalKey + #[macro_export] + macro_rules! task_local { + () => {}; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty; $($rest:tt)*) => { + $crate::task_local!($(#[$attr])* $vis static $name, $t); + $crate::task_local!($($rest)*); + }; + + ($(#[$attr:meta])* $vis:vis static $name:ident: $t:ty) => { + $crate::task_local!{$(#[$attr])* $vis static $name, $t}; + }; + + ($(#[$attr:meta])* $vis:vis static $name:ident, $t:ty) => { + $(#[$attr])* + $vis static $name: $crate::task_local::LocalKey<$t> = { + thread_local! { + static KEY:std::cell::RefCell> =const{ std::cell::RefCell::new(None)}; + } + $crate::task_local::LocalKey {inner: KEY} + }; + }; +} +} + +/// A task local storage key which owns its contents. +/// +/// It is instantiated with the [`task_local!`] macro and the primary method is +/// the [`scope`] method. +/// +/// [`task_local!`]: crate::task_local! +/// [`scope`]: LocalKey::scope +/// +/// Unlike [`std::thread::LocalKey`] which lazily initialises its value on first +/// access, `LocalKey` generated by [`task_local!`] is initialised for the first +/// time when the futures executor first polls the futures containing the task +/// local. +/// +/// # Example +/// +/// ``` +/// use ylong_runtime::task_local; +/// +/// task_local! { +/// static FOO: i32; +/// } +/// let handle1 = ylong_runtime::spawn(async { FOO.scope(6, async { FOO.get() }).await }); +/// +/// assert_eq!(ylong_runtime::block_on(handle1).unwrap(), 6); +/// ``` +#[derive(Debug)] +pub struct LocalKey { + /// inner thread LocalKey + pub inner: std::thread::LocalKey>>, +} + +impl LocalKey { + /// Set a value T to the task-local value for the future F. + /// On completion of scope, the task-local will be dropped. + /// + /// ###Example + /// ``` + /// ylong_runtime::task_local! { + /// static F:usize; + /// } + /// + /// ylong_runtime::block_on(async { + /// F.scope(1, async { + /// F.scope(2, async { + /// assert_eq!(F.get(), 2); + /// }) + /// .await; + /// assert_eq!(F.get(), 1); + /// }) + /// .await; + /// }) + /// ``` + pub fn scope(&'static self, value: T, fut: Fut) -> TaskLocalFut { + TaskLocalFut { + local: self, + slot: Some(value), + future: Some(fut), + _pin: PhantomPinned, + } + } + + /// Sets a value `T` as the task-local value for the closure `F`. + /// On completion of `sync_scope`, the task-local will be dropped. + /// + /// ### Example + /// + /// ``` + /// ylong_runtime::task_local! { + /// static F:i32; + /// } + /// + /// F.sync_scope(1, || { + /// assert_eq!(F.get(), 1); + /// }) + /// ``` + pub fn sync_scope(&'static self, value: T, f: F) -> R + where + F: FnOnce() -> R, + { + let mut slot = Some(value); + self.scope_inner(&mut slot, f) + } + + fn scope_inner(&'static self, slot: &mut Option, f: F) -> R + where + F: FnOnce() -> R, + { + self.swap_slot(slot); + let res = f(); + self.swap_slot(slot); + res + } + + fn swap_slot(&'static self, slot: &mut Option) { + self.inner + .try_with(|inner| { + mem::swap( + &mut *inner.try_borrow_mut().expect( + "cannot enter a task-local scope while the Task Local Storage is borrowed", + ), + slot, + ) + }) + .expect("cannot access a Task Local Storage value during or after destruction"); + } + /// Acquires a reference to the value in this Task Local storage key. + /// + /// This will not initialize the value, so it should be invoked inside a + /// `scope`. + /// + /// # Panics + /// + /// This function will `panic!()` if the key is currently borrowed or its + /// destructor is running, and it **may** panic if the + /// destructor has previously been run for this thread. + /// + /// # Example + /// + /// ``` + /// ylong_runtime::task_local! { + /// static A:i32; + /// } + /// + /// A.sync_scope(1, || { + /// A.with(|a| assert_eq!(*a, 1)); + /// }) + /// ``` + + pub fn with(&'static self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + match self.try_with(f) { + Ok(res) => res, + Err(e) => { + e.panic(); + unreachable!() + } + } + } + + /// Acquires a reference to the value in this Task Local storage key. + /// + /// This will not initialize the value, so it should be invoked inside a + /// `scope`. If the key has been destroyed (which may happen if this is + /// called in a destructor), this function will return an + /// [`AccessError`]. + /// + /// # Example + /// + /// ``` + /// ylong_runtime::task_local! { + /// static A:i32; + /// } + /// + /// A.sync_scope(1, || { + /// A.try_with(|a| assert_eq!(*a, 1)); + /// }) + /// ``` + /// + /// [`AccessError`]:ScopeError::AccessError + pub fn try_with(&'static self, f: F) -> Result + where + F: FnOnce(&T) -> R, + { + match self.inner.try_with(|inner| inner.borrow().as_ref().map(f)) { + Ok(Some(res)) => Ok(res), + Ok(None) => Err(ScopeError::InitializeError), + Err(_) => Err(ScopeError::AccessError), + } + } +} + +/// Enum to store the various types of errors that can cause +/// [`LocalKey::try_with`](LocalKey::try_with) to fail. +#[derive(Debug, PartialEq, Eq)] +pub enum ScopeError { + /// An error returned by [`LocalKey::try_with`](LocalKey::try_with) + AccessError, + /// An error indicating that the Task Local Storage Key is not initialized + /// before invoking the [`LocalKey::try_with`](LocalKey::try_with) + InitializeError, +} + +impl ScopeError { + fn panic(self) { + match self { + ScopeError::AccessError => { + panic!("cannot access a Thread Local Storage value during or after destruction") + } + ScopeError::InitializeError => panic!("Thread Local Storage is not Initialized"), + } + } +} + +impl LocalKey { + /// get the value of the inner thread LocalKey, panic if it is not + /// initialized + pub fn get(&'static self) -> T { + self.with(|v| *v) + } +} +/// A future that sets a value `T` of a task local for the future `F` during +/// its execution. +/// +/// The value of the task-local must be `'static` and will be dropped on the +/// completion of the future. +/// +/// ### Examples +/// +/// ``` +/// ylong_runtime::task_local! { +/// static FOO: u32; +/// } +/// ylong_runtime::block_on(async { +/// FOO.scope(1, async { +/// println!("task local value: {}", FOO.get()); +/// }) +/// .await; +/// }); +/// ``` +pub struct TaskLocalFut { + local: &'static LocalKey, + slot: Option, + future: Option, + _pin: PhantomPinned, +} + +impl Future for TaskLocalFut { + type Output = Fut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + local, + slot, + future, + _pin, + } = unsafe { self.get_unchecked_mut() }; + let mut future_opt = unsafe { Pin::new_unchecked(future) }; + let res = local.scope_inner(slot, || match future_opt.as_mut().as_pin_mut() { + Some(fut) => { + let res = fut.poll(cx); + if res.is_ready() { + future_opt.set(None); + } + Some(res) + } + None => None, + }); + res.unwrap() + } +} + +impl Drop for TaskLocalFut { + fn drop(&mut self) { + let pin_self = unsafe { Pin::new_unchecked(self) }; + let TaskLocalFut { + local, + slot, + future, + _pin, + } = unsafe { pin_self.get_unchecked_mut() }; + + if mem::needs_drop::() && future.is_some() { + let mut future = unsafe { Pin::new_unchecked(future) }; + local.scope_inner(slot, || { + future.set(None); + }); + } + } +} diff --git a/ylong_runtime/tests/task_local.rs b/ylong_runtime/tests/task_local.rs new file mode 100644 index 0000000..1083a83 --- /dev/null +++ b/ylong_runtime/tests/task_local.rs @@ -0,0 +1,148 @@ +// Copyright (c) 2023 Huawei Device Co., Ltd. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg(feature = "full")] + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use ylong_runtime::sync::oneshot; +use ylong_runtime::sync::oneshot::Sender; + +/// SDV test cases for task_local on local value. +/// +/// # Brief +/// 1. Initialize a task_local. +/// 2. check the result of LocalKey in different usage. +#[test] +fn sdv_task_local_get() { + use ylong_runtime::task_local::ScopeError::InitializeError; + + ylong_runtime::task_local! { + static A: u32; + pub static B: bool; + } + + ylong_runtime::block_on(async { + let handle1 = ylong_runtime::spawn(A.scope(1, async { + assert_eq!(A.get(), 1); + A.scope(2, async { + assert_eq!(A.get(), 2); + }) + .await + })); + + let handle2 = ylong_runtime::spawn(A.scope(2, async { + A.with(|v| { + assert_eq!(A.get(), 2); + assert_eq!(*v, 2); + }); + assert_eq!(A.get(), 2); + })); + + let handle3 = ylong_runtime::spawn(B.scope(true, async { + assert!(B.get()); + })); + + let handle4 = ylong_runtime::spawn(async { B.try_with(|b| *b) == Err(InitializeError) }); + + let _ = handle1.await; + let _ = handle2.await; + let _ = handle3.await; + let _ = handle4.await; + }); +} + +/// SDV test cases for task_local on drop process. +/// +/// # Brief +/// 1. Initialize a task_local. +/// 2. Create a DropFuture +/// 3. Drop the Future, Check that the drop process is correct. +#[test] +fn sdv_task_local_available_on_abort() { + ylong_runtime::task_local! { + static FOO: i32; + } + + struct DropFuture { + tx_drop: Option>, + } + + impl Drop for DropFuture { + fn drop(&mut self) { + let _ = self.tx_drop.take().unwrap().send(FOO.get()); + } + } + + impl Future for DropFuture { + type Output = i32; + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } + } + + let (tx_drop, rx_drop) = oneshot::channel(); + let (tx, rx) = oneshot::channel(); + + let handle = ylong_runtime::spawn(FOO.scope(1, async { + FOO.scope(2, async { tx.send(FOO.get()) }).await.unwrap(); + DropFuture { + tx_drop: Some(tx_drop), + } + .await; + })); + ylong_runtime::block_on(async { + assert_eq!(rx.await.unwrap(), 2); + }); + handle.cancel(); + assert_eq!(ylong_runtime::block_on(rx_drop).unwrap(), 1); +} + +/// SDV test cases for task_local on completion . +/// +/// # Brief +/// 1. Initialize a task_local. +/// 2. Create a Future +/// 3. Check that the drop process is correct. +#[test] +fn sdv_task_local_completion_drop() { + ylong_runtime::task_local! { + static KEY: u32; + } + + struct FooFuture { + tx: Option>, + } + + impl Future for FooFuture { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + Poll::Ready(()) + } + } + impl Drop for FooFuture { + fn drop(&mut self) { + let _ = self.tx.take().unwrap().send(KEY.get()); + } + } + + let (tx, rx) = oneshot::channel(); + + ylong_runtime::block_on(async { + ylong_runtime::spawn(KEY.scope(42, FooFuture { tx: Some(tx) })); + assert_eq!(rx.await.unwrap(), 42); + }) +} -- Gitee