diff --git a/common_components/BUILD.gn b/common_components/BUILD.gn index f18d39fedb1e758747298d6408454255744a0137..82c53975a71f06b81c9a100cc6a0a8fb75071547 100755 --- a/common_components/BUILD.gn +++ b/common_components/BUILD.gn @@ -53,6 +53,7 @@ source_Heap = [ "heap/collector/task_queue.cpp", "heap/collector/copy_data_manager.cpp", "heap/collector/marking_collector.cpp", + "heap/collector/utils.cpp", "heap/space/from_space.cpp", "heap/space/nonmovable_space.cpp", "heap/space/large_space.cpp", diff --git a/common_components/heap/ark_collector/ark_collector.cpp b/common_components/heap/ark_collector/ark_collector.cpp index 019834557fbbfe88e211f4199d02d8c6091442c4..da5d5370a4637f3eaa7c2743ca2bdb96870cb207 100755 --- a/common_components/heap/ark_collector/ark_collector.cpp +++ b/common_components/heap/ark_collector/ark_collector.cpp @@ -160,8 +160,7 @@ static void MarkingRefField(BaseObject *obj, RefField<> &field, WorkStack &workS DLOG(TRACE, "marking: skip weak obj when full gc, object: %p@%p, targetObj: %p", obj, &field, targetObj); // weak ref is cleared after roots pre-forward, so there might be a to-version weak ref which also need to be // cleared, offset recorded here will help us find it - weakStack.push_back(std::make_shared*, size_t>>( - &field, reinterpret_cast(&field) - reinterpret_cast(obj))); + weakStack.emplace_back(&field, reinterpret_cast(&field) - reinterpret_cast(obj)); return; } @@ -655,6 +654,13 @@ void ArkCollector::PreforwardFlip() if (LIKELY_CC(allocBuffer != nullptr)) { allocBuffer->ClearRegions(); } + if (gcReason_ == GC_REASON_YOUNG || globalWeakStack_.empty()) { + return; + } + ASSERT(std::all_of(globalWeakStack_.begin(), globalWeakStack_.end(), + [](const WeakStack::value_type &pair) { return pair.first == nullptr; }) && + "weak ref must be clear by `TraceCollector::ClearWeakStack` !!"); + globalWeakStack_.clear(); } void ArkCollector::Preforward() diff --git a/common_components/heap/collector/marking_collector.cpp b/common_components/heap/collector/marking_collector.cpp index b8dbc386eba7cdf41fc241733f65867ee4a97932..a643d61cbfe0755778d2173a6084b0653ac93300 100755 --- a/common_components/heap/collector/marking_collector.cpp +++ b/common_components/heap/collector/marking_collector.cpp @@ -17,11 +17,12 @@ #include #include +#include #include "common_components/heap/allocator/alloc_buffer.h" #include "common_components/heap/collector/heuristic_gc_policy.h" #include "common_interfaces/base/runtime_param.h" -#include +#include "common_components/heap/collector/utils.h" namespace common { const size_t MarkingCollector::MAX_MARKING_WORK_SIZE = 16; // fork task if bigger @@ -86,45 +87,6 @@ private: GlobalWorkStackQueue &globalQueue_; }; -class ClearWeakStackTask : public common::Task { -public: - ClearWeakStackTask(uint32_t id, MarkingCollector &tc, Taskpool *pool, TaskPackMonitor &monitor, - GlobalWeakStackQueue &globalQueue) - : Task(id), collector_(tc), threadPool_(pool), monitor_(monitor), globalQueue_(globalQueue) - {} - - // single work task without thread pool - ClearWeakStackTask(uint32_t id, MarkingCollector& tc, TaskPackMonitor &monitor, - GlobalWeakStackQueue &globalQueue) - : Task(id), collector_(tc), threadPool_(nullptr), monitor_(monitor), globalQueue_(globalQueue) - {} - - ~ClearWeakStackTask() override - { - threadPool_ = nullptr; - } - - // run concurrent marking task. - bool Run([[maybe_unused]] uint32_t threadIndex) override - { - while (true) { - WeakStack weakStack = globalQueue_.PopWorkStack(); - if (weakStack.empty()) { - break; - } - collector_.ProcessWeakStack(weakStack); - } - monitor_.NotifyFinishOne(); - return true; - } - -private: - MarkingCollector &collector_; - Taskpool *threadPool_; - TaskPackMonitor &monitor_; - GlobalWeakStackQueue &globalQueue_; -}; - void MarkingCollector::TryForkTask(Taskpool *threadPool, WorkStack &workStack, GlobalWorkStackQueue &globalQueue) { size_t size = workStack.size(); @@ -147,11 +109,12 @@ void MarkingCollector::TryForkTask(Taskpool *threadPool, WorkStack &workStack, G } } -void MarkingCollector::ProcessWeakStack(WeakStack &weakStack) +static void ClearWeakRef(WeakStack::value_type *begin, WeakStack::value_type *end) { - while (!weakStack.empty()) { - auto [fieldPointer, offset] = *weakStack.back(); - weakStack.pop_back(); + for (auto iter = begin; iter != end; ++iter) { + RefField<> *fieldPointer = iter->first; + size_t offset = iter->second; + ASSERT(fieldPointer != nullptr && (*iter = {nullptr, 0}).first == nullptr); // debug only. clear memory ASSERT_LOGF(offset % sizeof(RefField<>) == 0, "offset is not aligned"); RefField<> &field = reinterpret_cast&>(*fieldPointer); @@ -183,6 +146,13 @@ void MarkingCollector::ProcessWeakStack(WeakStack &weakStack) } } +void MarkingCollector::ProcessWeakStack(WeakStack &weakStack) +{ + auto begin = &weakStack[0]; + auto end = begin + weakStack.size(); + ClearWeakRef(begin, end); +} + void MarkingCollector::ProcessMarkStack([[maybe_unused]] uint32_t threadIndex, Taskpool *threadPool, WorkStack &workStack, GlobalWorkStackQueue &globalQueue) { @@ -238,17 +208,13 @@ void MarkingCollector::ProcessMarkStack([[maybe_unused]] uint32_t threadIndex, T MergeWeakStack(weakStack); } -void MarkingCollector::MergeWeakStack(WeakStack& weakStack) +void MarkingCollector::MergeWeakStack(WeakStack &weakStack) { std::lock_guard lock(weakStackLock_); // Preprocess the weak stack to minimize work during STW remark. - while (!weakStack.empty()) { - auto tuple = weakStack.back(); - weakStack.pop_back(); - - auto [weakFieldPointer, _] = *tuple; - RefField<> oldField(*weakFieldPointer); + for (const auto &pair : weakStack) { + RefField<> oldField(*pair.first); if (!Heap::IsTaggedObject(oldField.GetFieldValue())) { continue; @@ -261,7 +227,7 @@ void MarkingCollector::MergeWeakStack(WeakStack& weakStack) continue; } - globalWeakStack_.push_back(tuple); + globalWeakStack_.push_back(pair); } } @@ -361,23 +327,6 @@ bool MarkingCollector::AddConcurrentTracingWork(WorkStack& workStack, GlobalWork return true; } -bool MarkingCollector::AddWeakStackClearWork(WeakStack &weakStack, - GlobalWeakStackQueue &globalQueue, - size_t threadCount) -{ - if (weakStack.size() <= threadCount * MIN_MARKING_WORK_SIZE) { - return false; // too less init tasks, which may lead to workload imbalance, add work rejected - } - DCHECK_CC(threadCount > 0); - const size_t chunkSize = std::min(weakStack.size() / threadCount + 1, MIN_MARKING_WORK_SIZE); - // Split the current work stack into work tasks. - while (!weakStack.empty()) { - WeakStackBuf *hSplit = weakStack.split(chunkSize); - globalQueue.AddWorkStack(WeakStack(hSplit)); - } - return true; -} - bool MarkingCollector::PushRootToWorkStack(RootSet *workStack, BaseObject *obj) { RegionDesc *regionInfo = RegionDesc::GetAliveRegionDescAt(reinterpret_cast(obj)); @@ -454,43 +403,45 @@ void MarkingCollector::Remark() VLOG(DEBUG, "mark %zu objects", markedObjectCount_.load(std::memory_order_relaxed)); } +class ClearWeakRefTask : public ArrayTaskDispatcher::ArrayTask { +public: + using T = WeakStack::value_type; + explicit ClearWeakRefTask(CArrayList *inputs) : data_(inputs) {} + void Run(void *begin, void *end) override + { + T *first = reinterpret_cast(begin); + T *last = reinterpret_cast(end); + ASSERT(((uintptr_t)last - (uintptr_t)first) % sizeof(T) == 0 && + ((uintptr_t)first - (uintptr_t)data_->data()) % sizeof(T) == 0); + ASSERT(first == last || (first >= data_->data() && last > data_->data())); + ASSERT(first == last || (first < (data_->data() + data_->size()) && last <= (data_->data() + data_->size()))); + ClearWeakRef(first, last); + } + CArrayList *data_; +}; + void MarkingCollector::ClearWeakStack(bool parallel) { OHOS_HITRACE(HITRACE_LEVEL_COMMERCIAL, "CMCGC::ProcessGlobalWeakStack", ""); - { - if (gcReason_ == GC_REASON_YOUNG || globalWeakStack_.empty()) { - return; - } - Taskpool *threadPool = GetThreadPool(); - ASSERT_LOGF(threadPool != nullptr, "thread pool is null"); - if (parallel) { - uint32_t parallelCount = GetGCThreadCount(true); - uint32_t threadCount = parallelCount + 1; - TaskPackMonitor monitor(parallelCount, parallelCount); - GlobalWeakStackQueue globalQueue; - for (uint32_t i = 0; i < parallelCount; ++i) { - threadPool->PostTask(std::make_unique(0, *this, threadPool, monitor, globalQueue)); - } - if (!AddWeakStackClearWork(globalWeakStack_, globalQueue, static_cast(threadCount))) { - ProcessWeakStack(globalWeakStack_); - } - bool exitLoop = false; - while (!exitLoop) { - WeakStack stack = globalQueue.DrainAllWorkStack(); - if (stack.empty()) { - exitLoop = true; - } - ProcessWeakStack(stack); - } - globalQueue.NotifyFinish(); - monitor.WaitAllFinished(); - } else { - ProcessWeakStack(globalWeakStack_); - } + if (gcReason_ == GC_REASON_YOUNG || globalWeakStack_.empty()) { + return; + } + // the globalWeakStack_ cannot be modified during task execution + Taskpool *threadPool = GetThreadPool(); + ASSERT_LOGF(threadPool != nullptr, "thread pool is null"); + constexpr size_t BATCH_N = 200; + if (parallel && globalWeakStack_.size() > BATCH_N) { + auto inputs = &globalWeakStack_; + ClearWeakRefTask callback(inputs); + ArrayTaskDispatcher dispatcher(inputs->data(), inputs->size() * sizeof(WeakStack::value_type), + BATCH_N * sizeof(WeakStack::value_type), &callback); + dispatcher.Dispatch(threadPool, threadPool->GetTotalThreadNum()); + dispatcher.Wait(); + } else { + ProcessWeakStack(globalWeakStack_); } } - bool MarkingCollector::MarkSatbBuffer(WorkStack& workStack) { OHOS_HITRACE(HITRACE_LEVEL_COMMERCIAL, "CMCGC::MarkSatbBuffer", ""); diff --git a/common_components/heap/collector/marking_collector.h b/common_components/heap/collector/marking_collector.h index d15c225a879875828c2d272818c482f008980084..81faef06ba9b4621e0bff066f1bd610994a8eacd 100755 --- a/common_components/heap/collector/marking_collector.h +++ b/common_components/heap/collector/marking_collector.h @@ -107,10 +107,9 @@ class ConcurrentMarkingWork; using RootSet = MarkStack; using WorkStack = MarkStack; using WorkStackBuf = MarkStackBuffer; -using WeakStack = MarkStack*, size_t>>>; -using WeakStackBuf = MarkStackBuffer*, size_t>>>; using GlobalWorkStackQueue = GlobalStackQueue; -using GlobalWeakStackQueue = GlobalStackQueue; + +using WeakStack = CArrayList*, size_t>>; class MarkingCollector : public Collector { friend MarkingWork; @@ -323,7 +322,6 @@ protected: void TracingImpl(WorkStack& workStack, bool parallel, bool Remark); bool AddConcurrentTracingWork(WorkStack& workStack, GlobalWorkStackQueue &globalQueue, size_t threadCount); - bool AddWeakStackClearWork(WeakStack& workStack, GlobalWeakStackQueue &globalQueue, size_t threadCount); private: void MarkRememberSetImpl(BaseObject* object, WorkStack& workStack); void ConcurrentRemark(WorkStack& remarkStack, bool parallel); diff --git a/common_components/heap/collector/utils.cpp b/common_components/heap/collector/utils.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ca3a867ca5f9a9ea6270bf235d428efb95291a2f --- /dev/null +++ b/common_components/heap/collector/utils.cpp @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common_components/heap/collector/utils.h" + +namespace common { + +void ArrayTaskDispatcher::Dispatch(Taskpool *pool, int nThread) +{ + aliveTask_ = nThread; + for (int i = 0; i < nThread; ++i) { + pool->PostTask(std::make_unique(this)); + } +} + +void ArrayTaskDispatcher::RunImpl() +{ + void *end = reinterpret_cast(array_ + bytes_); + size_t i = index_.fetch_add(1, std::memory_order_relaxed); + auto iter = TaskAt(i); + while (iter < end) { + consumer_->Run(iter, std::min(TaskAt(i + 1), end)); + i = index_.fetch_add(1, std::memory_order_relaxed); + iter = TaskAt(i); + } +} + +bool ArrayTaskDispatcher::Runner::Run(uint32_t) +{ + manager_->RunImpl(); + std::unique_lock lock(manager_->mtx_); + if ((--manager_->aliveTask_) == 0) { + manager_->cv_.notify_one(); + } + return true; +} + +void ArrayTaskDispatcher::Wait() +{ + RunImpl(); + std::unique_lock lock(mtx_); + cv_.wait(lock, [this]() { return aliveTask_ == 0; }); +} + +} // namespace common diff --git a/common_components/heap/collector/utils.h b/common_components/heap/collector/utils.h new file mode 100644 index 0000000000000000000000000000000000000000..18657588548c8299a3cadc85cdaa5c00535ab004 --- /dev/null +++ b/common_components/heap/collector/utils.h @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef COMMON_COMPONENTS_HEAP_ARK_COLLECTOR_UTILS_H +#define COMMON_COMPONENTS_HEAP_ARK_COLLECTOR_UTILS_H + +#include "common_components/taskpool/taskpool.h" +#include "macros.h" + +namespace common { + +class ArrayTaskDispatcher { +private: + class Runner : public common::Task { + public: + explicit Runner(ArrayTaskDispatcher *dispatcher) : Task(0), manager_(dispatcher) {} + bool Run(uint32_t) override; + ArrayTaskDispatcher *manager_; + }; + +public: + class ArrayTask { + public: + virtual ~ArrayTask() = default; + virtual void Run(void *begin, void *end) = 0; + }; + ArrayTaskDispatcher(void *memory, size_t bytes, size_t taskBytes, ArrayTask *callback) + : index_(0), + array_(reinterpret_cast(memory)), + bytes_(bytes), + batch_(taskBytes), + consumer_(callback), + aliveTask_(0) + { + } + ~ArrayTaskDispatcher() { ASSERT(aliveTask_ == 0); } + void Dispatch(Taskpool *pool, int nThread); + void Wait(); + +private: + void *TaskAt(size_t i) const { return reinterpret_cast(array_ + (i * batch_)); } + void RunImpl(); + + std::mutex mtx_; + std::condition_variable cv_; + std::atomic_size_t index_; + const uintptr_t array_; + const size_t bytes_; + const size_t batch_; + ArrayTask *consumer_; + int aliveTask_; +}; + +} // namespace common + +#endif