From cb1a91c09787bd3d4d99cef291fadf5a3a6eccce Mon Sep 17 00:00:00 2001 From: molotkovmikhail Date: Thu, 19 Jun 2025 19:15:55 +0300 Subject: [PATCH] Worker Thread in TaskManager now uses ring buffer Worker Thread in TaskManager now uses ring buffer Issue: https://gitee.com/openharmony/arkcompiler_runtime_core/issues/ICGQQS Testing: `ninja all tests` Signed-off-by: molotkovmikhail --- .../taskmanager/task_queue_set.cpp | 4 +- .../sp_mc_copy_base_lock_free_ring_buffer.h | 191 ++++++++++++++++++ .../taskmanager/worker_thread.cpp | 15 -- .../libpandabase/taskmanager/worker_thread.h | 10 +- .../tests/taskmanager/task_utils_test.cpp | 47 ++++- 5 files changed, 242 insertions(+), 25 deletions(-) create mode 100644 static_core/libpandabase/taskmanager/utils/sp_mc_copy_base_lock_free_ring_buffer.h diff --git a/static_core/libpandabase/taskmanager/task_queue_set.cpp b/static_core/libpandabase/taskmanager/task_queue_set.cpp index a669f1a5f2..43b507a1bc 100644 --- a/static_core/libpandabase/taskmanager/task_queue_set.cpp +++ b/static_core/libpandabase/taskmanager/task_queue_set.cpp @@ -58,8 +58,8 @@ TaskQueueSet::~TaskQueueSet() TaskQueueInterface *TaskQueueSet::GetQueue(QueueId id) { TASK_MANAGER_CHECK_ID_VALUE(id); - // Atomic with relaxed order reason: no order dependency with another variables - return queues_[id].load(std::memory_order_relaxed); + // Atomic with acquire order reason: load should see init of queue and its registration + return queues_[id].load(std::memory_order_acquire); } TaskQueueInterface *TaskQueueSet::SelectQueue() diff --git a/static_core/libpandabase/taskmanager/utils/sp_mc_copy_base_lock_free_ring_buffer.h b/static_core/libpandabase/taskmanager/utils/sp_mc_copy_base_lock_free_ring_buffer.h new file mode 100644 index 0000000000..a14ec52767 --- /dev/null +++ b/static_core/libpandabase/taskmanager/utils/sp_mc_copy_base_lock_free_ring_buffer.h @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_CORY_BASE_LOCK_FREE_RING_BUFFER_H +#define LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_CORY_BASE_LOCK_FREE_RING_BUFFER_H + +#include "libpandabase/macros.h" +#include "libpandabase/utils/math_helpers.h" +#include "coherency_line_size.h" + +#include +#include +#include + +namespace ark::taskmanager::internal { + +template +class SPMCCopyBaseLockFreeRingBuffer { + static constexpr bool IS_RING_BUFFER_SIZE_POWER_OF_TWO = ark::helpers::math::IsPowerOfTwo(RING_BUFFER_SIZE); + + static_assert(std::is_default_constructible_v, "Contained type should be default constructible"); + static_assert(std::is_copy_assignable_v, "Contained type should be copy assignable"); + +public: + /** + * @brief The method tries add value in ring buffer. This method should be call by only one thread at the same time. + * Method fails if ring buffer is full. + * @param val: pinter to value that should be pushed in ring buffer. + * @returns true if push was successfully complit, otherwise false. + * @see SPMCCopyBaseLockFreeRingBuffer::IsFull() + */ + bool TryPush(T *val); + /** + * @brief The method pushs value in ring buffer. Method will wait while ring buffer is full. As TryPush method it + * shold be called only by one thread at the same time. + * @param val: value that should be pushed in ring buffer. + */ + void Push(T val); + /** + * @brief The method tries to remove value from ring buffer and insert it in gotten space. Method will fail if ring + * buffer is empty. If the method fails, state of gotten space will not be changed. This method can be called by any + * count of threads. + * @param val: pointer to space where should be placed popped value. + * @return true, if the method successfully popped value from ring buffer, otherwise @returns false. + * @see SPMCCopyBaseLockFreeRingBuffer::IsEmpty() + */ + bool TryPop(T *val); + /** + * @brief The method pops value from ring buffer. Method will wait for value if ring buffer is empty. + * @returns value that was popped from ring buffer. + */ + T Pop(); + /// @returns count of elements in ring buffer. The result of this method may vary for different threads. + size_t Size() const; + /** + * @returns true if ring buffer is empty, otherwise false. Remember, ring buffer is multithreading, so returned + * value may not be relevant immediately after returning + */ + bool IsEmpty() const; + /** + * @returns true if ring buffer is full, otherwise false. Remember, ring buffer is multithreading, so returned value + * may not be relevant immediately after returning + */ + bool IsFull() const; + +private: + static size_t GetNodeIndex(size_t index); + + alignas(ark::COHERENCY_LINE_SIZE) std::array buffer_; + alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t head_ = 0; + alignas(ark::COHERENCY_LINE_SIZE) std::atomic_size_t tail_ = 0; +}; + +template +inline void SPMCCopyBaseLockFreeRingBuffer::Push(T val) +{ + while (!TryPush(&val)) { + } +} + +template +inline bool SPMCCopyBaseLockFreeRingBuffer::TryPush(T *val) +{ + // Atomic with relaxed order reason: push do not need guaranties of visibility + auto head = head_.load(std::memory_order_relaxed); + // Atomic with relaxed order reason: push do not need guaranties of visibility + auto tail = tail_.load(std::memory_order_relaxed); + // We can see any tail value in a history of it changes, but not earlier value then last load got, so we should + // check correctnes only in the loop + if (head - tail >= RING_BUFFER_SIZE) { + return false; + } + buffer_[GetNodeIndex(head)] = *val; + // Atomic with release order reason: need to guaranty visibility of stores in the TryPop method + head_.store(head + 1U, std::memory_order_release); + return true; +} + +template +// CC-OFFNXT(G.FUL.06, G.FUD.06): inline key work is used to resolve ODR violation +inline bool SPMCCopyBaseLockFreeRingBuffer::TryPop(T *val) +{ + ASSERT(val != nullptr); + auto oldVal = *val; + // The order of tail and head load is importent, we should guaranty that tail <= head + // Atomic with relaxed order reason: future RMW operation will load last value if it will be needed + auto tail = tail_.load(std::memory_order_relaxed); + // First part of algorithm is getting of head_ and tail_ values, next they will be changed, in while loop + // Atomic with acquire order reason: need to guaranty visibility of stores in buffer_ in the Push method + auto head = head_.load(std::memory_order_acquire); + do { + // We can get any head from history of changes, but not oledr one value than the one we have already gotten. + // It means that we can get head less then tail. So we should retry getting until got correct value. + while (head <= tail) { + // Atomic with acquire order reason: need to guaranty visibility of stores in buffer_ in the Push method + head = head_.load(std::memory_order_acquire); + // Next check also can by false positive, but we should return form queue, if user need to insert it will + // retry. + if (head == tail) { + *val = oldVal; + return false; + } + } + *val = buffer_[GetNodeIndex(tail)]; + // Atomic with relaxed order reason: tail_ no need visibility guaranties, if operation fails, tail will be + // loaded with last value of tail_ + } while (!tail_.compare_exchange_weak(tail, tail + 1U, std::memory_order_relaxed)); + return true; +} + +template +inline T SPMCCopyBaseLockFreeRingBuffer::Pop() +{ + T val; + while (!TryPop(&val)) { + }; + return val; +} + +template +inline size_t SPMCCopyBaseLockFreeRingBuffer::Size() const +{ + // Atomic with relaxed order reason: size do not need guaranties of visibility + auto head = head_.load(std::memory_order_relaxed); + // Atomic with relaxed order reason: size do not need guaranties of visibility + auto tail = tail_.load(std::memory_order_relaxed); + return head - tail; +} + +template +inline bool SPMCCopyBaseLockFreeRingBuffer::IsEmpty() const +{ + return Size() == 0; +} + +template +inline bool SPMCCopyBaseLockFreeRingBuffer::IsFull() const +{ + size_t size = 0; + do { + size = Size(); + // We use such loop to prevent getting incorrect value of tail + } while (size > RING_BUFFER_SIZE); + return size == RING_BUFFER_SIZE; +} + +/*static*/ +template +inline size_t SPMCCopyBaseLockFreeRingBuffer::GetNodeIndex(size_t index) +{ + if constexpr (IS_RING_BUFFER_SIZE_POWER_OF_TWO) { + return index & (RING_BUFFER_SIZE - 1U); + } + return index % RING_BUFFER_SIZE; +} + +} // namespace ark::taskmanager::internal + +#endif // LIBPANDABASE_TASKMANAGER_UTILS_SP_MC_CORY_BASE_LOCK_FREE_RING_BUFFER_H diff --git a/static_core/libpandabase/taskmanager/worker_thread.cpp b/static_core/libpandabase/taskmanager/worker_thread.cpp index 0edd9e2464..489428789e 100644 --- a/static_core/libpandabase/taskmanager/worker_thread.cpp +++ b/static_core/libpandabase/taskmanager/worker_thread.cpp @@ -106,21 +106,6 @@ size_t WorkerThread::ExecuteTasksFromLocalQueue() return executed; } -void WorkerThread::Start() -{ - os::memory::LockHolder lockHolder(startWaitLock_); - start_ = true; - startWaitCondVar_.SignalAll(); -} - -void WorkerThread::WaitForStart() -{ - os::memory::LockHolder lockHolder(startWaitLock_); - while (!start_) { - startWaitCondVar_.Wait(&startWaitLock_); - } -} - std::string WorkerThread::GetWorkerName() const { return name_; diff --git a/static_core/libpandabase/taskmanager/worker_thread.h b/static_core/libpandabase/taskmanager/worker_thread.h index d0e45eb30b..65ea03bf60 100644 --- a/static_core/libpandabase/taskmanager/worker_thread.h +++ b/static_core/libpandabase/taskmanager/worker_thread.h @@ -16,9 +16,8 @@ #ifndef PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H #define PANDA_LIBPANDABASE_TASKMANAGER_WORKER_THREAD_H -#include "libpandabase/taskmanager/schedulable_task_queue_interface.h" -#include "libpandabase/taskmanager/utils/two_lock_queue.h" -#include "libpandabase/os/mutex.h" +#include "libpandabase/taskmanager/utils/task_time_stats.h" +#include "libpandabase/taskmanager/utils/sp_mc_copy_base_lock_free_ring_buffer.h" #include namespace ark::taskmanager::internal { @@ -79,16 +78,13 @@ private: */ void WaitForStart(); - using SelfTaskQueue = TwoLockQueue, QueueElemAllocationType::OUTSIDE>; + using SelfTaskQueue = SPMCCopyBaseLockFreeRingBuffer; SelfTaskQueue foregroundTaskQueue_; SelfTaskQueue backgroundTaskQueue_; std::thread *thread_ = nullptr; TaskScheduler *scheduler_ = nullptr; TaskTimeStatsBase *taskTimeStats_ = nullptr; std::string name_; - os::memory::Mutex startWaitLock_; - os::memory::ConditionVariable startWaitCondVar_ GUARDED_BY(startWaitLock_); - bool start_ {false}; std::atomic_bool finish_ {false}; }; diff --git a/static_core/libpandabase/tests/taskmanager/task_utils_test.cpp b/static_core/libpandabase/tests/taskmanager/task_utils_test.cpp index 587ba6636b..bb9fd26db9 100644 --- a/static_core/libpandabase/tests/taskmanager/task_utils_test.cpp +++ b/static_core/libpandabase/tests/taskmanager/task_utils_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024 Huawei Device Co., Ltd. + * Copyright (c) 2023-2025 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -16,6 +16,7 @@ #include "libpandabase/taskmanager/utils/sp_mc_lock_free_queue.h" #include "libpandabase/taskmanager/utils/wait_list.h" #include "libpandabase/os/thread.h" +#include "taskmanager/utils/sp_mc_copy_base_lock_free_ring_buffer.h" #include #include @@ -139,4 +140,48 @@ TEST_F(TaskUtilityTest, WaitListTests) ASSERT_TRUE(value.has_value() && value.value() == RETURN_VALUE_2 && !waitList.HaveReadyValue()); } +TEST_F(TaskUtilityTest, SPMCCopyBaseLockFreeRingBufferTest) +{ + // CC-OFFNXT(G.NAM.03-CPP): static_core files have specifice codestyle + constexpr size_t RING_BUFFER_SIZE = 8U; + internal::SPMCCopyBaseLockFreeRingBuffer ringBuffer; + + // CC-OFFNXT(G.NAM.03-CPP): static_core files have specifice codestyle + constexpr size_t CONSUMER_COUNT = 10U; + std::vector consumers; + std::atomic_size_t countOfPopped = 0; + std::array barray; + + auto consumerBody = [&ringBuffer, &countOfPopped, &barray] { + // Atomic with relaxed order reason: no need order here + while (countOfPopped.load(std::memory_order_relaxed) != ELEMENTS_MAX_COUNT) { + int val; + if (ringBuffer.TryPop(&val)) { + // Atomic with relaxed order reason: no need order here + countOfPopped.fetch_add(1, std::memory_order_relaxed); + barray[val] = true; + } + } + }; + + for (size_t i = 0; i < CONSUMER_COUNT; i++) { + consumers.emplace_back(consumerBody); + } + + for (size_t i = 0; i < ELEMENTS_MAX_COUNT; i++) { + ringBuffer.Push(i); + } + + for (size_t i = 0; i < CONSUMER_COUNT; i++) { + consumers[i].join(); + } + + ASSERT_TRUE(ringBuffer.IsEmpty()); + // Atomic with relaxed order reason: no need order here + ASSERT_EQ(countOfPopped.load(std::memory_order_relaxed), ELEMENTS_MAX_COUNT); + for (size_t i = 0; i < ELEMENTS_MAX_COUNT; i++) { + ASSERT_TRUE(barray[i]); + } +} + } // namespace ark::taskmanager -- Gitee