diff --git a/js_concurrent_module/common/helper/concurrent_helper.h b/js_concurrent_module/common/helper/concurrent_helper.h index 63372ef8099866c0a30c0a1068a2d31d39c4b583..2d703eb286db8922c34396b98a4e6d674a761462 100644 --- a/js_concurrent_module/common/helper/concurrent_helper.h +++ b/js_concurrent_module/common/helper/concurrent_helper.h @@ -118,6 +118,13 @@ public: } } + template + static void DeleteHelper(T*& pointer) + { + delete pointer; + pointer = nullptr; + } + #if defined(OHOS_PLATFORM) static std::optional GetSystemMemoryRatio(); static SystemMemoryLevel GetMemoryLevel(); diff --git a/js_concurrent_module/taskpool/BUILD.gn b/js_concurrent_module/taskpool/BUILD.gn index 04007d066bb00a200fcbbb299f1a6e2b701a5353..4aafcc740c31059b12589caefb9f89f2a27178a2 100644 --- a/js_concurrent_module/taskpool/BUILD.gn +++ b/js_concurrent_module/taskpool/BUILD.gn @@ -101,7 +101,6 @@ ohos_shared_library("taskpool") { "bundle_framework:appexecfwk_base", "bundle_framework:appexecfwk_core", "c_utils:utils", - "eventhandler:libeventhandler", "ffrt:libffrt", "hisysevent:libhisysevent", "init:libbegetutil", @@ -160,7 +159,6 @@ ohos_source_set("taskpool_static") { if (is_ohos && is_standard_system && !is_arkui_x) { defines += [ - "ENABLE_TASKPOOL_EVENTHANDLER", "ENABLE_TASKPOOL_FFRT", "ENABLE_TASKPOOL_HISYSEVENT", ] @@ -168,7 +166,6 @@ ohos_source_set("taskpool_static") { "bundle_framework:appexecfwk_base", "bundle_framework:appexecfwk_core", "c_utils:utils", - "eventhandler:libeventhandler", "ffrt:libffrt", "hisysevent:libhisysevent", "init:libbegetutil", diff --git a/js_concurrent_module/taskpool/async_runner.cpp b/js_concurrent_module/taskpool/async_runner.cpp index e18a32798322faf5ac193754a829b81c3c2bfdf1..b4d3507e94d1c1553e03d0c3afb784e259eae16f 100644 --- a/js_concurrent_module/taskpool/async_runner.cpp +++ b/js_concurrent_module/taskpool/async_runner.cpp @@ -24,7 +24,6 @@ namespace Commonlibrary::Concurrent::TaskPoolModule { using namespace Commonlibrary::Concurrent::Common::Helper; -static constexpr char EXECUTE_STR[] = "execute"; napi_value AsyncRunner::AsyncRunnerConstructor(napi_env env, napi_callback_info cbinfo) { @@ -72,7 +71,7 @@ bool AsyncRunner::AsyncRunnerConstructorInner(napi_env env, napi_value& thisVar, napi_value napiAsyncRunnerId = NapiHelper::CreateUint64(env, asyncRunnerId); AsyncRunnerManager::GetInstance().StoreAsyncRunner(asyncRunnerId, asyncRunner); napi_property_descriptor properties[] = { - DECLARE_NAPI_FUNCTION(EXECUTE_STR, Execute), + DECLARE_NAPI_FUNCTION("execute", Execute), }; napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties); HILOG_INFO("taskpool:: construct asyncRunner name is %{public}s, asyncRunnerId %{public}s.", @@ -121,10 +120,10 @@ napi_value AsyncRunner::Execute(napi_env env, napi_callback_info cbinfo) } if (napiPriority != nullptr) { uint32_t priority = NapiHelper::GetUint32Value(env, napiPriority); - task->asyncTaskPriority_ = static_cast(priority); + task->priority_ = static_cast(priority); } task->asyncRunnerId_ = asyncRunner->asyncRunnerId_; - napi_value promise = task->GetTaskInfoPromise(env, napiTask, TaskType::ASYNCRUNNER_TASK, task->asyncTaskPriority_); + napi_value promise = task->GetTaskInfoPromise(env, napiTask, TaskType::ASYNCRUNNER_TASK, task->priority_); if (promise == nullptr) { return nullptr; } @@ -132,9 +131,8 @@ napi_value AsyncRunner::Execute(napi_env env, napi_callback_info cbinfo) return nullptr; } if (!AddTasksToAsyncRunner(asyncRunner, task)) { - ExecuteTaskImmediately(asyncRunner, task); + task->ExecuteTask(); } - return promise; } @@ -209,16 +207,6 @@ bool AsyncRunner::CheckExecuteArgs(napi_env env, napi_value napiTask, napi_value return true; } -void AsyncRunner::ExecuteTaskImmediately(AsyncRunner* asyncRunner, Task* task) -{ - HILOG_DEBUG("taskpool:: task %{public}s in asyncRunner %{public}s immediately.", - std::to_string(task->taskId_).c_str(), std::to_string(asyncRunner->asyncRunnerId_).c_str()); - task->IncreaseRefCount(); - TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_); - task->taskState_ = ExecuteState::WAITING; - TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->asyncTaskPriority_); -} - bool AsyncRunner::AddTasksToAsyncRunner(AsyncRunner* asyncRunner, Task* task) { Task* frontTask = nullptr; @@ -291,25 +279,26 @@ void AsyncRunner::TriggerRejectErrorTimer(Task* task, int32_t errCode, bool isWa task->DiscardAsyncRunnerTask(message); } -void AsyncRunner::TriggerWaitingTask() +void AsyncRunner::TriggerWaitingTask(napi_env env) { std::unique_lock lock(waitingTasksMutex_); DecreaseRunningCount(); Task* task = nullptr; while (runningCount_ < runningCapacity_) { if (waitingTasks_.empty()) { - HILOG_DEBUG("taskpool:: asyncRunner %{public}s empty.", std::to_string(asyncRunnerId_).c_str()); - break; + return; } task = waitingTasks_.front(); waitingTasks_.pop_front(); - runningCount_.fetch_add(1); - task->IncreaseRefCount(); - TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_); - task->taskState_ = ExecuteState::WAITING; - HILOG_DEBUG("taskpool:: Trig task %{public}s in asyncRunner %{public}s.", - std::to_string(task->taskId_).c_str(), std::to_string(asyncRunnerId_).c_str()); - TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->asyncTaskPriority_); + ++runningCount_; + if (task->IsSameEnv(env) || task->onEnqueuedCallBackInfo_ == nullptr) { + task->ExecuteTask(); + } else { + auto asyncTriggerFunc = [task] { + task->ExecuteTask(); + }; + napi_send_event(task->GetEnv(), asyncTriggerFunc, g_napiPriorityMap.at(Priority::DEFAULT)); + } } } diff --git a/js_concurrent_module/taskpool/async_runner.h b/js_concurrent_module/taskpool/async_runner.h index ae66d7e75ed99a9160d264c2a12d8d47033f2df3..335ee085b922438aabef197eb26ed2d983530e25 100644 --- a/js_concurrent_module/taskpool/async_runner.h +++ b/js_concurrent_module/taskpool/async_runner.h @@ -32,7 +32,7 @@ public: static AsyncRunner* CreateGlobalRunner(const std::string& name, uint32_t runningCapacity, uint32_t waitingCapacity); bool RemoveWaitingTask(Task* task, bool isReject = true); void TriggerRejectErrorTimer(Task* task, int32_t errCode, bool isWaiting = false); - void TriggerWaitingTask(); + void TriggerWaitingTask(napi_env env); uint64_t DecreaseAsyncCount(); void IncreaseAsyncCount(); bool CheckGlobalRunnerParams(napi_env env, uint32_t runningCapacity, uint32_t waitingCapacity); @@ -45,7 +45,6 @@ private: AsyncRunner& operator=(AsyncRunner &&) = delete; static bool AsyncRunnerConstructorInner(napi_env env, napi_value& thisVar, AsyncRunner* asyncRunner); - static void ExecuteTaskImmediately(AsyncRunner* asyncRunner, Task* task); static void AsyncRunnerDestructor(napi_env env, void* data, void* hint); static AsyncRunner* CheckAndCreateAsyncRunner(napi_env env, napi_value &thisVar, napi_value name, napi_value runningCapacity, napi_value waitingCapacity); diff --git a/js_concurrent_module/taskpool/async_runner_manager.cpp b/js_concurrent_module/taskpool/async_runner_manager.cpp index e1b91b4f95bd532267736f3b3f4b6616545d7b59..344e5b947b4bc849cc9c81483a6fca8c8ba3b3d9 100644 --- a/js_concurrent_module/taskpool/async_runner_manager.cpp +++ b/js_concurrent_module/taskpool/async_runner_manager.cpp @@ -50,7 +50,6 @@ AsyncRunner* AsyncRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_valu } } } - return asyncRunner; } @@ -84,20 +83,17 @@ bool AsyncRunnerManager::TriggerAsyncRunner(napi_env env, Task* lastTask) return false; } if (UnrefAndDestroyRunner(asyncRunner)) { - HILOG_ERROR("taskpool:: trigger asyncRunner is remove."); + HILOG_DEBUG("taskpool:: trigger asyncRunner is remove."); return false; } - asyncRunner->TriggerWaitingTask(); + asyncRunner->TriggerWaitingTask(env); return true; } void AsyncRunnerManager::RemoveGlobalAsyncRunner(const std::string& name) { std::unique_lock lock(globalAsyncRunnerMutex_); - auto iter = globalAsyncRunner_.find(name); - if (iter != globalAsyncRunner_.end()) { - globalAsyncRunner_.erase(iter); - } + globalAsyncRunner_.erase(name); } void AsyncRunnerManager::CancelAsyncRunnerTask(napi_env env, Task* task) diff --git a/js_concurrent_module/taskpool/sequence_runner.cpp b/js_concurrent_module/taskpool/sequence_runner.cpp index 1ef2040d4d78f44d738ba02f07159945d41d50c9..f370d18d1947374ea8aad4dc36cad8801ee55ad2 100644 --- a/js_concurrent_module/taskpool/sequence_runner.cpp +++ b/js_concurrent_module/taskpool/sequence_runner.cpp @@ -148,9 +148,7 @@ napi_value SequenceRunner::Execute(napi_env env, napi_callback_info cbinfo) HILOG_INFO("taskpool:: taskId %{public}s in seqRunner %{public}s immediately.", std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str()); seqRunner->currentTaskId_ = task->taskId_; - task->IncreaseRefCount(); - task->taskState_ = ExecuteState::WAITING; - ExecuteTaskImmediately(task->taskId_, seqRunner->priority_); + task->ExecuteTask(); } else { HILOG_INFO("taskpool:: add taskId: %{public}s to seqRunner %{public}s.", std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str()); @@ -159,11 +157,6 @@ napi_value SequenceRunner::Execute(napi_env env, napi_callback_info cbinfo) return promise; } -void SequenceRunner::ExecuteTaskImmediately(uint32_t taskId, Priority priority) -{ - TaskManager::GetInstance().EnqueueTaskId(taskId, priority); -} - void SequenceRunner::SequenceRunnerDestructor(napi_env env, void* data, [[maybe_unused]] void* hint) { SequenceRunner* seqRunner = static_cast(data); @@ -205,49 +198,77 @@ void SequenceRunner::AddTask(Task* task) void SequenceRunner::TriggerTask(napi_env env) { - std::list deferreds {}; - { - std::unique_lock lock(seqRunnerMutex_); + // NRVO will work here, do not need the reference + auto deferreds = ProcessCanceledTasks(env); + TryTriggerNextTask(env); + if (!deferreds.empty()) { + TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, + "taskpool:: sequenceRunner task has been canceled"); + } +} + +std::list SequenceRunner::ProcessCanceledTasks(napi_env env) +{ + std::list deferreds; + std::unique_lock lock(seqRunnerMutex_); + if (seqRunnerTasks_.empty()) { + currentTaskId_ = 0; + return deferreds; + } + + Task* task = seqRunnerTasks_.front(); + while (task && task->taskState_ == ExecuteState::CANCELED) { + HandleCanceledTask(task, env, deferreds); + seqRunnerTasks_.pop_front(); + if (seqRunnerTasks_.empty()) { currentTaskId_ = 0; - return; - } - Task* task = seqRunnerTasks_.front(); - seqRunnerTasks_.pop_front(); - bool isEmpty = false; - while (task->taskState_ == ExecuteState::CANCELED) { - if (refCount_ > 0) { - refCount_--; - } - if (task->currentTaskInfo_ != nullptr) { - deferreds.push_back(task->currentTaskInfo_->deferred); - } - if (task->IsSameEnv(env)) { - task->CancelInner(ExecuteState::CANCELED); - } else { - CancelTaskMessage* message = new CancelTaskMessage(ExecuteState::CANCELED, task->taskId_); - task->TriggerCancel(message); - } - - if (seqRunnerTasks_.empty()) { - HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.", - std::to_string(seqRunnerId_).c_str()); - currentTaskId_ = 0; - isEmpty = true; - break; - } - task = seqRunnerTasks_.front(); - seqRunnerTasks_.pop_front(); + task = nullptr; + break; } - if (!isEmpty) { - currentTaskId_ = task->taskId_; - task->IncreaseRefCount(); - task->taskState_ = ExecuteState::WAITING; - HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.", - std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId_).c_str()); - TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority_); + task = seqRunnerTasks_.front(); + } + return deferreds; +} + +void SequenceRunner::HandleCanceledTask(Task* task, napi_env env, std::list& deferreds) +{ + if (refCount_ > 0) { + --refCount_; + } + + if (task->currentTaskInfo_ != nullptr) { + deferreds.push_back(task->currentTaskInfo_->deferred); + } + + if (task->IsSameEnv(env)) { + task->CancelInner(ExecuteState::CANCELED); + } else { + CancelTaskMessage* message = new CancelTaskMessage(ExecuteState::CANCELED, task->taskId_); + task->TriggerCancel(message); + } +} + +void SequenceRunner::TryTriggerNextTask(napi_env env) +{ + std::unique_lock lock(seqRunnerMutex_); + if (seqRunnerTasks_.empty()) { + return; + } + + Task* task = seqRunnerTasks_.front(); + seqRunnerTasks_.pop_front(); + if (task->taskState_ != ExecuteState::CANCELED) { + currentTaskId_ = task->taskId_; + if (task->IsSameEnv(env) || task->onEnqueuedCallBackInfo_ == nullptr) { + task->ExecuteTask(); + return; } + auto asyncTriggerTask = [task]() { + task->ExecuteTask(); + }; + auto napiPrio = g_napiPriorityMap.at(Priority::DEFAULT); + napi_send_event(task->GetEnv(), asyncTriggerTask, napiPrio); } - TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: sequenceRunner task has been canceled"); } } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/sequence_runner.h b/js_concurrent_module/taskpool/sequence_runner.h index 9f5e7ddbc378142f20622c8b246c5e31539a9252..3007f5cf35d6b6a1879fdf72a8e631bda2788f73 100644 --- a/js_concurrent_module/taskpool/sequence_runner.h +++ b/js_concurrent_module/taskpool/sequence_runner.h @@ -42,11 +42,16 @@ private: SequenceRunner(SequenceRunner &&) = delete; SequenceRunner& operator=(SequenceRunner &&) = delete; + std::list ProcessCanceledTasks(napi_env env); + void HandleCanceledTask(Task* task, napi_env env, std::list& deferreds); + void TryTriggerNextTask(napi_env env); + static void ExecuteTaskImmediately(uint32_t taskId, Priority priority); static void SequenceRunnerDestructor(napi_env env, void* data, void* hint); static bool SeqRunnerConstructorInner(napi_env env, napi_value& thisVar, SequenceRunner* seqRunner); friend class NativeEngineTest; + public: uint64_t seqRunnerId_ {}; std::atomic currentTaskId_ {}; diff --git a/js_concurrent_module/taskpool/sequence_runner_manager.cpp b/js_concurrent_module/taskpool/sequence_runner_manager.cpp index 1c55b3305fe341f377537ecd77311592e01e00c8..3ae0230679f5c5a246b5febbfc5032b2cc0fb732 100644 --- a/js_concurrent_module/taskpool/sequence_runner_manager.cpp +++ b/js_concurrent_module/taskpool/sequence_runner_manager.cpp @@ -122,7 +122,7 @@ bool SequenceRunnerManager::TriggerSeqRunner(napi_env env, Task* lastTask) return false; } if (UnrefAndDestroyRunner(seqRunner)) { - HILOG_ERROR("taskpool:: trigger seqRunner is removed."); + HILOG_DEBUG("taskpool:: trigger seqRunner is removed."); return false; } if (seqRunner->currentTaskId_ != lastTask->taskId_) { diff --git a/js_concurrent_module/taskpool/task.cpp b/js_concurrent_module/taskpool/task.cpp index d0e3b011f601c90b18938a25d1953361f28d9fc1..633d54ce69e83d2b92b545b0f3f3015774d991c1 100644 --- a/js_concurrent_module/taskpool/task.cpp +++ b/js_concurrent_module/taskpool/task.cpp @@ -23,6 +23,8 @@ #include "worker.h" namespace Commonlibrary::Concurrent::TaskPoolModule { +using namespace Commonlibrary::Concurrent::Common::Helper; + static constexpr char ONRECEIVEDATA_STR[] = "onReceiveData"; static constexpr char SETTRANSFERLIST_STR[] = "setTransferList"; static constexpr char SET_CLONE_LIST_STR[] = "setCloneList"; @@ -40,8 +42,6 @@ const std::unordered_map g_napiPriorityMap = { {Priority::HIGH, napi_eprio_immediate}, }; -using namespace Commonlibrary::Concurrent::Common::Helper; - napi_value Task::TaskConstructor(napi_env env, napi_callback_info cbinfo) { // check argv count @@ -144,13 +144,11 @@ void Task::CleanupHookFunc(void* arg) Task* task = static_cast(arg); { std::lock_guard lock(task->taskMutex_); - ConcurrentHelper::UvHandleClose(task->onStartCancelSignal_); - ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); - ConcurrentHelper::UvHandleClose(task->onStartDiscardSignal_); if (task->IsFunctionTask()) { task->SetValid(false); } } + if (task->IsAsyncRunnerTask()) { AsyncRunnerManager::GetInstance().RemoveWaitingTask(task); } @@ -159,30 +157,6 @@ void Task::CleanupHookFunc(void* arg) } } -void Task::Cancel(const uv_async_t* req) -{ - auto message = static_cast(req->data); - if (message == nullptr) { - HILOG_DEBUG("taskpool:: cancel message is nullptr"); - return; - } - Task* task = TaskManager::GetInstance().GetTask(message->taskId); - if (task == nullptr) { - HILOG_DEBUG("taskpool:: cancel task is nullptr"); - CloseHelp::DeletePointer(message, false); - return; - } - napi_status status = napi_ok; - HandleScope scope(task->env_, status); - if (status != napi_ok) { - HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); - CloseHelp::DeletePointer(message, false); - return; - } - task->CancelInner(message->state); - CloseHelp::DeletePointer(message, false); -} - Task* Task::GenerateTask(napi_env env, napi_value napiTask, napi_value func, napi_value name, napi_value* args, size_t argc) { @@ -198,7 +172,7 @@ Task* Task::GenerateTask(napi_env env, napi_value napiTask, napi_value func, Task* task = new Task(env, TaskType::TASK, nameStr); delete[] nameStr; TaskManager::GetInstance().StoreTask(task); - task->InitHandle(env); + task->SetIsMainTask(); napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_); napi_set_named_property(env, napiTask, FUNCTION_STR, func); @@ -244,7 +218,7 @@ Task* Task::GenerateFunctionTask(napi_env env, napi_value func, napi_value* args Task* task = new Task(env, type, nameStr); delete[] nameStr; task->currentTaskInfo_ = taskInfo; - task->InitHandle(env); + task->SetIsMainTask(); if (!task->IsMainThreadTask()) { napi_add_env_cleanup_hook(env, CleanupHookFunc, task); } @@ -501,7 +475,7 @@ napi_value Task::SendData(napi_env env, napi_callback_info cbinfo) napi_status status = napi_serialize_inner(env, argsArray, undefined, undefined, defaultTransfer, defaultClone, &serializationArgs); if (status != napi_ok || serializationArgs == nullptr) { - std::string errMessage = "taskpool:: failed to serialize function"; + std::string errMessage = "taskpool:: failed to serialize arguments"; HILOG_ERROR("%{public}s in SendData", errMessage.c_str()); ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); return nullptr; @@ -699,65 +673,25 @@ napi_value Task::RemoveDependency(napi_env env, napi_callback_info cbinfo) return nullptr; } -void Task::StartExecutionCallback(const uv_async_t* req) -{ - HILOG_DEBUG("taskpool:: task StartExecutionCallback"); - auto listenerCallBackInfo = static_cast(req->data); - if (listenerCallBackInfo == nullptr) { // LCOV_EXCL_BR_LINE - HILOG_FATAL("taskpool:: StartExecutionCallBackInfo is null"); - return; - } - StartExecutionTask(listenerCallBackInfo); -} - -void Task::StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo) +void Task::ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo) { - auto env = listenerCallBackInfo->env_; + napi_env env = listenerCallBackInfo->env; napi_status status = napi_ok; HandleScope scope(env, status); if (status != napi_ok) { HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); return; } - auto func = NapiHelper::GetReferenceValue(env, listenerCallBackInfo->callbackRef_); + napi_value func = NapiHelper::GetReferenceValue(env, listenerCallBackInfo->callbackRef); if (func == nullptr) { - HILOG_INFO("taskpool:: StartExecutionCallback func is null"); + HILOG_ERROR("taskpool:: ExecuteListenerCallback func is null"); return; } napi_value result; - napi_call_function(env, NapiHelper::GetGlobalObject(env), func, 0, nullptr, &result); - if (NapiHelper::IsExceptionPending(env)) { - napi_value exception = nullptr; - napi_get_and_clear_last_exception(env, &exception); - std::string funcStr = NapiHelper::GetPrintString(env, func); - HILOG_ERROR("taskpool:: an exception has occurred napi_call_function, func is %{public}s", funcStr.c_str()); - } -} - -void Task::ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo) -{ - HILOG_DEBUG("taskpool:: task ExecuteListenerCallback"); - if (listenerCallBackInfo == nullptr) { // LCOV_EXCL_BR_LINE - HILOG_FATAL("taskpool:: listenerCallBackInfo is null"); - return; - } - - napi_env env = listenerCallBackInfo->env_; - napi_value func = NapiHelper::GetReferenceValue(env, listenerCallBackInfo->callbackRef_); - if (func == nullptr) { - HILOG_INFO("taskpool:: ExecuteListenerCallback func is null"); - return; - } - - napi_value result; - napi_value args = listenerCallBackInfo->taskError_; - if (args != nullptr) { - napi_call_function(env, NapiHelper::GetGlobalObject(env), func, 1, &args, &result); - } else { - napi_call_function(env, NapiHelper::GetGlobalObject(env), func, 0, nullptr, &result); - } - + napi_value args = listenerCallBackInfo->result; + size_t argc = args == nullptr ? 0 : 1; + napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argc, &args, &result); if (NapiHelper::IsExceptionPending(env)) { napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); @@ -838,18 +772,6 @@ napi_value Task::OnStartExecution(napi_env env, napi_callback_info cbinfo) napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1); task->onStartExecutionCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr); -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (!task->IsMainThreadTask()) { - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, - Task::StartExecutionCallback, task->onStartExecutionCallBackInfo_); - } -#else - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, - Task::StartExecutionCallback, task->onStartExecutionCallBackInfo_); -#endif - return nullptr; } @@ -1127,7 +1049,7 @@ void Task::NotifyPendingTask() currentTaskInfo_ = pendingTaskInfos_.front(); pendingTaskInfos_.pop_front(); taskState_ = ExecuteState::WAITING; - TaskManager::GetInstance().EnqueueTaskId(taskId_, currentTaskInfo_->priority); + TaskManager::GetInstance().EnqueueTask(this, currentTaskInfo_->priority); } void Task::CancelPendingTask(napi_env env) @@ -1155,7 +1077,7 @@ void Task::CancelPendingTask(napi_env env) TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error); } -bool Task::UpdateTask(uint64_t startTime, void* worker) +void Task::UpdateRunningInfo(uint64_t startTime, void* worker) { HILOG_DEBUG("taskpool:: task:%{public}s UpdateTask", std::to_string(taskId_).c_str()); if (taskState_ != ExecuteState::CANCELED) { @@ -1163,7 +1085,6 @@ bool Task::UpdateTask(uint64_t startTime, void* worker) } startTime_ = startTime; worker_ = worker; - return true; } napi_value Task::DeserializeValue(napi_env env, napi_value* func, napi_value* args) @@ -1198,7 +1119,7 @@ napi_value Task::DeserializeValue(napi_env env, napi_value* func, napi_value* ar napi_delete_serialization_data(env, serializationArguments); } if (status != napi_ok || args == nullptr) { - errMessage = "taskpool:: failed to deserialize function."; + errMessage = "taskpool:: failed to deserialize arguments."; HILOG_ERROR("%{public}s", errMessage.c_str()); napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); success_ = false; @@ -1440,18 +1361,9 @@ void Task::UpdatePeriodicTask() isPeriodicTask_ = true; } -void Task::InitHandle(napi_env env) +void Task::SetIsMainTask() { -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (OHOS::AppExecFwk::EventRunner::IsAppMainThread()) { - isMainThreadTask_ = true; - return; - } -#endif - uv_loop_t* loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, onStartCancelSignal_, Task::Cancel); - ConcurrentHelper::UvHandleInit(loop, onStartDiscardSignal_, Task::DiscardTask); - auto engine = reinterpret_cast(env); + auto engine = reinterpret_cast(env_); isMainThreadTask_ = engine->IsMainThread(); } @@ -1541,53 +1453,33 @@ bool Task::ShouldDeleteTask(bool needUnref) bool Task::CheckStartExecution(Priority priority) { -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (IsMainThreadTask()) { - if (onStartExecutionCallBackInfo_ == nullptr) { - return true; - } - HITRACE_HELPER_METER_NAME("PerformTask: PostTask"); - uint32_t taskId = taskId_; - auto onStartExecutionTask = [taskId]() { - Task* task = TaskManager::GetInstance().GetTask(taskId); - if (task == nullptr || task->onStartExecutionCallBackInfo_ == nullptr) { - return; - } - Task::StartExecutionTask(task->onStartExecutionCallBackInfo_); - }; - TaskManager::GetInstance().PostTask(onStartExecutionTask, "TaskPoolOnStartExecutionTask", priority); - } else { - if (onStartExecutionSignal_ == nullptr) { - return true; - } - std::lock_guard lock(taskMutex_); - if (!IsValid()) { - return false; - } - ConcurrentHelper::UvCheckAndAsyncSend(onStartExecutionSignal_); - } - return true; -#else - if (onStartExecutionSignal_ == nullptr) { + if (onStartExecutionCallBackInfo_ == nullptr) { return true; } - std::lock_guard lock(taskMutex_); - if (!IsValid()) { - return false; - } - ConcurrentHelper::UvCheckAndAsyncSend(onStartExecutionSignal_); - return true; -#endif + + auto onStartExecutionTask = [taskId = taskId_] { + Task* task = TaskManager::GetInstance().GetTask(taskId); + if (task == nullptr || task->onStartExecutionCallBackInfo_ == nullptr) { + HILOG_ERROR("taskpool:: task is not valid or onStartExecutionCallBackInfo_ is null"); + return; + } + Task::ExecuteListenerCallback(task->onStartExecutionCallBackInfo_); + }; + + auto napiPrio = g_napiPriorityMap.at(priority); + uint64_t handleId = 0; + napi_status status = napi_send_event(env_, onStartExecutionTask, napiPrio); + return status == napi_ok ? true : false; } void Task::SetValid(bool isValid) { - isValid_.store(isValid); + isValid_ = isValid; } -bool Task::IsValid() +bool Task::IsValid() const { - return isValid_.load(); + return isValid_; } bool Task::CanForAsyncRunner(napi_env env) @@ -1638,43 +1530,26 @@ void Task::SetTaskId(uint32_t taskId) void Task::TriggerCancel(CancelTaskMessage* message) { -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (IsMainThreadTask()) { - HITRACE_HELPER_METER_NAME("TriggerCancel: PostTask"); - auto onCancelTask = [message]() { - Task* task = TaskManager::GetInstance().GetTask(message->taskId); - if (task == nullptr) { - CloseHelp::DeletePointer(message, false); - return; - } - napi_status status = napi_ok; - HandleScope scope(task->env_, status); - if (status != napi_ok) { - HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); - CloseHelp::DeletePointer(message, false); - return; - } - task->CancelInner(message->state); - CloseHelp::DeletePointer(message, false); - }; - TaskManager::GetInstance().PostTask(onCancelTask, "TaskOnCancelTask", Priority::DEFAULT); - } else { - std::lock_guard lock(taskMutex_); - if (!IsValid() || !ConcurrentHelper::IsUvActive(onStartCancelSignal_)) { + auto onCancelTask = [message] { + ObjectScope nativeScope(message, false); + Task* task = TaskManager::GetInstance().GetTask(message->taskId); + if (task == nullptr) { return; } - onStartCancelSignal_->data = message; - uv_async_send(onStartCancelSignal_); - } -#else - std::lock_guard lock(taskMutex_); - if (!IsValid() || !ConcurrentHelper::IsUvActive(onStartCancelSignal_)) { - CloseHelp::DeletePointer(message, false); + napi_status status = napi_ok; + HandleScope scope(task->env_, status); + if (status != napi_ok) { + HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); + return; + } + task->CancelInner(message->state); + }; + auto napiPrio = g_napiPriorityMap.at(Priority::DEFAULT); + napi_status status = napi_send_event(env_, onCancelTask, napiPrio); + if (status != napi_ok) { + delete message; return; } - onStartCancelSignal_->data = message; - uv_async_send(onStartCancelSignal_); -#endif } void Task::CancelInner(ExecuteState state) @@ -1710,7 +1585,7 @@ void Task::CancelInner(ExecuteState state) TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error); } -bool Task::IsSameEnv(napi_env env) +bool Task::IsSameEnv(napi_env env) const { return env_ == env; } @@ -1718,53 +1593,43 @@ bool Task::IsSameEnv(napi_env env) void Task::DiscardAsyncRunnerTask(DiscardTaskMessage* message) { if (message == nullptr || !IsAsyncRunnerTask() || !IsValid()) { - CloseHelp::DeletePointer(message, false); + delete message; return; } -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (IsMainThreadTask()) { - HITRACE_HELPER_METER_NAME("DiscardAsyncRunnerTask: PostTask"); - auto onDiscardTask = [message]() { - Task* task = TaskManager::GetInstance().GetTask(message->taskId); - if (task == nullptr) { - CloseHelp::DeletePointer(message, false); - return; - } - napi_status status = napi_ok; - HandleScope scope(task->env_, status); - if (status != napi_ok) { - CloseHelp::DeletePointer(message, false); - HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); - return; - } - task->DiscardInner(message); - }; - TaskManager::GetInstance().PostTask(onDiscardTask, "TaskOnDiscardTask", Priority::DEFAULT); - } else { - std::lock_guard lock(taskMutex_); - if (ConcurrentHelper::IsUvActive(onStartDiscardSignal_)) { - onStartDiscardSignal_->data = message; - uv_async_send(onStartDiscardSignal_); + auto onDiscardTask = [message] { + ObjectScope nativeScope(message, false); + Task* task = TaskManager::GetInstance().GetTask(message->taskId); + if (task == nullptr) { + return; } + napi_status status = napi_ok; + HandleScope scope(task->GetEnv(), status); + if (status != napi_ok) { + HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); + return; + } + napi_value error = ErrorHelper::NewError(task->env_, message->errCode); + napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, error); + task->DisposeCanceledTask(); + TaskManager::GetInstance().RemoveTask(message->taskId); + }; + auto napiPrio = g_napiPriorityMap.at(Priority::DEFAULT); + napi_status status = napi_send_event(env_, onDiscardTask, napiPrio); + if (status != napi_ok) { + delete message; + return; } -#else - std::lock_guard lock(taskMutex_); - if (ConcurrentHelper::IsUvActive(onStartDiscardSignal_)) { - onStartDiscardSignal_->data = message; - uv_async_send(onStartDiscardSignal_); - } -#endif } void Task::DiscardInner(DiscardTaskMessage* message) { if (message == nullptr) { - CloseHelp::DeletePointer(message, false); + delete message; return; } auto task = TaskManager::GetInstance().GetTask(message->taskId); if (task == nullptr || !task->IsValid() || message->env != task->env_) { - CloseHelp::DeletePointer(message, false); + delete message; HILOG_DEBUG("taskpool:: discard task is nullptr."); return; } @@ -1773,53 +1638,12 @@ void Task::DiscardInner(DiscardTaskMessage* message) DisposeCanceledTask(); TaskManager::GetInstance().RemoveTask(message->taskId); - CloseHelp::DeletePointer(message, false); -} - -void Task::DiscardTask(const uv_async_t* req) -{ - auto message = static_cast(req->data); - if (message == nullptr) { - return; - } - auto task = TaskManager::GetInstance().GetTask(message->taskId); - if (task == nullptr || task->env_ != message->env) { - CloseHelp::DeletePointer(message, false); - HILOG_DEBUG("taskpool:: task is nullptr."); - return; - } - napi_status status = napi_ok; - HandleScope scope(task->env_, status); - if (status != napi_ok) { - CloseHelp::DeletePointer(message, false); - HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); - return; - } - - task->DiscardInner(message); + delete message; } void Task::ReleaseData() { std::lock_guard lock(taskMutex_); - if (onStartCancelSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(onStartCancelSignal_)) { - ConcurrentHelper::UvHandleClose(onStartCancelSignal_); - } else { - delete onStartCancelSignal_; - onStartCancelSignal_ = nullptr; - } - } - - if (onStartDiscardSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(onStartDiscardSignal_)) { - ConcurrentHelper::UvHandleClose(onStartDiscardSignal_); - } else { - delete onStartDiscardSignal_; - onStartDiscardSignal_ = nullptr; - } - } - if (currentTaskInfo_ != nullptr) { delete currentTaskInfo_; currentTaskInfo_ = nullptr; @@ -1848,4 +1672,30 @@ uint32_t Task::GetTaskId() const { return taskId_; } + +bool Task::IsCanceled() const +{ + return taskState_ == ExecuteState::CANCELED; +} + +void Task::ExecuteTask() +{ + auto priority = (currentTaskInfo_ != nullptr) ? currentTaskInfo_->priority : Priority::DEFAULT; + // tag for trace parse: Task Allocation + std::string strTrace = "Task Allocation: taskId : " + std::to_string(taskId_) + + ", priority : " + std::to_string(priority) + + ", executeState : " + std::to_string(ExecuteState::WAITING); + HITRACE_HELPER_METER_NAME(strTrace); + std::string taskLog = "Task Allocation: " + std::to_string(taskId_) + + ", " + std::to_string(priority); + HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.data()); + IncreaseRefCount(); + TaskManager::GetInstance().IncreaseSendDataRefCount(taskId_); + if (taskState_ == ExecuteState::NOT_FOUND || taskState_ == ExecuteState::FINISHED || + (taskState_ == ExecuteState::CANCELED && isCancelToFinish_)) { + taskState_ = ExecuteState::WAITING; + isCancelToFinish_ = false; + TaskManager::GetInstance().EnqueueTask(this, priority); + } +} } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/task.h b/js_concurrent_module/taskpool/task.h index 07a7a1d955dc4f2ee45bfaef35a4b7f98084eeff..6884df5ee397eaa590eafdfa2799f6e0c499018d 100644 --- a/js_concurrent_module/taskpool/task.h +++ b/js_concurrent_module/taskpool/task.h @@ -30,9 +30,6 @@ #include "napi/native_node_api.h" #include "utils.h" #include "tools/log.h" -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) -#include "event_handler.h" -#endif #if defined(ENABLE_TASKPOOL_FFRT) #include "c/executor_task.h" @@ -64,15 +61,15 @@ struct TaskInfo { }; struct ListenerCallBackInfo { - ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value taskError) : env_(env), - callbackRef_(callbackRef), taskError_(taskError) {} + ListenerCallBackInfo(napi_env env, napi_ref callbackRef, napi_value result) : env(env), + callbackRef(callbackRef), result(result) {} ~ListenerCallBackInfo() { - napi_delete_reference(env_, callbackRef_); + napi_delete_reference(env, callbackRef); } - napi_env env_; - napi_ref callbackRef_; - napi_value taskError_; + napi_env env; + napi_ref callbackRef; + napi_value result; }; struct CancelTaskMessage { @@ -131,12 +128,8 @@ public: static void TaskDestructor(napi_env env, void* data, void* hint); static void ThrowNoDependencyError(napi_env env); - static void StartExecutionCallback(const uv_async_t* req); - static void StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo); static void ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo); static void CleanupHookFunc(void* arg); - static void Cancel(const uv_async_t* req); - static void DiscardTask(const uv_async_t* req); static bool VerifyAndPostResult(Task* task, Priority priority); void StoreTaskId(uint32_t taskId); @@ -161,7 +154,7 @@ public: bool IsReadyToHandle() const; void NotifyPendingTask(); void CancelPendingTask(napi_env env); - bool UpdateTask(uint64_t startTime, void* worker); + void UpdateRunningInfo(uint64_t startTime, void* worker); napi_value DeserializeValue(napi_env env, napi_value* func, napi_value* args); void StoreTaskDuration(); bool CanForSequenceRunner(napi_env env); @@ -177,14 +170,15 @@ public: void DecreaseTaskLifecycleCount(); bool ShouldDeleteTask(bool needUnref = true); bool CheckStartExecution(Priority priority); - bool IsValid(); + bool IsValid() const; + bool IsCanceled() const; void SetValid(bool isValid); bool CanForAsyncRunner(napi_env env); bool IsAsyncRunnerTask(); void SetTaskId(uint32_t taskId); void TriggerCancel(CancelTaskMessage* message); void CancelInner(ExecuteState state); - bool IsSameEnv(napi_env env); + bool IsSameEnv(napi_env env) const; void DiscardAsyncRunnerTask(DiscardTaskMessage* message); void DiscardInner(DiscardTaskMessage* message); void ReleaseData(); @@ -192,6 +186,7 @@ public: Worker* GetWorker() const; napi_env GetEnv() const; uint32_t GetTaskId() const; + void ExecuteTask(); private: Task(const Task &) = delete; @@ -199,7 +194,7 @@ private: Task(Task &&) = delete; Task& operator=(Task &&) = delete; - void InitHandle(napi_env env); + void SetIsMainTask(); public: napi_env env_ = nullptr; @@ -207,9 +202,11 @@ public: std::string name_ {}; uint32_t taskId_ {}; std::atomic taskState_ {ExecuteState::NOT_FOUND}; - uint64_t groupId_ {}; // 0 for task outside taskgroup - uint64_t seqRunnerId_ {}; // 0 for task without seqRunner - uint64_t asyncRunnerId_ {}; // 0 for task without asyncRunner + union { + uint64_t groupId_; + uint64_t seqRunnerId_; + uint64_t asyncRunnerId_; + }; TaskInfo* currentTaskInfo_ {}; std::list pendingTaskInfos_ {}; // for a common task executes multiple times void* result_ = nullptr; @@ -227,9 +224,6 @@ public: bool defaultCloneSendable_ {false}; std::atomic isValid_ {true}; std::atomic lifecycleCount_ {0}; // when lifecycleCount_ is 0, the task pointer can be deleted - uv_async_t* onStartExecutionSignal_ = nullptr; - uv_async_t* onStartCancelSignal_ = nullptr; - uv_async_t* onStartDiscardSignal_ = nullptr; ListenerCallBackInfo* onEnqueuedCallBackInfo_ = nullptr; ListenerCallBackInfo* onStartExecutionCallBackInfo_ = nullptr; ListenerCallBackInfo* onExecutionFailedCallBackInfo_ = nullptr; @@ -240,12 +234,11 @@ public: // periodic task first Generate TaskInfo std::atomic isFirstTaskInfo_ {false}; uv_timer_t* timer_ {nullptr}; - Priority periodicTaskPriority_ {Priority::DEFAULT}; + Priority priority_ {Priority::DEFAULT}; std::set delayedTimers_ {}; // task delayed timer bool isMainThreadTask_ {false}; - Priority asyncTaskPriority_ {Priority::DEFAULT}; std::atomic isCancelToFinish_ {false}; }; diff --git a/js_concurrent_module/taskpool/task_group.cpp b/js_concurrent_module/taskpool/task_group.cpp index f5d61fe71a9d6222ebad2dbd2f92d6f8803db1b6..f05a52f4dda87b48e6ca2acbf3c1096e81473d70 100644 --- a/js_concurrent_module/taskpool/task_group.cpp +++ b/js_concurrent_module/taskpool/task_group.cpp @@ -32,58 +32,56 @@ napi_value TaskGroup::TaskGroupConstructor(napi_env env, napi_callback_info cbin return nullptr; } napi_value name; - if (argc == 1) { + if (argc == 0) { + name = NapiHelper::CreateEmptyString(env); + } else { // check 1st param is taskGroupName if (!NapiHelper::IsString(env, args[0])) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the first param must be string."); return nullptr; } name = args[0]; - } else { - name = NapiHelper::CreateEmptyString(env); } + TaskGroup* group = new TaskGroup(env); - uint64_t groupId = reinterpret_cast(group); - group->groupId_ = groupId; - TaskGroupManager::GetInstance().StoreTaskGroup(groupId, group); - group->InitHandle(env); + uint64_t groupId = group->GetGroupId(); napi_value napiGroupId = NapiHelper::CreateUint64(env, groupId); napi_property_descriptor properties[] = { - DECLARE_NAPI_PROPERTY(GROUP_ID_STR, napiGroupId), + DECLARE_NAPI_PROPERTY("groupId", napiGroupId), + DECLARE_NAPI_PROPERTY("name", name), DECLARE_NAPI_FUNCTION_WITH_DATA("addTask", AddTask, thisVar), }; - napi_set_named_property(env, thisVar, NAME, name); napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties); napi_status status = napi_wrap(env, thisVar, group, TaskGroupDestructor, nullptr, nullptr); if (status != napi_ok) { HILOG_ERROR("taskpool::TaskGroupConstructor napi_wrap return value is %{public}d", status); - TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_); delete group; group = nullptr; return nullptr; } - napi_create_reference(env, thisVar, 0, &group->groupRef_); + TaskGroupManager::GetInstance().StoreTaskGroup(groupId, group); + auto& ref = group->GetGroupRef(); + napi_create_reference(env, thisVar, 0, &ref); napi_add_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group); return thisVar; } void TaskGroup::TaskGroupDestructor(napi_env env, void* data, [[maybe_unused]] void* hint) { - HILOG_DEBUG("taskpool::TaskGroupDestructor"); TaskGroup* group = static_cast(data); napi_remove_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group); TaskGroupManager::GetInstance().ReleaseTaskGroupData(env, group); - napi_delete_reference(env, group->groupRef_); + auto& ref = group->GetGroupRef(); + napi_delete_reference(env, ref); delete group; } napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo) { size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo); - std::string errMessage = ""; if (argc < 1) { - errMessage = "taskGroup:: the number of params must be at least one"; - HILOG_ERROR("%{public}s", errMessage.c_str()); + std::string_view errMessage = "taskGroup:: the number of params must be at least one"; + HILOG_ERROR("%{public}s", errMessage.data()); ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one."); return nullptr; } @@ -91,15 +89,19 @@ napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo) ObjectScope scope(args, true); napi_value thisVar; napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr); - napi_value napiGroupId = NapiHelper::GetNameProperty(env, thisVar, GROUP_ID_STR); - uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId); - TaskGroup* group = TaskGroupManager::GetInstance().GetTaskGroup(groupId); - if (group == nullptr || group->groupState_ != ExecuteState::NOT_FOUND) { - errMessage = "taskpool:: executed taskGroup cannot addTask"; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str()); + TaskGroup* group = nullptr; + napi_unwrap(env, thisVar, reinterpret_cast(&group)); + if (group == nullptr) { + HILOG_FATAL("taskpool:: group is nullptr"); + return nullptr; + } + if (group->GetGroupState() != ExecuteState::NOT_FOUND) { + std::string_view errMessage = "taskpool:: executed taskGroup cannot addTask"; + HILOG_ERROR("%{public}s", errMessage.data()); + ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.data()); return nullptr; } + auto groupId = group->GetGroupId(); napi_valuetype type = napi_undefined; napi_typeof(env, args[0], &type); if (type == napi_object) { @@ -115,9 +117,10 @@ napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo) task->taskType_ = TaskType::GROUP_COMMON_TASK; task->groupId_ = groupId; napi_reference_ref(env, task->taskRef_, nullptr); - TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_); + group->AddTask(task->taskRef_, task->taskId_); return nullptr; - } else if (type == napi_function) { + } + if (type == napi_function) { napi_value napiTask = NapiHelper::CreateObject(env); Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::GROUP_FUNCTION_TASK); if (task == nullptr) { @@ -133,7 +136,7 @@ napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo) return nullptr; } napi_create_reference(env, napiTask, 1, &task->taskRef_); - TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_); + group->AddTask(task->taskRef_, task->taskId_); return nullptr; } ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be object or function."); @@ -143,46 +146,21 @@ napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo) void TaskGroup::HostEnvCleanupHook(void* data) { if (data == nullptr) { - HILOG_ERROR("taskpool:: taskGroup cleanupHook arg is nullptr"); return; } TaskGroup* group = static_cast(data); std::lock_guard lock(group->taskGroupMutex_); - ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_); group->isValid_ = false; } -void TaskGroup::StartRejectResult(const uv_async_t* req) -{ - auto* group = static_cast(req->data); - if (group == nullptr) { - HILOG_DEBUG("taskpool::StartRejectResult group is nullptr"); - return; - } - napi_status status = napi_ok; - HandleScope scope(group->env_, status); - if (status != napi_ok) { - HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); - return; - } - group->RejectResult(group->env_); -} - uint32_t TaskGroup::GetTaskIndex(uint32_t taskId) { - uint32_t index = 0; - for (uint32_t id : taskIds_) { - if (taskId == id) { - break; - } - index++; - } - return index; + auto it = std::find(taskIds_.begin(), taskIds_.end(), taskId); + return (it != taskIds_.end()) ? std::distance(taskIds_.begin(), it) : taskIds_.size(); } void TaskGroup::NotifyGroupTask(napi_env env) { - HILOG_DEBUG("taskpool:: NotifyGroupTask"); std::lock_guard lock(taskGroupMutex_); if (pendingGroupInfos_.empty()) { return; @@ -195,7 +173,7 @@ void TaskGroup::NotifyGroupTask(napi_env env) Task* task = nullptr; napi_unwrap(env, napiTask, reinterpret_cast(&task)); if (task == nullptr) { - HILOG_ERROR("taskpool::ExecuteGroup task is nullptr"); + HILOG_ERROR("taskpool:: ExecuteGroup task is nullptr"); return; } napi_reference_ref(env, task->taskRef_, nullptr); @@ -205,35 +183,28 @@ void TaskGroup::NotifyGroupTask(napi_env env) } else { reinterpret_cast(env)->IncreaseSubEnvCounter(); } - task->IncreaseRefCount(); - TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_); - task->taskState_ = ExecuteState::WAITING; - TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority); + task->ExecuteTask(); } } void TaskGroup::CancelPendingGroup(napi_env env) { - HILOG_DEBUG("taskpool:: CancelPendingGroup"); std::list deferreds {}; { std::lock_guard lock(taskGroupMutex_); if (pendingGroupInfos_.empty()) { return; } - auto pendingIter = pendingGroupInfos_.begin(); auto engine = reinterpret_cast(env); - for (; pendingIter != pendingGroupInfos_.end(); ++pendingIter) { + for (auto* info : pendingGroupInfos_) { for (size_t i = 0; i < taskIds_.size(); i++) { engine->DecreaseSubEnvCounter(); } - GroupInfo* info = *pendingIter; deferreds.push_back(info->deferred); napi_reference_unref(env, groupRef_, nullptr); delete info; } - pendingIter = pendingGroupInfos_.begin(); - pendingGroupInfos_.erase(pendingIter, pendingGroupInfos_.end()); + pendingGroupInfos_.clear(); } TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: taskGroup has been canceled"); } @@ -245,7 +216,22 @@ void TaskGroup::CancelGroupTask(napi_env env, uint32_t taskId) RejectResult(env); return; } - TriggerRejectResult(); + auto cancelGroup = [groupId = groupId_] { + TaskGroup* group = TaskGroupManager::GetInstance().GetTaskGroup(groupId); + if (group == nullptr) { + HILOG_WARN("taskpool:: group is nullptr"); + return; + } + napi_status status = napi_ok; + HandleScope scope(group->env_, status); + if (status != napi_ok) { + HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); + return; + } + group->RejectResult(group->env_); + }; + auto napiPrio = g_napiPriorityMap.at(Priority::DEFAULT); + napi_send_event(env_, cancelGroup, napiPrio); } void TaskGroup::RejectResult(napi_env env, napi_value res) @@ -268,26 +254,83 @@ void TaskGroup::RejectResult(napi_env env) napi_reference_unref(env, groupRef_, nullptr); delete currentGroupInfo_; currentGroupInfo_ = nullptr; + } else { + return; } } - std::string error = "taskpool:: taskGroup has been canceled"; + std::string_view error = "taskpool:: taskGroup has been canceled"; TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, error); } -void TaskGroup::InitHandle(napi_env env) +bool TaskGroup::IsSameEnv(napi_env env) const { - uv_loop_t* loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, onRejectResultSignal_, TaskGroup::StartRejectResult, this); + return env_ == env; } -void TaskGroup::TriggerRejectResult() +void TaskGroup::AddTask(napi_ref taskRef, uint32_t taskId) { + taskRefs_.push_back(taskRef); + taskNum_++; + taskIds_.push_back(taskId); +} + +void TaskGroup::Cancel() +{ + if (groupState_ == ExecuteState::CANCELED) { + return; + } + { + std::lock_guard lock(taskGroupMutex_); + if (currentGroupInfo_ == nullptr || groupState_ == ExecuteState::NOT_FOUND || + groupState_ == ExecuteState::FINISHED) { + std::string_view errMsg = "taskpool:: taskGroup is not executed or has been executed"; + HILOG_ERROR("%{public}s", errMsg.data()); + ErrorHelper::ThrowError(env_, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.data()); + return; + } + } + ExecuteState groupState = groupState_.exchange(ExecuteState::CANCELED); + CancelPendingGroup(env_); std::lock_guard lock(taskGroupMutex_); - ConcurrentHelper::UvCheckAndAsyncSend(onRejectResultSignal_); + if (currentGroupInfo_->finishedTaskNum != taskNum_) { + for (uint32_t taskId : taskIds_) { + CancelAllTasks(taskId); + } + if (currentGroupInfo_->finishedTaskNum == taskNum_) { + napi_value error = ErrorHelper::NewError(env_, 0, "taskpool:: taskGroup has been canceled"); + RejectResult(env_, error); + return; + } + } + if (groupState == ExecuteState::WAITING && currentGroupInfo_ != nullptr) { + auto engine = reinterpret_cast(env_); + for (size_t i = 0; i < taskIds_.size(); i++) { + engine->DecreaseSubEnvCounter(); + } + napi_value error = ErrorHelper::NewError(env_, 0, "taskpool:: taskGroup has been canceled"); + RejectResult(env_, error); + } } -bool TaskGroup::IsSameEnv(napi_env env) +void TaskGroup::CancelAllTasks(uint32_t taskId) { - return env_ == env; + auto task = TaskManager::GetInstance().GetTask(taskId); + if (task == nullptr) { + HILOG_WARN("taskpool:: CancelGroupTask task is nullptr"); + return; + } + std::lock_guard lock(task->taskMutex_); + if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr && + TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) { + reinterpret_cast(env_)->DecreaseSubEnvCounter(); + task->DecreaseTaskLifecycleCount(); + TaskManager::GetInstance().DecreaseSendDataRefCount(env_, taskId); + delete task->currentTaskInfo_; + task->currentTaskInfo_ = nullptr; + if (currentGroupInfo_ != nullptr) { + currentGroupInfo_->finishedTaskNum++; + } + } + task->taskState_ = ExecuteState::CANCELED; } } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/task_group.h b/js_concurrent_module/taskpool/task_group.h index e5dab8d4ff1132613af4d5145e11e46aff5ccd0b..9990163e531defb09beab8e768f566f73b19d923 100644 --- a/js_concurrent_module/taskpool/task_group.h +++ b/js_concurrent_module/taskpool/task_group.h @@ -17,6 +17,7 @@ #define JS_CONCURRENT_MODULE_TASKPOOL_TASK_GROUP_H #include +#include #include "task.h" #include "task_manager.h" @@ -50,14 +51,13 @@ private: class TaskGroup { public: - explicit TaskGroup(napi_env env) : env_(env) {} + explicit TaskGroup(napi_env env) : env_(env), groupId_(reinterpret_cast(this)) {} TaskGroup() = default; ~TaskGroup() = default; static napi_value TaskGroupConstructor(napi_env env, napi_callback_info cbinfo); static napi_value AddTask(napi_env env, napi_callback_info cbinfo); static void HostEnvCleanupHook(void* data); - static void StartRejectResult(const uv_async_t* req); uint32_t GetTaskIndex(uint32_t taskId); void NotifyGroupTask(napi_env env); @@ -66,8 +66,30 @@ public: void RejectResult(napi_env env, napi_value res); void RejectResult(napi_env env); void InitHandle(napi_env env); - void TriggerRejectResult(); - bool IsSameEnv(napi_env env); + bool IsSameEnv(napi_env env) const; + void AddTask(napi_ref taskRef, uint32_t taskId); + void Cancel(); + void CancelAllTasks(uint32_t taskId); + + uint64_t GetGroupId() const + { + return groupId_; + } + + napi_env GetEnv() const + { + return env_; + } + + napi_ref& GetGroupRef() + { + return groupRef_; + } + + ExecuteState GetGroupState() const + { + return groupState_; + } private: TaskGroup(const TaskGroup &) = delete; @@ -89,7 +111,6 @@ public: std::atomic groupState_ {ExecuteState::NOT_FOUND}; napi_ref groupRef_ {}; std::recursive_mutex taskGroupMutex_ {}; - uv_async_t* onRejectResultSignal_ = nullptr; std::atomic isValid_ {true}; }; } // namespace Commonlibrary::Concurrent::TaskPoolModule diff --git a/js_concurrent_module/taskpool/task_group_manager.cpp b/js_concurrent_module/taskpool/task_group_manager.cpp index c5d1e0d4609563e48c1d4f4f3bf74fc3f1aecfff..fcfc873a1c76d6382873fb074068ecf6c5feb947 100644 --- a/js_concurrent_module/taskpool/task_group_manager.cpp +++ b/js_concurrent_module/taskpool/task_group_manager.cpp @@ -24,38 +24,11 @@ TaskGroupManager& TaskGroupManager::GetInstance() return groupManager; } -void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint32_t taskId) -{ - std::lock_guard lock(taskGroupsMutex_); - auto groupIter = taskGroups_.find(groupId); - if (groupIter == taskGroups_.end()) { - HILOG_DEBUG("taskpool:: taskGroup has been released"); - return; - } - auto taskGroup = reinterpret_cast(groupIter->second); - if (taskGroup == nullptr) { - HILOG_ERROR("taskpool:: taskGroup is null"); - return; - } - taskGroup->taskRefs_.push_back(taskRef); - taskGroup->taskNum_++; - taskGroup->taskIds_.push_back(taskId); -} - void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group) { - HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group"); - TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_); + RemoveTaskGroup(group->groupId_); { std::lock_guard lock(group->taskGroupMutex_); - if (group->onRejectResultSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(group->onRejectResultSignal_)) { - ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_); - } else { - delete group->onRejectResultSignal_; - group->onRejectResultSignal_ = nullptr; - } - } if (group->isValid_) { for (uint32_t taskId : group->taskIds_) { Task* task = TaskManager::GetInstance().GetTask(taskId); @@ -65,10 +38,7 @@ void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group) napi_reference_unref(task->env_, task->taskRef_, nullptr); } } - if (group->currentGroupInfo_ != nullptr) { - delete group->currentGroupInfo_; - group->currentGroupInfo_ = nullptr; - } + ConcurrentHelper::DeleteHelper(group->currentGroupInfo_); } group->CancelPendingGroup(env); } @@ -83,46 +53,11 @@ void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId) HILOG_ERROR("taskpool:: CancelGroup group is nullptr"); return; } - if (taskGroup->groupState_ == ExecuteState::CANCELED) { - return; - } - { - std::lock_guard lock(taskGroup->taskGroupMutex_); - if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND || - taskGroup->groupState_ == ExecuteState::FINISHED) { - std::string errMsg = "taskpool:: taskGroup is not executed or has been executed"; - HILOG_ERROR("%{public}s", errMsg.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str()); - return; - } - } - ExecuteState groupState = taskGroup->groupState_; - taskGroup->groupState_ = ExecuteState::CANCELED; - taskGroup->CancelPendingGroup(env); - std::lock_guard lock(taskGroup->taskGroupMutex_); - if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) { - for (uint32_t taskId : taskGroup->taskIds_) { - CancelGroupTask(env, taskId, taskGroup); - } - if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) { - napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled"); - taskGroup->RejectResult(env, error); - return; - } - } - if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) { - auto engine = reinterpret_cast(env); - for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) { - engine->DecreaseSubEnvCounter(); - } - napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled"); - taskGroup->RejectResult(env, error); - } + taskGroup->Cancel(); } void TaskGroupManager::CancelGroupTask(napi_env env, uint32_t taskId, TaskGroup* group) { - HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str()); auto task = TaskManager::GetInstance().GetTask(taskId); if (task == nullptr) { HILOG_INFO("taskpool:: CancelGroupTask task is nullptr"); @@ -159,15 +94,11 @@ TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId) { std::lock_guard lock(taskGroupsMutex_); auto groupIter = taskGroups_.find(groupId); - if (groupIter == taskGroups_.end()) { - return nullptr; - } - return reinterpret_cast(groupIter->second); + return groupIter != taskGroups_.end() ? groupIter->second : nullptr; } bool TaskGroupManager::UpdateGroupState(uint64_t groupId) { - HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str()); // During the modification process of the group, prevent other sub threads from performing other // operations on the group pointer, which may cause the modification to fail. std::lock_guard lock(taskGroupsMutex_); @@ -177,7 +108,7 @@ bool TaskGroupManager::UpdateGroupState(uint64_t groupId) } TaskGroup* group = reinterpret_cast(groupIter->second); if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) { - HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled"); + HILOG_WARN("taskpool:: UpdateGroupState taskGroup has been released or canceled"); return false; } group->groupState_ = ExecuteState::RUNNING; diff --git a/js_concurrent_module/taskpool/task_group_manager.h b/js_concurrent_module/taskpool/task_group_manager.h index f1c70a3aa584feb0b5db020f26a22560b96ce847..0e4c6548f88e214b40596788e101953edd1aecfd 100644 --- a/js_concurrent_module/taskpool/task_group_manager.h +++ b/js_concurrent_module/taskpool/task_group_manager.h @@ -23,7 +23,6 @@ class TaskGroupManager { public: static TaskGroupManager& GetInstance(); - void AddTask(uint64_t groupId, napi_ref taskRef, uint32_t taskId); void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup); void RemoveTaskGroup(uint64_t groupId); TaskGroup* GetTaskGroup(uint64_t groupId); diff --git a/js_concurrent_module/taskpool/task_manager.cpp b/js_concurrent_module/taskpool/task_manager.cpp index 091b030d86b5d416f3185a12e8a3a265d12d2e57..d2901cd00d29dc23c0df326c921a9c2ce48a85ae 100644 --- a/js_concurrent_module/taskpool/task_manager.cpp +++ b/js_concurrent_module/taskpool/task_manager.cpp @@ -54,18 +54,8 @@ static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every static constexpr uint32_t MAX_UINT32_T = 0xFFFFFFFF; // 0xFFFFFFFF: max uint32_t static constexpr uint32_t TRIGGER_EXPAND_INTERVAL = 10; // 10: ms, trigger recheck expansion interval [[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread -static constexpr char ON_CALLBACK_STR[] = "TaskPoolOnCallbackTask"; static constexpr uint32_t UNEXECUTE_TASK_TIME = 60000; // 60000: 1min -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) -static const std::map TASK_EVENTHANDLER_PRIORITY_MAP = { - {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE}, - {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW}, - {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH}, - {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE}, -}; -#endif - // ----------------------------------- TaskManager ---------------------------------------- TaskManager& TaskManager::GetInstance() { @@ -796,19 +786,20 @@ uint32_t TaskManager::GetThreadNum() return workers_.size(); } -void TaskManager::EnqueueTaskId(uint32_t taskId, Priority priority) +void TaskManager::EnqueueTask(Task* task, Priority priority) { + if (UNLIKELY(task == nullptr)) { + HILOG_FATAL("taskpool:: task is nullptr"); + return; + } + { std::lock_guard lock(taskQueuesMutex_); IncreaseNumIfNoIdle(priority); - taskQueues_[priority]->EnqueueTaskId(taskId); + taskQueues_[priority]->EnqueueTaskId(task->GetTaskId()); } + TryTriggerExpand(); - Task* task = GetTask(taskId); - if (task == nullptr) { - HILOG_FATAL("taskpool:: task is nullptr"); - return; - } task->IncreaseTaskLifecycleCount(); if (task->onEnqueuedCallBackInfo_ != nullptr) { task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_); @@ -912,10 +903,6 @@ void TaskManager::InitTaskManager(napi_env env) } else { HILOG_INFO("taskpool:: apps do not use ffrt"); } -#endif -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - mainThreadHandler_ = std::make_shared( - OHOS::AppExecFwk::EventRunner::GetMainEventRunner()); #endif auto mainThreadEngine = NativeEngine::GetMainThreadEngine(); if (mainThreadEngine == nullptr) { @@ -1041,7 +1028,7 @@ void TaskManager::ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task auto priority = g_napiPriorityMap.at(worker->GetPriority()); uint64_t handleId = 0; napi_status status = napi_send_cancelable_event(hostEnv, onCallbackTask, nullptr, priority, - &handleId, ON_CALLBACK_STR); + &handleId, "TaskPoolOnCallbackTask"); if (status != napi_ok) { HILOG_ERROR("taskpool:: failed to send event to the host side"); --callbackInfo->refCount; @@ -1066,7 +1053,8 @@ void TaskManager::NotifyDependencyTaskInfo(uint32_t taskId) RemoveDependencyById(taskId, *taskIdIter); taskIdIter = iter->second.erase(taskIdIter); if (taskInfo.first != 0) { - EnqueueTaskId(taskInfo.first, taskInfo.second); + auto task = TaskManager::GetInstance().GetTask(taskInfo.first); + EnqueueTask(task, taskInfo.second); } } } @@ -1368,47 +1356,11 @@ void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTas void TaskManager::ReleaseCallBackInfo(Task* task) { - HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str()); std::lock_guard lock(task->taskMutex_); - if (task->onEnqueuedCallBackInfo_ != nullptr) { - delete task->onEnqueuedCallBackInfo_; - task->onEnqueuedCallBackInfo_ = nullptr; - } - - if (task->onStartExecutionCallBackInfo_ != nullptr) { - delete task->onStartExecutionCallBackInfo_; - task->onStartExecutionCallBackInfo_ = nullptr; - } - - if (task->onExecutionFailedCallBackInfo_ != nullptr) { - delete task->onExecutionFailedCallBackInfo_; - task->onExecutionFailedCallBackInfo_ = nullptr; - } - - if (task->onExecutionSucceededCallBackInfo_ != nullptr) { - delete task->onExecutionSucceededCallBackInfo_; - task->onExecutionSucceededCallBackInfo_ = nullptr; - } - -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) { - ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); - } else { - delete task->onStartExecutionSignal_; - task->onStartExecutionSignal_ = nullptr; - } - } -#else - if (task->onStartExecutionSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) { - ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_); - } else { - delete task->onStartExecutionSignal_; - task->onStartExecutionSignal_ = nullptr; - } - } -#endif + ConcurrentHelper::DeleteHelper(task->onEnqueuedCallBackInfo_); + ConcurrentHelper::DeleteHelper(task->onStartExecutionCallBackInfo_); + ConcurrentHelper::DeleteHelper(task->onExecutionFailedCallBackInfo_); + ConcurrentHelper::DeleteHelper(task->onExecutionSucceededCallBackInfo_); } void TaskManager::StoreTask(Task* task) @@ -1469,19 +1421,12 @@ void TaskManager::UpdateSystemAppFlag() } #endif -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) -bool TaskManager::PostTask(std::function task, const char* taskName, Priority priority) -{ - return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority)); -} -#endif - -void TaskManager::BatchRejectDeferred(napi_env env, std::list deferreds, std::string error) +void TaskManager::BatchRejectDeferred(napi_env env, const std::list& deferreds, std::string_view error) { if (deferreds.empty()) { return; } - napi_value message = ErrorHelper::NewError(env, 0, error.c_str()); + napi_value message = ErrorHelper::NewError(env, 0, error.data()); for (auto deferred : deferreds) { napi_reject_deferred(env, deferred, message); } diff --git a/js_concurrent_module/taskpool/task_manager.h b/js_concurrent_module/taskpool/task_manager.h index 2ca4a772cf7e2f79f4e930a6829fab4fef7d6b83..f4507e0a8e55047744304c0ee96a3aa90137f403 100644 --- a/js_concurrent_module/taskpool/task_manager.h +++ b/js_concurrent_module/taskpool/task_manager.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -62,7 +63,7 @@ public: void StoreTask(Task* task); void RemoveTask(uint32_t taskId); Task* GetTask(uint32_t taskId); - void EnqueueTaskId(uint32_t taskId, Priority priority = Priority::DEFAULT); + void EnqueueTask(Task* task, Priority priority = Priority::DEFAULT); bool EraseWaitingTaskId(uint32_t taskId, Priority priority); std::pair DequeueTaskId(); void CancelTask(napi_env env, uint32_t taskId); @@ -119,8 +120,6 @@ public: void RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId); std::string GetTaskDependInfoToString(uint32_t taskId); - bool PostTask(std::function task, const char* taskName, Priority priority = Priority::DEFAULT); - // for duration void StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration); uint64_t GetTaskDuration(uint32_t taskId, std::string durationType); @@ -145,7 +144,7 @@ public: return globalEnableFfrtFlag_ || (isSystemApp_ && !disableFfrtFlag_); } - void BatchRejectDeferred(napi_env env, std::list deferreds, std::string error); + void BatchRejectDeferred(napi_env env, const std::list& deferreds, std::string_view error); uint32_t CalculateTaskId(uint64_t id); void ClearDependentTask(uint32_t taskId); void UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message, @@ -250,10 +249,6 @@ private: std::vector freeList_ {}; uint32_t maxThreads_ = ConcurrentHelper::GetMaxThreads(); -#if defined(ENABLE_TASKPOOL_EVENTHANDLER) - std::shared_ptr mainThreadHandler_ {}; -#endif - friend class TaskGroupManager; friend class NativeEngineTest; }; diff --git a/js_concurrent_module/taskpool/taskpool.cpp b/js_concurrent_module/taskpool/taskpool.cpp index e26372937c927a592276c726e7048389cb9aad44..95bee2cdd4b2bc66665262f2796b8362a98d9fa5 100644 --- a/js_concurrent_module/taskpool/taskpool.cpp +++ b/js_concurrent_module/taskpool/taskpool.cpp @@ -266,7 +266,7 @@ void TaskPool::DelayTask(uv_timer_t* handle) taskInfo->deferred = taskMessage->deferred; if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) { task->taskState_ = ExecuteState::WAITING; - TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority)); + TaskManager::GetInstance().EnqueueTask(task, static_cast(taskMessage->priority)); } } else { napi_value execption = nullptr; @@ -448,7 +448,7 @@ void TaskPool::HandleTaskResultInner(Task* task) } else { napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult); if (task->onExecutionFailedCallBackInfo_ != nullptr) { - task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult; + task->onExecutionFailedCallBackInfo_->result = napiTaskResult; task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_); } } @@ -468,14 +468,14 @@ void TaskPool::TriggerTask(Task* task) // seqRunnerTask will trigger the next if (task->IsSeqRunnerTask()) { if (!SequenceRunnerManager::GetInstance().TriggerSeqRunner(task->env_, task)) { - HILOG_ERROR("taskpool:: task %{public}s trigger in seqRunner %{public}s failed", + HILOG_DEBUG("taskpool:: task %{public}s trigger in seqRunner %{public}s failed", std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str()); } } else if (task->IsCommonTask()) { task->NotifyPendingTask(); } else if (task->IsAsyncRunnerTask()) { if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) { - HILOG_ERROR("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed", + HILOG_DEBUG("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed", std::to_string(task->taskId_).c_str(), std::to_string(task->asyncRunnerId_).c_str()); } } @@ -494,7 +494,6 @@ void TaskPool::TriggerTask(Task* task) void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success) { - HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str()); TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_); task->taskState_ = ExecuteState::FINISHED; napi_reference_unref(env, task->taskRef_, nullptr); @@ -504,7 +503,7 @@ void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, } TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_); if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) { - HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled"); + HILOG_WARN("taskpool:: taskGroup may have been released or canceled"); return; } // store the result @@ -540,7 +539,7 @@ void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, std::advance(iter, groupInfo->GetFailedIndex()); auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr; if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) { - task->onExecutionFailedCallBackInfo_->taskError_ = res; + task->onExecutionFailedCallBackInfo_->result = res; task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_); } } @@ -568,7 +567,7 @@ void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority) (task->taskState_ == ExecuteState::CANCELED && task->isCancelToFinish_)) { task->taskState_ = ExecuteState::WAITING; task->isCancelToFinish_ = false; - TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority); + TaskManager::GetInstance().EnqueueTask(task); } } @@ -636,11 +635,7 @@ napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo) void TaskPool::PeriodicTaskCallback(uv_timer_t* handle) { Task* task = reinterpret_cast(handle->data); - if (task == nullptr) { - HILOG_DEBUG("taskpool:: the task is nullptr"); - return; - } else if (!task->IsPeriodicTask()) { - HILOG_DEBUG("taskpool:: the current task is not a periodic task"); + if (task == nullptr || !task->IsPeriodicTask()) { return; } else if (task->taskState_ == ExecuteState::CANCELED) { HILOG_DEBUG("taskpool:: the periodic task has been canceled"); @@ -660,7 +655,7 @@ void TaskPool::PeriodicTaskCallback(uv_timer_t* handle) return; } napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_); - TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_); + TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->priority_); if (taskInfo == nullptr) { HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr"); return; @@ -672,7 +667,7 @@ void TaskPool::PeriodicTaskCallback(uv_timer_t* handle) HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str()); if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) { task->taskState_ = ExecuteState::WAITING; - TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_); + TaskManager::GetInstance().EnqueueTask(task); } } @@ -690,9 +685,9 @@ napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo } periodicTask->UpdatePeriodicTask(); - periodicTask->periodicTaskPriority_ = static_cast(priority); + periodicTask->priority_ = static_cast(priority); napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_); - TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_); + TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->priority_); if (taskInfo == nullptr) { return nullptr; } diff --git a/js_concurrent_module/taskpool/test/test.cpp b/js_concurrent_module/taskpool/test/test.cpp index cd2aeffb35ebb6fea4a3935cf8de398dbd72603d..ff16bbb221734356243d03cd9f3d306a274e4398 100755 --- a/js_concurrent_module/taskpool/test/test.cpp +++ b/js_concurrent_module/taskpool/test/test.cpp @@ -382,7 +382,7 @@ void NativeEngineTest::EnqueueTaskId(napi_env env) napi_value obj = Helper::NapiHelper::CreateObject(env); napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, obj, 1); task->onEnqueuedCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr); - taskManager.EnqueueTaskId(task->taskId_); + taskManager.EnqueueTask(task); taskManager.workers_.clear(); Worker* worker = reinterpret_cast(WorkerConstructor(env)); @@ -404,7 +404,7 @@ void NativeEngineTest::GetTaskByPriority(napi_env env) while (id != 0) { id = mediumTaskQueue->DequeueTaskId(); } - taskManager.EnqueueTaskId(task->taskId_); + taskManager.EnqueueTask(task); std::set set{task->taskId_}; taskManager.dependTaskInfos_.emplace(task->taskId_, std::move(set)); taskManager.GetTaskByPriority(mediumTaskQueue, Priority::DEFAULT); @@ -423,7 +423,7 @@ void NativeEngineTest::RestoreWorker(napi_env env) Task* task = new Task(); task->taskId_ = TaskManager::GetInstance().CalculateTaskId(reinterpret_cast(task)); - taskManager.EnqueueTaskId(task->taskId_); + taskManager.EnqueueTask(task); worker->state_ = WorkerState::IDLE; worker->workerEnv_ = env; uv_loop_t* loop = worker->GetWorkerLoop(); @@ -584,9 +584,6 @@ void NativeEngineTest::ReleaseTaskData(napi_env env) taskManager.ReleaseCallBackInfo(task2); task2->isMainThreadTask_ = false; taskManager.ReleaseCallBackInfo(task2); - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task2); - taskManager.ReleaseCallBackInfo(task2); } void NativeEngineTest::CheckTask(napi_env env) @@ -605,8 +602,6 @@ void NativeEngineTest::CheckTask(napi_env env) group->groupRef_ = ref; uint64_t groupId = reinterpret_cast(group); groupManager.StoreTaskGroup(groupId, nullptr); - groupManager.AddTask(groupId, nullptr, task->taskId_); - groupManager.taskGroups_.clear(); groupManager.StoreTaskGroup(groupId, group); group->groupState_ = ExecuteState::CANCELED; groupManager.CancelGroup(env, groupId); diff --git a/js_concurrent_module/taskpool/test/test_taskpool.cpp b/js_concurrent_module/taskpool/test/test_taskpool.cpp index 32655c17adcceaacf4860cf372e51cb5ab92a685..d8ff5179534607e6d0173d9bb921252e578e9f8f 100644 --- a/js_concurrent_module/taskpool/test/test_taskpool.cpp +++ b/js_concurrent_module/taskpool/test/test_taskpool.cpp @@ -396,7 +396,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest022, testing::ext::TestSize.Level0) uint32_t taskId = TaskManager::GetInstance().CalculateTaskId(reinterpret_cast(task)); napi_value value = NapiHelper::CreateUint64(env, groupId); napi_ref reference = NapiHelper::CreateReference(env, value, 0); - taskGroupManager.AddTask(groupId, reference, taskId); + group->AddTask(reference, taskId); ASSERT_NE(reference, nullptr); delete task; delete group; @@ -530,8 +530,10 @@ HWTEST_F(NativeEngineTest, TaskpoolTest033, testing::ext::TestSize.Level0) ResetTaskManager(); // the task will freed in the taskManager's Destuctor and will not cause memory leak Task* task = new Task(); + task->priority_ = Priority::HIGH; auto taskId = TaskManager::GetInstance().CalculateTaskId(reinterpret_cast(task)); - taskManager.EnqueueTaskId(taskId, Priority::HIGH); + task->taskId_ = taskId; + taskManager.EnqueueTask(task); std::pair result = taskManager.DequeueTaskId(); ASSERT_TRUE(result.first == taskId); ASSERT_TRUE(result.second == Priority::HIGH); @@ -822,24 +824,13 @@ HWTEST_F(NativeEngineTest, TaskpoolTest060, testing::ext::TestSize.Level0) napi_env env = reinterpret_cast(engine_); TaskManager& taskManager = TaskManager::GetInstance(); ResetTaskManager(); - uint32_t taskId = 36; - taskManager.EnqueueTaskId(taskId, Priority::LOW); - ASSERT_EQ(taskId, 36); + auto task = std::make_shared(); + task->taskId_ = 36; + taskManager.EnqueueTask(task.get(), Priority::LOW); std::pair result = taskManager.DequeueTaskId(); ASSERT_TRUE(result.first == 36); ASSERT_TRUE(result.second == Priority::LOW); - - taskId = 37; - taskManager.EnqueueTaskId(taskId, Priority::IDLE); - ASSERT_EQ(taskId, 37); - - result = taskManager.DequeueTaskId(); - ASSERT_TRUE(result.first == 37); - ASSERT_TRUE(result.second == Priority::IDLE); - result = taskManager.DequeueTaskId(); - ASSERT_TRUE(result.first == 0); - ASSERT_TRUE(result.second == Priority::LOW); } HWTEST_F(NativeEngineTest, TaskpoolTest061, testing::ext::TestSize.Level0) @@ -942,7 +933,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest067, testing::ext::TestSize.Level0) } }, nullptr, nullptr); napi_ref ref = NapiHelper::CreateReference(env, thisValue, 1); - taskGroupManager.AddTask(groupId, ref, task->taskId_); + taskGroup->AddTask(ref, task->taskId_); GroupInfo* groupInfo = new GroupInfo(); groupInfo->priority = Priority::DEFAULT; @@ -992,10 +983,8 @@ HWTEST_F(NativeEngineTest, TaskpoolTest070, testing::ext::TestSize.Level0) taskManager.NotifyWorkerCreated(worker); Task* task = new Task(); task->isLongTask_ = true; - taskManager.StoreTask(task); - uint32_t id = task->taskId_; - taskManager.EnqueueTaskId(id); usleep(50000); + uint32_t id = 100; auto res = taskManager.GetLongTaskInfo(id); ASSERT_NE(res, nullptr); ResetTaskManager(); @@ -2364,7 +2353,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest141, testing::ext::TestSize.Level0) taskGroupManager.StoreTaskGroup(groupId, group); napi_value value = NapiHelper::CreateUint64(env, groupId); napi_ref reference = NapiHelper::CreateReference(env, value, 0); - taskGroupManager.AddTask(groupId, reference, 1); + group->AddTask(reference, 1); napi_value result = NativeEngineTest::ExecuteGroup(env, taskGroupResult); ASSERT_TRUE(result == nullptr); } @@ -2398,7 +2387,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest142, testing::ext::TestSize.Level0) } }, nullptr, nullptr); napi_ref ref = NapiHelper::CreateReference(env, thisValue, 1); - taskGroupManager.AddTask(groupId, ref, task->taskId_); + group->AddTask(ref, task->taskId_); napi_value result = NativeEngineTest::ExecuteGroup(env, taskGroupResult); ASSERT_TRUE(result != nullptr); } @@ -2627,7 +2616,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest149, testing::ext::TestSize.Level0) taskGroupManager.StoreTaskGroup(groupId, group); napi_value value = NapiHelper::CreateUint64(env, groupId); napi_ref reference = NapiHelper::CreateReference(env, value, 0); - taskGroupManager.AddTask(groupId, reference, taskId); + group->AddTask(reference, taskId); napi_value res = nullptr; napi_create_uint32(env, 1, &res); @@ -2768,7 +2757,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest152, testing::ext::TestSize.Level0) } }, nullptr, nullptr); napi_ref ref = NapiHelper::CreateReference(env, thisValue, 1); - taskGroupManager.AddTask(groupId, nullptr, task->taskId_); + taskGroup->AddTask(nullptr, task->taskId_); GroupInfo* groupInfo = new GroupInfo(); groupInfo->priority = Priority::DEFAULT; @@ -2801,7 +2790,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest153, testing::ext::TestSize.Level0) } }, nullptr, nullptr); napi_ref ref = NapiHelper::CreateReference(env, thisValue, 1); - taskGroupManager.AddTask(groupId, ref, task->taskId_); + taskGroup->AddTask(ref, task->taskId_); GroupInfo* groupInfo = new GroupInfo(); groupInfo->priority = Priority::DEFAULT; @@ -3463,10 +3452,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest181, testing::ext::TestSize.Level0) GetSendableFunction(env, "foo", func); napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, func, 1); ListenerCallBackInfo* cbInfo = new ListenerCallBackInfo(env, callbackRef, nullptr); - - uv_async_t* req = new uv_async_t; - req->data = cbInfo; - Task::StartExecutionCallback(req); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_TRUE(exception == nullptr); @@ -3476,10 +3461,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest182, testing::ext::TestSize.Level0) { napi_env env = (napi_env)engine_; ListenerCallBackInfo* cbInfo = new ListenerCallBackInfo(env, nullptr, nullptr); - - uv_async_t* req = new uv_async_t; - req->data = cbInfo; - Task::StartExecutionCallback(req); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_TRUE(exception == nullptr); @@ -4644,7 +4625,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest230, testing::ext::TestSize.Level0) taskGroupManager.StoreTaskGroup(groupId, group); napi_value value = NapiHelper::CreateUint64(env, groupId); napi_ref reference = NapiHelper::CreateReference(env, value, 0); - taskGroupManager.AddTask(groupId, reference, taskId); + group->AddTask(reference, taskId); napi_value res = nullptr; napi_create_uint32(env, 1, &res); @@ -4783,8 +4764,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest240, testing::ext::TestSize.Level0) Task* task = new Task(); uint32_t taskId = TaskManager::GetInstance().CalculateTaskId(reinterpret_cast(task)); task->taskId_ = taskId; - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task); task->taskType_ = TaskType::FUNCTION_TASK; Task::CleanupHookFunc(task); napi_value exception = nullptr; @@ -4828,7 +4807,7 @@ HWTEST_F(NativeEngineTest, TaskpoolTest243, testing::ext::TestSize.Level0) uint32_t taskId = TaskManager::GetInstance().CalculateTaskId(reinterpret_cast(task)); task->taskId_ = taskId; task->taskState_ = ExecuteState::CANCELED; - task->UpdateTask(0, nullptr); + task->UpdateRunningInfo(0, nullptr); task->isMainThreadTask_ = false; task->SetValid(false); Task::VerifyAndPostResult(task, Priority::DEFAULT); @@ -4860,10 +4839,8 @@ HWTEST_F(NativeEngineTest, TaskpoolTest245, testing::ext::TestSize.Level0) task->taskId_ = taskId; task->env_ = env; task->isMainThreadTask_ = false; - task->onStartExecutionSignal_ = nullptr; task->CheckStartExecution(Priority::DEFAULT); auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task); task->SetValid(false); task->CheckStartExecution(Priority::DEFAULT); task->SetValid(true); @@ -4896,8 +4873,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest247, testing::ext::TestSize.Level0) task->taskId_ = taskId; task->env_ = env; task->isMainThreadTask_ = false; - auto loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_, NativeEngineTest::foo, task); TaskManager::GetInstance().ReleaseCallBackInfo(task); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); @@ -5776,9 +5751,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest283, testing::ext::TestSize.Level0) TaskManager& taskManager = TaskManager::GetInstance(); AsyncRunnerManager& asyncRunnerManager = AsyncRunnerManager::GetInstance(); - uv_async_t* req = new uv_async_t; - req->data = nullptr; - Task::DiscardTask(req); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_EQ(exception, nullptr); @@ -5789,15 +5761,11 @@ HWTEST_F(NativeEngineTest, TaskpoolTest283, testing::ext::TestSize.Level0) task->taskType_ = TaskType::ASYNCRUNNER_TASK; taskManager.StoreTask(task); DiscardTaskMessage* message = new DiscardTaskMessage(env, task->taskId_, 0, false); - req->data = message; - Task::DiscardTask(req); exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_EQ(exception, nullptr); DiscardTaskMessage* message2 = new DiscardTaskMessage(nullptr, 283, 0, false); - req->data = message2; - Task::DiscardTask(req); exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_EQ(exception, nullptr); @@ -5805,8 +5773,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest283, testing::ext::TestSize.Level0) task->env_ = env; task->SetValid(false); DiscardTaskMessage* message3 = new DiscardTaskMessage(env, task->taskId_, 0, false); - req->data = message3; - Task::DiscardTask(req); exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_EQ(exception, nullptr); @@ -5860,17 +5826,11 @@ HWTEST_F(NativeEngineTest, TaskpoolTest285, testing::ext::TestSize.Level0) HWTEST_F(NativeEngineTest, TaskpoolTest286, testing::ext::TestSize.Level0) { napi_env env = (napi_env)engine_; - uv_async_t* req = new uv_async_t; - req->data = nullptr; - TaskGroup::StartRejectResult(req); TaskGroup* group = new TaskGroup(env); - req->data = group; - TaskGroup::StartRejectResult(req); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_EQ(exception, nullptr); delete group; - delete req; } HWTEST_F(NativeEngineTest, TaskpoolTest287, testing::ext::TestSize.Level0) @@ -5927,9 +5887,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest289, testing::ext::TestSize.Level0) { napi_env env = (napi_env)engine_; TaskGroup* group = new TaskGroup(env); - uv_loop_t* loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, group->onRejectResultSignal_, NativeEngineTest::foo); - group->TriggerRejectResult(); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_EQ(exception, nullptr); @@ -6033,8 +5990,6 @@ HWTEST_F(NativeEngineTest, TaskpoolTest293, testing::ext::TestSize.Level0) CancelTaskMessage* message3 = new CancelTaskMessage(ExecuteState::RUNNING, task->taskId_); task->SetValid(true); task->TriggerCancel(message3); - uv_loop_t* loop = NapiHelper::GetLibUV(env); - ConcurrentHelper::UvHandleInit(loop, task->onStartCancelSignal_, NativeEngineTest::foo); CancelTaskMessage* message4 = new CancelTaskMessage(ExecuteState::RUNNING, task->taskId_); task->TriggerCancel(message4); napi_value exception = nullptr; @@ -6074,21 +6029,13 @@ HWTEST_F(NativeEngineTest, TaskpoolTest296, testing::ext::TestSize.Level0) napi_env env = (napi_env)engine_; Task* task = new Task(); task->env_ = env; - uv_async_t* req = new uv_async_t; - req->data = nullptr; - Task::Cancel(req); task->taskId_ = 1000; // 1000: test number CancelTaskMessage* message = new CancelTaskMessage(ExecuteState::RUNNING, task->taskId_); - req->data = message; - Task::Cancel(req); TaskManager::GetInstance().StoreTask(task); CancelTaskMessage* message2 = new CancelTaskMessage(ExecuteState::RUNNING, task->taskId_); - req->data = message2; - Task::Cancel(req); napi_value exception = nullptr; napi_get_and_clear_last_exception(env, &exception); ASSERT_TRUE(exception == nullptr); - delete req; } HWTEST_F(NativeEngineTest, TaskpoolTest297, testing::ext::TestSize.Level0) diff --git a/js_concurrent_module/taskpool/worker.cpp b/js_concurrent_module/taskpool/worker.cpp index b5c782784712ea35a82296a6bb93897ed1acdc27..63368f09c16c3f8f6e4dcb1bc1520ecd775f57a0 100644 --- a/js_concurrent_module/taskpool/worker.cpp +++ b/js_concurrent_module/taskpool/worker.cpp @@ -34,13 +34,22 @@ static constexpr uint32_t TASKPOOL_TYPE = 2; static constexpr uint32_t WORKER_ALIVE_TIME = 1800000; // 1800000: 30min static constexpr int32_t MAX_REPORT_TIMES = 3; +#if defined(ENABLE_TASKPOOL_FFRT) +static const std::map g_ffrtQosMap = { + {Priority::IDLE, ffrt::qos_background}, + {Priority::LOW, ffrt::qos_utility}, + {Priority::MEDIUM, ffrt::qos_default}, + {Priority::HIGH, ffrt::qos_user_initiated}, +}; +#endif + Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker) { if (taskPriority != worker->priority_) { HILOG_DEBUG("taskpool:: reset worker priority to match task priority"); if (TaskManager::GetInstance().EnableFfrt()) { #if defined(ENABLE_TASKPOOL_FFRT) - if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) { + if (ffrt::this_task::update_qos(g_ffrtQosMap.at(taskPriority)) != 0) { SetWorkerPriority(taskPriority); } #endif @@ -427,7 +436,7 @@ void Worker::PerformTask(const uv_async_t* req) PriorityScope priorityScope(worker, taskInfo.second); Task* task = TaskManager::GetInstance().GetTask(taskInfo.first); if (task == nullptr) { - HILOG_DEBUG("taskpool:: task has been released"); + HILOG_WARN("taskpool:: task has been released"); return; } else if (!task->IsValid() && task->ShouldDeleteTask(false)) { HILOG_WARN("taskpool:: task is invalid"); @@ -436,11 +445,8 @@ void Worker::PerformTask(const uv_async_t* req) } // try to record the memory data for gc worker->NotifyTaskBegin(); + task->UpdateRunningInfo(startTime, worker); - if (!task->UpdateTask(startTime, worker)) { - worker->NotifyTaskFinished(); - return; - } if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) { return; } @@ -587,7 +593,7 @@ void Worker::ResetWorkerPriority() if (priority_ != Priority::HIGH) { if (TaskManager::GetInstance().EnableFfrt()) { #if defined(ENABLE_TASKPOOL_FFRT) - if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) { + if (ffrt::this_task::update_qos(g_ffrtQosMap.at(Priority::HIGH)) != 0) { SetWorkerPriority(Priority::HIGH); } #endif diff --git a/js_concurrent_module/taskpool/worker.h b/js_concurrent_module/taskpool/worker.h index 97850b34316b7a55cc6304548dc32592088bff50..32e2962c01713a2c76dfea65deb7bfe213fb0a64 100644 --- a/js_concurrent_module/taskpool/worker.h +++ b/js_concurrent_module/taskpool/worker.h @@ -40,15 +40,6 @@ using namespace Commonlibrary::Platform; enum class WorkerState { IDLE, RUNNING, BLOCKED }; -#if defined(ENABLE_TASKPOOL_FFRT) -static const std::map WORKERPRIORITY_FFRTQOS_MAP = { - {Priority::IDLE, ffrt::qos_background}, - {Priority::LOW, ffrt::qos_utility}, - {Priority::MEDIUM, ffrt::qos_default}, - {Priority::HIGH, ffrt::qos_user_initiated}, -}; -#endif - class Worker { public: using DebuggerPostTask = std::function;