diff --git a/js_concurrent_module/taskpool/task.cpp b/js_concurrent_module/taskpool/task.cpp index b8d7e3a66a9e069d28241aec714f80fe04711129..ba1c9eb6f8f68ca1508426c7431a07f4e7495049 100644 --- a/js_concurrent_module/taskpool/task.cpp +++ b/js_concurrent_module/taskpool/task.cpp @@ -118,7 +118,7 @@ void Task::TaskDestructor(napi_env env, void* data, [[maybe_unused]] void* hint) } bool shouldDelete = false; { - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); task->SetValid(false); if (task->refCount_ == 0) { shouldDelete = true; @@ -141,7 +141,7 @@ void Task::CleanupHookFunc(void* arg) if (task->IsSeqRunnerTask()) { SequenceRunnerManager::GetInstance().RemoveWaitingTask(task); } - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); if (task->onResultSignal_ != nullptr) { uv_close(reinterpret_cast(task->onResultSignal_), nullptr); } @@ -264,7 +264,7 @@ TaskInfo* Task::GetTaskInfo(napi_env env, napi_value task, Priority priority) return nullptr; } { - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); if (currentTaskInfo_ == nullptr) { currentTaskInfo_ = pendingInfo; } else { @@ -1074,7 +1074,7 @@ void Task::NotifyPendingTask() { HILOG_DEBUG("taskpool:: task:%{public}s NotifyPendingTask", std::to_string(taskId_).c_str()); TaskManager::GetInstance().NotifyDependencyTaskInfo(taskId_); - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); delete currentTaskInfo_; if (pendingTaskInfos_.empty()) { currentTaskInfo_ = nullptr; @@ -1114,12 +1114,9 @@ void Task::CancelPendingTask(napi_env env) bool Task::UpdateTask(uint64_t startTime, void* worker) { HILOG_DEBUG("taskpool:: task:%{public}s UpdateTask", std::to_string(taskId_).c_str()); - if (taskState_ == ExecuteState::CANCELED) { // task may have been canceled - HILOG_INFO("taskpool:: task has been canceled, taskId %{public}s", std::to_string(taskId_).c_str()); - isCancelToFinish_ = true; - return false; + if (taskState_ != ExecuteState::CANCELED) { // task may have been canceled + taskState_ = ExecuteState::RUNNING; } - taskState_ = ExecuteState::RUNNING; startTime_ = startTime; worker_ = worker; return true; @@ -1127,7 +1124,7 @@ bool Task::UpdateTask(uint64_t startTime, void* worker) napi_value Task::DeserializeValue(napi_env env, napi_value* func, napi_value* args) { - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); if (UNLIKELY(currentTaskInfo_ == nullptr)) { HILOG_ERROR("taskpool:: the currentTaskInfo is nullptr, the task may have been cancelled"); return nullptr; @@ -1388,7 +1385,7 @@ void Task::ClearDelayedTimers() HILOG_DEBUG("taskpool:: task ClearDelayedTimers"); std::list deferreds {}; { - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); TaskMessage *taskMessage = nullptr; for (auto t: delayedTimers_) { if (t == nullptr) { @@ -1426,7 +1423,7 @@ bool Task::VerifyAndPostResult(Priority priority) TaskManager::GetInstance().PostTask(onResultTask, "TaskPoolOnResultTask", priority); return true; } else { - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); if (!IsValid() || onResultSignal_ == nullptr || uv_is_closing((uv_handle_t*)onResultSignal_)) { return false; } @@ -1434,7 +1431,7 @@ bool Task::VerifyAndPostResult(Priority priority) return true; } #else - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); if (!IsValid() || onResultSignal_ == nullptr || uv_is_closing((uv_handle_t*)onResultSignal_)) { return false; } @@ -1455,7 +1452,7 @@ void Task::DecreaseTaskRefCount() bool Task::ShouldDeleteTask(bool needUnref) { - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); if (!IsValid()) { HILOG_WARN("taskpool:: task is invalid"); TaskManager::GetInstance().RemoveTask(taskId_); @@ -1488,7 +1485,7 @@ bool Task::CheckStartExecution(Priority priority) if (onStartExecutionSignal_ == nullptr) { return true; } - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); if (!IsValid()) { return false; } @@ -1501,7 +1498,7 @@ bool Task::CheckStartExecution(Priority priority) if (onStartExecutionSignal_ == nullptr) { return true; } - std::lock_guard lock(taskMutex_); + std::lock_guard lock(taskMutex_); if (!IsValid()) { return false; } diff --git a/js_concurrent_module/taskpool/task.h b/js_concurrent_module/taskpool/task.h index 2e4de98b99c49422b694f2037017c621ced5d464..0ec79da0c02b53d4e07a341edf21f67787ed0e22 100644 --- a/js_concurrent_module/taskpool/task.h +++ b/js_concurrent_module/taskpool/task.h @@ -52,12 +52,6 @@ struct TaskInfo { void* serializationArguments = nullptr; }; -#if defined(ENABLE_TASKPOOL_FFRT) -#define RECURSIVE_MUTEX ffrt::recursive_mutex -#else -#define RECURSIVE_MUTEX std::recursive_mutex -#endif - struct ListenerCallBackInfo { ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value taskError) : env_(env), callbackRef_(callbackRef), taskError_(taskError) {} @@ -179,7 +173,7 @@ public: void* worker_ {nullptr}; napi_ref taskRef_ {}; std::atomic taskRefCount_ {}; - RECURSIVE_MUTEX taskMutex_ {}; + std::recursive_mutex taskMutex_ {}; bool hasDependency_ {false}; bool isLongTask_ {false}; std::atomic isValid_ {true}; diff --git a/js_concurrent_module/taskpool/task_group.cpp b/js_concurrent_module/taskpool/task_group.cpp index 4645303fbf86ea10bb276bc7f0d6ba630e5faa55..3b9641a9cf1c1110900bd94294f8cb485f77c273 100644 --- a/js_concurrent_module/taskpool/task_group.cpp +++ b/js_concurrent_module/taskpool/task_group.cpp @@ -172,7 +172,7 @@ uint32_t TaskGroup::GetTaskIndex(uint32_t taskId) void TaskGroup::NotifyGroupTask(napi_env env) { HILOG_DEBUG("taskpool:: NotifyGroupTask"); - std::lock_guard lock(taskGroupMutex_); + std::lock_guard lock(taskGroupMutex_); if (pendingGroupInfos_.empty()) { return; } diff --git a/js_concurrent_module/taskpool/task_group.h b/js_concurrent_module/taskpool/task_group.h index eae786dddfd62535fb27618f573adc886281b711..18aa6e9f659d15ed940a4f4de3e2002350be66bb 100644 --- a/js_concurrent_module/taskpool/task_group.h +++ b/js_concurrent_module/taskpool/task_group.h @@ -81,7 +81,7 @@ public: uint32_t taskNum_ {}; std::atomic groupState_ {ExecuteState::NOT_FOUND}; napi_ref groupRef_ {}; - RECURSIVE_MUTEX taskGroupMutex_ {}; + std::recursive_mutex taskGroupMutex_ {}; std::atomic isValid_ {true}; }; } // namespace Commonlibrary::Concurrent::TaskPoolModule diff --git a/js_concurrent_module/taskpool/task_manager.cpp b/js_concurrent_module/taskpool/task_manager.cpp index b73715a028f1981cd5dd75579a3fd9affc054bbf..9a9e44aeb72ce77baba55f6d5903e34f76628e10 100644 --- a/js_concurrent_module/taskpool/task_manager.cpp +++ b/js_concurrent_module/taskpool/task_manager.cpp @@ -94,7 +94,7 @@ TaskManager::~TaskManager() } { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); for (auto& worker : workers_) { delete worker; } @@ -113,7 +113,7 @@ TaskManager::~TaskManager() } { - std::lock_guard lock(tasksMutex_); + std::lock_guard lock(tasksMutex_); for (auto& [_, task] : tasks_) { delete task; task = nullptr; @@ -125,7 +125,7 @@ TaskManager::~TaskManager() void TaskManager::CountTraceForWorker() { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); int64_t threadNum = static_cast(workers_.size()); int64_t idleWorkers = static_cast(idleWorkers_.size()); int64_t timeoutWorkers = static_cast(timeoutWorkers_.size()); @@ -140,7 +140,7 @@ napi_value TaskManager::GetThreadInfos(napi_env env) napi_value threadInfos = nullptr; napi_create_array(env, &threadInfos); { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); int32_t i = 0; for (auto& worker : workers_) { if (worker->workerEnv_ == nullptr) { @@ -177,7 +177,7 @@ napi_value TaskManager::GetTaskInfos(napi_env env) napi_value taskInfos = nullptr; napi_create_array(env, &taskInfos); { - std::lock_guard lock(tasksMutex_); + std::lock_guard lock(tasksMutex_); int32_t i = 0; for (const auto& [_, task] : tasks_) { if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED || @@ -185,7 +185,7 @@ napi_value TaskManager::GetTaskInfos(napi_env env) continue; } napi_value taskInfoValue = NapiHelper::CreateObject(env); - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_); napi_value name = nullptr; napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name); @@ -238,7 +238,7 @@ void TaskManager::CheckForBlockedWorkers() // the threshold will be dynamically modified to provide more flexibility in detecting exceptions // if the thread num has reached the limit and the idle worker is not available, a short time will be used, // else we will choose the longer one - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); bool needChecking = false; bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0); uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME; @@ -325,7 +325,7 @@ uint32_t TaskManager::GetIdleWorkers() uint32_t idleCount = 0; std::unordered_set tids {}; { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); for (auto& worker : idleWorkers_) { #if defined(ENABLE_TASKPOOL_FFRT) if (worker->ffrtTaskHandle_ != nullptr) { @@ -427,7 +427,7 @@ void TaskManager::TriggerShrink(uint32_t step) #else uint32_t TaskManager::GetIdleWorkers() { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); return idleWorkers_.size(); } @@ -452,7 +452,7 @@ void TaskManager::TriggerShrink(uint32_t step) void TaskManager::NotifyShrink(uint32_t targetNum) { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); uint32_t workerCount = workers_.size(); uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS; if (minThread == 0) { @@ -529,7 +529,7 @@ void TaskManager::TryExpand() uint32_t idleCount = 0; uint32_t timeoutWorkers = 0; { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); idleCount = std::min(idleNum, static_cast(idleWorkers_.size())); workerCount = workers_.size(); timeoutWorkers = timeoutWorkers_.size(); @@ -633,7 +633,7 @@ void TaskManager::CancelTask(napi_env env, uint64_t taskId) } ExecuteState state = ExecuteState::NOT_FOUND; { - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) || task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) { @@ -648,7 +648,7 @@ void TaskManager::CancelTask(napi_env env, uint64_t taskId) task->CancelPendingTask(env); std::list deferreds {}; { - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr && EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) { reinterpret_cast(env)->DecreaseSubEnvCounter(); @@ -682,7 +682,7 @@ void TaskManager::CancelSeqRunnerTask(napi_env env, Task *task) void TaskManager::NotifyWorkerIdle(Worker* worker) { { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); if (worker->state_ == WorkerState::BLOCKED) { return; } @@ -701,21 +701,21 @@ void TaskManager::NotifyWorkerCreated(Worker* worker) void TaskManager::NotifyWorkerAdded(Worker* worker) { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); workers_.insert(worker); HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size()); } void TaskManager::NotifyWorkerRunning(Worker* worker) { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); idleWorkers_.erase(worker); CountTraceForWorker(); } uint32_t TaskManager::GetRunningWorkers() { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) { return worker->runningCount_ != 0; }); @@ -723,7 +723,7 @@ uint32_t TaskManager::GetRunningWorkers() uint32_t TaskManager::GetTimeoutWorkers() { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); return timeoutWorkers_.size(); } @@ -758,7 +758,7 @@ void TaskManager::DecreaseNumIfNoIdle(Priority priority) uint32_t TaskManager::GetThreadNum() { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); return workers_.size(); } @@ -822,7 +822,7 @@ std::pair TaskManager::DequeueTaskId() bool TaskManager::IsChooseIdle() { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); for (auto& worker : workers_) { if (worker->state_ == WorkerState::IDLE) { // If worker->state_ is WorkerState::IDLE, it means that the worker is free @@ -849,7 +849,7 @@ std::pair TaskManager::GetTaskByPriority(const std::unique_p void TaskManager::NotifyExecuteTask() { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) { // When there are only idle tasks and workers executing them, it is not triggered return; @@ -908,7 +908,7 @@ void TaskManager::CreateWorkers(napi_env env, uint32_t num) void TaskManager::RemoveWorker(Worker* worker) { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); idleWorkers_.erase(worker); timeoutWorkers_.erase(worker); workers_.erase(worker); @@ -916,7 +916,7 @@ void TaskManager::RemoveWorker(Worker* worker) void TaskManager::RestoreWorker(Worker* worker) { - std::lock_guard lock(workersMutex_); + std::lock_guard lock(workersMutex_); if (UNLIKELY(suspend_)) { suspend_ = false; uv_timer_again(timer_); @@ -1297,7 +1297,7 @@ uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType) std::string TaskManager::GetTaskName(uint64_t taskId) { - std::lock_guard lock(tasksMutex_); + std::lock_guard lock(tasksMutex_); auto iter = tasks_.find(taskId); if (iter == tasks_.end()) { return ""; @@ -1439,19 +1439,19 @@ void TaskManager::ReleaseCallBackInfo(Task* task) void TaskManager::StoreTask(uint64_t taskId, Task* task) { - std::lock_guard lock(tasksMutex_); + std::lock_guard lock(tasksMutex_); tasks_.emplace(taskId, task); } void TaskManager::RemoveTask(uint64_t taskId) { - std::lock_guard lock(tasksMutex_); + std::lock_guard lock(tasksMutex_); tasks_.erase(taskId); } Task* TaskManager::GetTask(uint64_t taskId) { - std::lock_guard lock(tasksMutex_); + std::lock_guard lock(tasksMutex_); auto iter = tasks_.find(taskId); if (iter == tasks_.end()) { return nullptr; @@ -1497,7 +1497,7 @@ bool TaskManager::PostTask(std::function task, const char* taskName, Pri bool TaskManager::CheckTask(uint64_t taskId) { - std::lock_guard lock(tasksMutex_); + std::lock_guard lock(tasksMutex_); auto item = tasks_.find(taskId); return item != tasks_.end(); } @@ -1543,7 +1543,7 @@ void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group) HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group"); TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_); { - std::lock_guard lock(group->taskGroupMutex_); + std::lock_guard lock(group->taskGroupMutex_); if (group->isValid_) { for (uint64_t taskId : group->taskIds_) { Task* task = TaskManager::GetInstance().GetTask(taskId); @@ -1574,7 +1574,7 @@ void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId) if (taskGroup->groupState_ == ExecuteState::CANCELED) { return; } - std::lock_guard lock(taskGroup->taskGroupMutex_); + std::lock_guard lock(taskGroup->taskGroupMutex_); if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED) { std::string errMsg = "taskpool:: taskGroup is not executed or has been executed"; @@ -1613,7 +1613,7 @@ void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* HILOG_INFO("taskpool:: CancelGroupTask task is nullptr"); return; } - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr && TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) { reinterpret_cast(env)->DecreaseSubEnvCounter(); diff --git a/js_concurrent_module/taskpool/task_manager.h b/js_concurrent_module/taskpool/task_manager.h index 3df6895d319bc0565d0ffb0d16a342b3e2bcc387..7ed5a257f0dcd499361ab2c1dee8334ac63ec4ce 100644 --- a/js_concurrent_module/taskpool/task_manager.h +++ b/js_concurrent_module/taskpool/task_manager.h @@ -183,7 +183,7 @@ private: // std::unordered_map tasks_ {}; - RECURSIVE_MUTEX tasksMutex_; + std::recursive_mutex tasksMutex_; // >, update when removeDependency or executeTask std::unordered_map> dependTaskInfos_ {}; @@ -208,7 +208,7 @@ private: std::unordered_set workers_ {}; std::unordered_set idleWorkers_ {}; std::unordered_set timeoutWorkers_ {}; - RECURSIVE_MUTEX workersMutex_; + std::recursive_mutex workersMutex_; // for load balance napi_env hostEnv_ = nullptr; diff --git a/js_concurrent_module/taskpool/taskpool.cpp b/js_concurrent_module/taskpool/taskpool.cpp index c34440c428580a7fd1ce57390e39da44f7cee980..1b3658659fae4ff2c35a7cc22dc1ede196fa1a68 100644 --- a/js_concurrent_module/taskpool/taskpool.cpp +++ b/js_concurrent_module/taskpool/taskpool.cpp @@ -320,7 +320,7 @@ void TaskPool::DelayTask(uv_timer_t* handle) } } if (task != nullptr) { - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); task->delayedTimers_.erase(handle); } uv_timer_stop(handle); @@ -363,7 +363,7 @@ napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo) uv_timer_start(timer, reinterpret_cast(DelayTask), delayTime, 0); { - std::lock_guard lock(task->taskMutex_); + std::lock_guard lock(task->taskMutex_); task->delayedTimers_.insert(timer); } NativeEngine* engine = reinterpret_cast(env); @@ -396,7 +396,7 @@ napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priori groupInfo->resArr = arrRef; napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred); { - std::lock_guard lock(taskGroup->taskGroupMutex_); + std::lock_guard lock(taskGroup->taskGroupMutex_); if (taskGroup->taskNum_ == 0) { napi_resolve_deferred(env, groupInfo->deferred, resArr); taskGroup->groupState_ = ExecuteState::FINISHED;