diff --git a/js_concurrent_module/taskpool/BUILD.gn b/js_concurrent_module/taskpool/BUILD.gn index 5cd260a9435f99d6bfe58b0179485701315d89cd..99fece3a401d8c9d61d04525a80de6d184fc4d25 100644 --- a/js_concurrent_module/taskpool/BUILD.gn +++ b/js_concurrent_module/taskpool/BUILD.gn @@ -26,6 +26,7 @@ taskpool_sources = [ "task_manager.cpp", "task_queue.cpp", "task_runner.cpp", + "task_strategy.cpp", "taskpool.cpp", "thread.cpp", "worker.cpp", diff --git a/js_concurrent_module/taskpool/async_runner_manager.cpp b/js_concurrent_module/taskpool/async_runner_manager.cpp index e1b91b4f95bd532267736f3b3f4b6616545d7b59..c4e800cb4da23f8fa4df021ba3c5645930053893 100644 --- a/js_concurrent_module/taskpool/async_runner_manager.cpp +++ b/js_concurrent_module/taskpool/async_runner_manager.cpp @@ -103,7 +103,7 @@ void AsyncRunnerManager::RemoveGlobalAsyncRunner(const std::string& name) void AsyncRunnerManager::CancelAsyncRunnerTask(napi_env env, Task* task) { std::string errMsg = ""; - if (task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) { + if (task->taskState_ == ExecuteState::FINISHED) { errMsg = "AsyncRunner task has been executed."; HILOG_ERROR("taskpool:: %{public}s", errMsg.c_str()); ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); diff --git a/js_concurrent_module/taskpool/task.cpp b/js_concurrent_module/taskpool/task.cpp index d0e3b011f601c90b18938a25d1953361f28d9fc1..3dd328de4a91b53bcef454b168a7eff6563ec942 100644 --- a/js_concurrent_module/taskpool/task.cpp +++ b/js_concurrent_module/taskpool/task.cpp @@ -48,9 +48,9 @@ napi_value Task::TaskConstructor(napi_env env, napi_callback_info cbinfo) size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo); std::string errMessage = ""; if (argc < 1) { - errMessage = "taskpool:: create task need more than one param"; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str()); + std::string_view errMessage = "taskpool:: create task need more than one param"; + HILOG_ERROR("%{public}s", errMessage.data()); + ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.data()); return nullptr; } napi_value* args = new napi_value[argc]; @@ -71,10 +71,9 @@ napi_value Task::TaskConstructor(napi_env env, napi_callback_info cbinfo) argc -= 1; // 1: func } if (!NapiHelper::IsFunction(env, func)) { - errMessage = "taskpool:: the first or second param of task must be function"; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, - "the type of the first or second param of task must be function."); + std::string_view errMessage = "the first or second param of task must be function"; + HILOG_ERROR("taskpool:: %{public}s", errMessage.data()); + ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.data()); return nullptr; } @@ -84,7 +83,6 @@ napi_value Task::TaskConstructor(napi_env env, napi_callback_info cbinfo) HILOG_ERROR("taskpool::TaskConstructor napi_wrap return value is %{public}d", status); TaskManager::GetInstance().RemoveTask(task->taskId_); delete task; - task = nullptr; return nullptr; } napi_create_reference(env, thisVar, 0, &task->taskRef_); @@ -109,7 +107,6 @@ napi_value Task::LongTaskConstructor(napi_env env, napi_callback_info cbinfo) void Task::TaskDestructor(napi_env env, void* data, [[maybe_unused]] void* hint) { Task* task = static_cast(data); - HILOG_DEBUG("taskpool:: taskId:%{public}s TaskDestructor", std::to_string(task->taskId_).c_str()); if (!task->IsMainThreadTask()) { napi_remove_env_cleanup_hook(env, Task::CleanupHookFunc, task); } @@ -137,10 +134,6 @@ void Task::TaskDestructor(napi_env env, void* data, [[maybe_unused]] void* hint) void Task::CleanupHookFunc(void* arg) { - if (arg == nullptr) { - HILOG_ERROR("taskpool:: cleanupHook arg is nullptr"); - return; - } Task* task = static_cast(arg); { std::lock_guard lock(task->taskMutex_); @@ -186,7 +179,6 @@ void Task::Cancel(const uv_async_t* req) Task* Task::GenerateTask(napi_env env, napi_value napiTask, napi_value func, napi_value name, napi_value* args, size_t argc) { - HILOG_DEBUG("taskpool:: task GenerateTask"); napi_value argsArray = NapiHelper::CreateArrayWithLength(env, argc); for (size_t i = 0; i < argc; i++) { napi_set_element(env, argsArray, i, args[i]); @@ -227,14 +219,14 @@ Task* Task::GenerateTask(napi_env env, napi_value napiTask, napi_value func, Task* Task::GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type) { - HILOG_DEBUG("taskpool:: task GenerateFunctionTask"); napi_value argsArray; napi_create_array_with_length(env, argc, &argsArray); for (size_t i = 0; i < argc; i++) { napi_set_element(env, argsArray, i, args[i]); } napi_value undefined = NapiHelper::GetUndefinedValue(env); - TaskInfo* taskInfo = GenerateTaskInfo(env, func, argsArray, undefined, undefined, Priority::DEFAULT); + TaskInfo* taskInfo = nullptr; + // TaskInfo* taskInfo = GetTaskInfoPromise(env, func, argsArray, undefined, undefined, Priority::DEFAULT); if (taskInfo == nullptr) { HILOG_ERROR("taskpool:: task GenerateFunctionTask end, taskInfo is nullptr"); return nullptr; @@ -252,52 +244,71 @@ Task* Task::GenerateFunctionTask(napi_env env, napi_value func, napi_value* args return task; } -napi_value Task::GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType, Priority priority) +TaskInfo* Task::GenerateTaskInfo(napi_env env, napi_value task, Priority priority) { - TaskInfo* taskInfo = GetTaskInfo(env, task, priority); + // Serialize the concurrent function and args and if failed, taskInfo will be nullptr + TaskInfo* taskInfo = GenerateTaskInfoInner(env, task, priority); if (taskInfo == nullptr) { return nullptr; } - UpdateTaskType(taskType); - return NapiHelper::CreatePromise(env, &taskInfo->deferred); + // for multiple execution, the taskInfo will be stored in pendingTaskInfos_ if currentTaskInfo is not nullptr + if (currentTaskInfo_ == nullptr) { + currentTaskInfo_ = taskInfo; + } else { + pendingTaskInfos_.emplace_back(taskInfo); + } + return taskInfo; } -TaskInfo* Task::GetTaskInfo(napi_env env, napi_value napiTask, Priority priority) +TaskInfo* Task::GenerateTaskInfoInner(napi_env env, napi_value napiTask, Priority priority) { napi_value func = NapiHelper::GetNameProperty(env, napiTask, FUNCTION_STR); napi_value args = NapiHelper::GetNameProperty(env, napiTask, ARGUMENTS_STR); if (func == nullptr || args == nullptr) { - std::string errMessage = "taskpool:: task value is error"; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str()); + std::string_view errMessage = "taskpool:: task value is invalid"; + HILOG_ERROR("%{public}s", errMessage.data()); + ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.data()); return nullptr; } - napi_value transferList = NapiHelper::GetUndefinedValue(env); + napi_value undefined = NapiHelper::GetUndefinedValue(env); + napi_value transferList; if (NapiHelper::HasNameProperty(env, napiTask, TRANSFERLIST_STR)) { transferList = NapiHelper::GetNameProperty(env, napiTask, TRANSFERLIST_STR); } - napi_value cloneList = NapiHelper::GetUndefinedValue(env); + napi_value cloneList; if (NapiHelper::HasNameProperty(env, napiTask, CLONE_LIST_STR)) { cloneList = NapiHelper::GetNameProperty(env, napiTask, CLONE_LIST_STR); } - TaskInfo* pendingInfo = GenerateTaskInfo(env, func, args, transferList, cloneList, priority, - defaultTransfer_, defaultCloneSendable_); - if (pendingInfo == nullptr) { + + void* serializationFunction = nullptr; + napi_status status = napi_serialize_inner(env, func, undefined, undefined, + defaultTransfer_, defaultCloneSendable_, &serializationFunction); + if (status != napi_ok || serializationFunction == nullptr) { + std::string_view errMessage = "taskpool: failed to serialize function."; + HILOG_ERROR("%{public}s", errMessage.data()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_CONCURRENT_FUNCTION, errMessage.data()); return nullptr; } - { - std::lock_guard lock(taskMutex_); - if (currentTaskInfo_ == nullptr) { - currentTaskInfo_ = pendingInfo; - } else { - pendingTaskInfos_.push_back(pendingInfo); - } + void* serializationArguments = nullptr; + status = napi_serialize_inner(env, args, transferList, cloneList, + defaultTransfer_, defaultCloneSendable_, &serializationArguments); + if (status != napi_ok || serializationArguments == nullptr) { + std::string_view errMessage = "taskpool: failed to serialize arguments."; + HILOG_ERROR("%{public}s", errMessage.data()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.data()); + return nullptr; } + if (name_.empty()) { napi_value funcName = NapiHelper::GetNameProperty(env, func, NAME); name_ = NapiHelper::GetString(env, funcName); } - return pendingInfo; + + TaskInfo* taskInfo = new TaskInfo(); + taskInfo->serializationFunction = serializationFunction; + taskInfo->serializationArguments = serializationArguments; + taskInfo->priority = priority; + return taskInfo; } napi_value Task::SetTransferList(napi_env env, napi_callback_info cbinfo) @@ -352,7 +363,6 @@ napi_value Task::SetTransferList(napi_env env, napi_callback_info cbinfo) return nullptr; } } - HILOG_DEBUG("taskpool:: check setTransferList param success"); napi_set_named_property(env, thisVar, TRANSFERLIST_STR, args[0]); return nullptr; } @@ -539,7 +549,7 @@ napi_value Task::AddDependency(napi_env env, napi_callback_info cbinfo) ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_HAVE_DEPENDENCY); return nullptr; } - if (task->IsCommonTask() || task->IsSeqRunnerTask()) { + if (task->IsCommonTask() || task->IsSeqRunnerTask()) { // seems strange errMessage = "taskpool:: seqRunnerTask or executedTask cannot addDependency"; HILOG_ERROR("%{public}s", errMessage.c_str()); ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str()); @@ -940,7 +950,7 @@ napi_value Task::IsDone(napi_env env, napi_callback_info cbinfo) return NapiHelper::CreateBooleanValue(env, false); } - if (task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) { + if (task->taskState_ == ExecuteState::FINISHED) { return NapiHelper::CreateBooleanValue(env, true); } return NapiHelper::CreateBooleanValue(env, false); @@ -1005,6 +1015,30 @@ void Task::UpdateTaskType(TaskType taskType) { taskType_ = taskType; napi_reference_ref(env_, taskRef_, nullptr); + switch (taskType_) + { + case FUNCTION_TASK: + taskStrategy_ = std::make_unique(this); + break; + case COMMON_TASK: + taskStrategy_ = std::make_unique(this); + break; + case SEQRUNNER_TASK: + taskStrategy_ = std::make_unique(this); + break; + case ASYNCRUNNER_TASK: + taskStrategy_ = std::make_unique(this); + break; + case DELAYED_TASK: + taskStrategy_ = std::make_unique(this); + break; + case PERIODIC_TASK: + taskStrategy_ = std::make_unique(this); + break; + default: + HILOG_ERROR("taskpool:: The task type is not supported."); + break; + } } bool Task::IsRepeatableTask() const @@ -1032,6 +1066,10 @@ bool Task::IsCommonTask() const return taskType_ == TaskType::COMMON_TASK; } +bool Task::IsDelayedTask() const +{ + return taskType_ == TaskType::DELAYED_TASK; +} bool Task::IsSeqRunnerTask() const { return taskType_ == TaskType::SEQRUNNER_TASK; @@ -1063,48 +1101,14 @@ bool Task::IsExecuted() const return taskType_ != TaskType::TASK; } -TaskInfo* Task::GenerateTaskInfo(napi_env env, napi_value func, napi_value args, - napi_value transferList, napi_value cloneList, Priority priority, - bool defaultTransfer, bool defaultCloneSendable) -{ - HILOG_DEBUG("taskpool:: task GenerateTaskInfo"); - napi_value undefined = NapiHelper::GetUndefinedValue(env); - void* serializationFunction = nullptr; - napi_status status = napi_serialize_inner(env, func, undefined, undefined, - defaultTransfer, defaultCloneSendable, &serializationFunction); - std::string errMessage = ""; - if (status != napi_ok || serializationFunction == nullptr) { - errMessage = "taskpool: failed to serialize function."; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_CONCURRENT_FUNCTION, errMessage.c_str()); - return nullptr; - } - void* serializationArguments = nullptr; - status = napi_serialize_inner(env, args, transferList, cloneList, - defaultTransfer, defaultCloneSendable, &serializationArguments); - if (status != napi_ok || serializationArguments == nullptr) { - errMessage = "taskpool: failed to serialize arguments."; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str()); - return nullptr; - } - - TaskInfo* taskInfo = new TaskInfo(); - taskInfo->serializationFunction = serializationFunction; - taskInfo->serializationArguments = serializationArguments; - taskInfo->priority = priority; - reinterpret_cast(env)->IncreaseSubEnvCounter(); - return taskInfo; -} - void Task::IncreaseRefCount() { - taskRefCount_.fetch_add(2); // 2 : for PerformTask and TaskResultCallback + taskRefCount_ += 2; // 2 : for PerformTask and TaskResultCallback } void Task::DecreaseRefCount() { - taskRefCount_.fetch_sub(1); + taskRefCount_ -= 1; } bool Task::IsReadyToHandle() const @@ -1114,14 +1118,10 @@ bool Task::IsReadyToHandle() const void Task::NotifyPendingTask() { - HILOG_DEBUG("taskpool:: task:%{public}s NotifyPendingTask", std::to_string(taskId_).c_str()); TaskManager::GetInstance().NotifyDependencyTaskInfo(taskId_); - std::lock_guard lock(taskMutex_); delete currentTaskInfo_; if (pendingTaskInfos_.empty()) { currentTaskInfo_ = nullptr; - HILOG_DEBUG("taskpool:: task:%{public}s NotifyPendingTask end, currentTaskInfo_ nullptr", - std::to_string(taskId_).c_str()); return; } currentTaskInfo_ = pendingTaskInfos_.front(); @@ -1132,32 +1132,24 @@ void Task::NotifyPendingTask() void Task::CancelPendingTask(napi_env env) { - HILOG_DEBUG("taskpool:: task:%{public}s CancelPendingTask", std::to_string(taskId_).c_str()); - std::list deferreds {}; - { - std::lock_guard lock(taskMutex_); - if (pendingTaskInfos_.empty()) { - HILOG_DEBUG("taskpool:: task CancelPendingTask end, pendingTaskInfos_ nullptr"); - return; - } - auto engine = reinterpret_cast(env); - for (const auto& info : pendingTaskInfos_) { - engine->DecreaseSubEnvCounter(); - if (!IsPeriodicTask()) { - deferreds.push_back(info->deferred); - } - napi_reference_unref(env, taskRef_, nullptr); - delete info; - } - pendingTaskInfos_.clear(); + if (pendingTaskInfos_.empty()) { + return; + } + std::list deferreds; + auto engine = reinterpret_cast(env); + for (const auto& info : pendingTaskInfos_) { + engine->DecreaseSubEnvCounter(); + deferreds.push_back(info->deferred); + napi_reference_unref(env, taskRef_, nullptr); + delete info; } + pendingTaskInfos_.clear(); std::string error = "taskpool:: task has been canceled"; TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error); } bool Task::UpdateTask(uint64_t startTime, void* worker) { - HILOG_DEBUG("taskpool:: task:%{public}s UpdateTask", std::to_string(taskId_).c_str()); if (taskState_ != ExecuteState::CANCELED) { taskState_ = ExecuteState::RUNNING; } @@ -1457,29 +1449,21 @@ void Task::InitHandle(napi_env env) void Task::ClearDelayedTimers() { - HILOG_DEBUG("taskpool:: task ClearDelayedTimers"); - std::list deferreds {}; - { - std::lock_guard lock(taskMutex_); - TaskMessage* taskMessage = nullptr; - for (auto t: delayedTimers_) { - if (t == nullptr) { - continue; - } - taskMessage = static_cast(t->data); - deferreds.push_back(taskMessage->deferred); - uv_timer_stop(t); - uv_close(reinterpret_cast(t), [](uv_handle_t* handle) { - delete (uv_timer_t*)handle; - handle = nullptr; - }); - delete taskMessage; - taskMessage = nullptr; + for (auto delayedTimer : delayedTimers_) { + if (delayedTimer == nullptr) { + continue; } - delayedTimers_.clear(); + TaskMessage* taskMessage = static_cast(delayedTimer->data); + uv_timer_stop(delayedTimer); + uv_close(reinterpret_cast(delayedTimer), [](uv_handle_t* handle) { + delete reinterpret_cast(handle); + }); + std::string_view error = "taskpool:: task has been canceled"; + napi_value message = ErrorHelper::NewError(env_, 0, error.data()); + napi_reject_deferred(env_, taskMessage->deferred, message); + delete taskMessage; } - std::string error = "taskpool:: task has been canceled"; - TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error); + delayedTimers_.clear(); } bool Task::VerifyAndPostResult(Task* task, Priority priority) @@ -1685,32 +1669,72 @@ void Task::CancelInner(ExecuteState state) TaskManager::GetInstance().ClearDependentTask(taskId_); } std::list deferreds {}; - { - std::lock_guard lock(taskMutex_); - if (state == ExecuteState::WAITING && currentTaskInfo_ != nullptr && - TaskManager::GetInstance().EraseWaitingTaskId(taskId_, currentTaskInfo_->priority)) { - reinterpret_cast(env_)->DecreaseSubEnvCounter(); - DecreaseTaskLifecycleCount(); - TaskManager::GetInstance().DecreaseSendDataRefCount(env_, taskId_); - deferreds.push_back(currentTaskInfo_->deferred); - napi_reference_unref(env_, taskRef_, nullptr); - delete currentTaskInfo_; - currentTaskInfo_ = nullptr; - isCancelToFinish_ = true; - } - if (IsSeqRunnerTask() && state == ExecuteState::CANCELED) { - DisposeCanceledTask(); - return; - } - if (state == ExecuteState::DELAYED) { - isCancelToFinish_ = true; - } + if (state == ExecuteState::WAITING && currentTaskInfo_ != nullptr && + TaskManager::GetInstance().EraseWaitingTaskId(taskId_, currentTaskInfo_->priority)) { + reinterpret_cast(env_)->DecreaseSubEnvCounter(); + DecreaseTaskLifecycleCount(); + TaskManager::GetInstance().DecreaseSendDataRefCount(env_, taskId_); + deferreds.push_back(currentTaskInfo_->deferred); + napi_reference_unref(env_, taskRef_, nullptr); + delete currentTaskInfo_; + currentTaskInfo_ = nullptr; + isCancelToFinish_ = true; + } + if (IsSeqRunnerTask() && state == ExecuteState::CANCELED) { + DisposeCanceledTask(); + return; } std::string error = "taskpool:: task has been canceled"; TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error); } -bool Task::IsSameEnv(napi_env env) +void Task::CancelDelayedTask(ExecuteState state) +{ + ClearDelayedTimers(); + CancelPendingTask(env_); + if (HasDependency()) { + TaskManager::GetInstance().ClearDependentTask(taskId_); + } + std::list deferreds {}; + if (state == ExecuteState::WAITING && currentTaskInfo_ != nullptr && + TaskManager::GetInstance().EraseWaitingTaskId(taskId_, currentTaskInfo_->priority)) { + reinterpret_cast(env_)->DecreaseSubEnvCounter(); + DecreaseTaskLifecycleCount(); + TaskManager::GetInstance().DecreaseSendDataRefCount(env_, taskId_); + deferreds.push_back(currentTaskInfo_->deferred); + napi_reference_unref(env_, taskRef_, nullptr); + delete currentTaskInfo_; + currentTaskInfo_ = nullptr; + isCancelToFinish_ = true; + } else if (state == ExecuteState::DELAYED) { + isCancelToFinish_ = true; + } + std::string error = "taskpool:: task has been canceled"; + TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error); +} + +void Task::CancelCommonTask(ExecuteState state) +{ + CancelPendingTask(env_); + if (HasDependency()) { + TaskManager::GetInstance().ClearDependentTask(taskId_); + } + if (state == ExecuteState::WAITING && currentTaskInfo_ != nullptr && + TaskManager::GetInstance().EraseWaitingTaskId(taskId_, currentTaskInfo_->priority)) { + reinterpret_cast(env_)->DecreaseSubEnvCounter(); + DecreaseTaskLifecycleCount(); + TaskManager::GetInstance().DecreaseSendDataRefCount(env_, taskId_); + napi_reference_unref(env_, taskRef_, nullptr); + isCancelToFinish_ = true; + std::string_view error = "taskpool:: task has been canceled"; + napi_value message = ErrorHelper::NewError(env_, 0, error.data()); + napi_reject_deferred(env_, currentTaskInfo_->deferred, message); + delete currentTaskInfo_; + currentTaskInfo_ = nullptr; + } +} + +bool Task::IsSameEnv(napi_env env) const { return env_ == env; } @@ -1848,4 +1872,56 @@ uint32_t Task::GetTaskId() const { return taskId_; } + +bool Task::IsCanceled() const +{ + return taskState_ == ExecuteState::CANCELED; +} + +void Task::Cancel(napi_env env) +{ + if (taskStrategy_ == nullptr) { + HILOG_ERROR("taskpool:: taskStrategy_ is nullptr, %{public}u.", taskId_); + return; + } + taskStrategy_->Cancel(env); +} + +bool Task::IsFinished() const +{ + return taskState_ == ExecuteState::FINISHED; +} + +void Task::Handle(bool success, napi_value result) +{ + if (taskStrategy_ != nullptr) { + taskStrategy_->HandleTaskResult(success, result); + } +} + +void Task::HandelTaskResult(bool success, napi_value napiTaskResult) +{ + if (success) { + napi_resolve_deferred(env_, currentTaskInfo_->deferred, napiTaskResult); + if (onExecutionSucceededCallBackInfo_ != nullptr) { + ExecuteListenerCallback(onExecutionSucceededCallBackInfo_); + } + } else { + napi_reject_deferred(env_, currentTaskInfo_->deferred, napiTaskResult); + if (onExecutionFailedCallBackInfo_ != nullptr) { + onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult; + ExecuteListenerCallback(onExecutionFailedCallBackInfo_); + } + } +} + +napi_value Task::GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType, Priority priority) +{ + return nullptr; +} + +TaskInfo* Task::GetTaskInfo(napi_env env, napi_value task, Priority priority) +{ + return nullptr; +} } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/task.h b/js_concurrent_module/taskpool/task.h index 07a7a1d955dc4f2ee45bfaef35a4b7f98084eeff..1d9c504b8124420fc5fc7ed24efad326abf1f7a7 100644 --- a/js_concurrent_module/taskpool/task.h +++ b/js_concurrent_module/taskpool/task.h @@ -30,6 +30,7 @@ #include "napi/native_node_api.h" #include "utils.h" #include "tools/log.h" +#include "task_strategy.h" #if defined(ENABLE_TASKPOOL_EVENTHANDLER) #include "event_handler.h" #endif @@ -43,7 +44,7 @@ namespace Commonlibrary::Concurrent::TaskPoolModule { using namespace Commonlibrary::Platform; extern const std::unordered_map g_napiPriorityMap; -enum ExecuteState { NOT_FOUND, WAITING, RUNNING, CANCELED, FINISHED, DELAYED, ENDING}; +enum ExecuteState { NOT_FOUND, WAITING, RUNNING, CANCELED, FINISHED, DELAYED}; enum TaskType { TASK, FUNCTION_TASK, @@ -51,7 +52,9 @@ enum TaskType { COMMON_TASK, GROUP_COMMON_TASK, GROUP_FUNCTION_TASK, - ASYNCRUNNER_TASK + ASYNCRUNNER_TASK, + DELAYED_TASK, + PERIODIC_TASK }; struct GroupInfo; @@ -153,6 +156,7 @@ public: bool IsSeqRunnerTask() const; bool IsFunctionTask() const; bool IsLongTask() const; + bool IsDelayedTask() const; bool IsPeriodicTask() const; bool IsMainThreadTask() const; bool IsExecuted() const; @@ -184,7 +188,7 @@ public: void SetTaskId(uint32_t taskId); void TriggerCancel(CancelTaskMessage* message); void CancelInner(ExecuteState state); - bool IsSameEnv(napi_env env); + bool IsSameEnv(napi_env env) const; void DiscardAsyncRunnerTask(DiscardTaskMessage* message); void DiscardInner(DiscardTaskMessage* message); void ReleaseData(); @@ -192,6 +196,15 @@ public: Worker* GetWorker() const; napi_env GetEnv() const; uint32_t GetTaskId() const; + bool IsCanceled() const; + bool IsFinished() const; + void Cancel(napi_env env); + void CancelCommonTask(ExecuteState state); + void CancelDelayedTask(ExecuteState state); + void Handle(bool success, napi_value result); + void HandelTaskResult(bool success, napi_value napiTaskResult); + TaskInfo* GenerateTaskInfo(napi_env env, napi_value task, Priority priority); + TaskInfo* GenerateTaskInfoInner(napi_env env, napi_value napiTask, Priority priority); private: Task(const Task &) = delete; @@ -247,6 +260,7 @@ public: bool isMainThreadTask_ {false}; Priority asyncTaskPriority_ {Priority::DEFAULT}; std::atomic isCancelToFinish_ {false}; + std::unique_ptr taskStrategy_ {}; }; struct CallbackInfo { diff --git a/js_concurrent_module/taskpool/task_group.cpp b/js_concurrent_module/taskpool/task_group.cpp index f5d61fe71a9d6222ebad2dbd2f92d6f8803db1b6..404ab6d29c83797fdab561936192fc7a51419025 100644 --- a/js_concurrent_module/taskpool/task_group.cpp +++ b/js_concurrent_module/taskpool/task_group.cpp @@ -17,6 +17,7 @@ #include "helper/concurrent_helper.h" #include "task_group_manager.h" +#include "taskpool.h" namespace Commonlibrary::Concurrent::TaskPoolModule { using namespace Commonlibrary::Concurrent::Common::Helper; @@ -31,123 +32,69 @@ napi_value TaskGroup::TaskGroupConstructor(napi_env env, napi_callback_info cbin ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be zero or one."); return nullptr; } - napi_value name; + napi_value name = NapiHelper::CreateEmptyString(env); if (argc == 1) { - // check 1st param is taskGroupName if (!NapiHelper::IsString(env, args[0])) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the first param must be string."); return nullptr; } name = args[0]; - } else { - name = NapiHelper::CreateEmptyString(env); } TaskGroup* group = new TaskGroup(env); - uint64_t groupId = reinterpret_cast(group); - group->groupId_ = groupId; - TaskGroupManager::GetInstance().StoreTaskGroup(groupId, group); + auto groupId = group->GetGroupId(); group->InitHandle(env); napi_value napiGroupId = NapiHelper::CreateUint64(env, groupId); napi_property_descriptor properties[] = { - DECLARE_NAPI_PROPERTY(GROUP_ID_STR, napiGroupId), + DECLARE_NAPI_PROPERTY("name", name), + DECLARE_NAPI_PROPERTY("groupId", napiGroupId), DECLARE_NAPI_FUNCTION_WITH_DATA("addTask", AddTask, thisVar), }; - napi_set_named_property(env, thisVar, NAME, name); napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties); napi_status status = napi_wrap(env, thisVar, group, TaskGroupDestructor, nullptr, nullptr); if (status != napi_ok) { HILOG_ERROR("taskpool::TaskGroupConstructor napi_wrap return value is %{public}d", status); - TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_); delete group; - group = nullptr; return nullptr; } - napi_create_reference(env, thisVar, 0, &group->groupRef_); + napi_create_reference(env, thisVar, 0, &group->GetGroupRef()); napi_add_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group); + TaskGroupManager::GetInstance().StoreTaskGroup(groupId, group); return thisVar; } void TaskGroup::TaskGroupDestructor(napi_env env, void* data, [[maybe_unused]] void* hint) { - HILOG_DEBUG("taskpool::TaskGroupDestructor"); TaskGroup* group = static_cast(data); napi_remove_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group); TaskGroupManager::GetInstance().ReleaseTaskGroupData(env, group); - napi_delete_reference(env, group->groupRef_); + napi_delete_reference(env, group->GetGroupRef()); delete group; } napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo) { size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo); - std::string errMessage = ""; if (argc < 1) { - errMessage = "taskGroup:: the number of params must be at least one"; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one."); + std::string_view errMessage = "the number of params must be at least one"; + HILOG_ERROR("taskGroup:: %{public}s", errMessage.data()); + ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.data()); return nullptr; } napi_value* args = new napi_value[argc]; ObjectScope scope(args, true); napi_value thisVar; napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr); - napi_value napiGroupId = NapiHelper::GetNameProperty(env, thisVar, GROUP_ID_STR); - uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId); - TaskGroup* group = TaskGroupManager::GetInstance().GetTaskGroup(groupId); - if (group == nullptr || group->groupState_ != ExecuteState::NOT_FOUND) { - errMessage = "taskpool:: executed taskGroup cannot addTask"; - HILOG_ERROR("%{public}s", errMessage.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str()); - return nullptr; + TaskGroup* group = nullptr; + napi_unwrap(env, thisVar, reinterpret_cast(&group)); + if (group != nullptr) { + group->AddTask(argc, args); } - napi_valuetype type = napi_undefined; - napi_typeof(env, args[0], &type); - if (type == napi_object) { - Task* task = nullptr; - napi_unwrap(env, args[0], reinterpret_cast(&task)); - if (task == nullptr) { - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task."); - return nullptr; - } - if (!task->CanForTaskGroup(env)) { - return nullptr; - } - task->taskType_ = TaskType::GROUP_COMMON_TASK; - task->groupId_ = groupId; - napi_reference_ref(env, task->taskRef_, nullptr); - TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_); - return nullptr; - } else if (type == napi_function) { - napi_value napiTask = NapiHelper::CreateObject(env); - Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::GROUP_FUNCTION_TASK); - if (task == nullptr) { - return nullptr; - } - task->groupId_ = groupId; - napi_status status = napi_wrap(env, napiTask, task, Task::TaskDestructor, nullptr, nullptr); - if (status != napi_ok) { - HILOG_ERROR("taskpool::AddTask napi_wrap return value is %{public}d", status); - TaskManager::GetInstance().RemoveTask(task->taskId_); - delete task; - task = nullptr; - return nullptr; - } - napi_create_reference(env, napiTask, 1, &task->taskRef_); - TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_); - return nullptr; - } - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be object or function."); return nullptr; } void TaskGroup::HostEnvCleanupHook(void* data) { - if (data == nullptr) { - HILOG_ERROR("taskpool:: taskGroup cleanupHook arg is nullptr"); - return; - } TaskGroup* group = static_cast(data); - std::lock_guard lock(group->taskGroupMutex_); ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_); group->isValid_ = false; } @@ -179,68 +126,63 @@ uint32_t TaskGroup::GetTaskIndex(uint32_t taskId) } return index; } - -void TaskGroup::NotifyGroupTask(napi_env env) +void TaskGroup::NotifyGroupTask() { - HILOG_DEBUG("taskpool:: NotifyGroupTask"); - std::lock_guard lock(taskGroupMutex_); if (pendingGroupInfos_.empty()) { return; } + groupState_ = ExecuteState::WAITING; currentGroupInfo_ = pendingGroupInfos_.front(); pendingGroupInfos_.pop_front(); - for (auto iter = taskRefs_.begin(); iter != taskRefs_.end(); iter++) { - napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter); + for (napi_ref taskRef : taskRefs_) { + napi_value napiTask = NapiHelper::GetReferenceValue(env_, taskRef); Task* task = nullptr; - napi_unwrap(env, napiTask, reinterpret_cast(&task)); + napi_unwrap(env_, napiTask, reinterpret_cast(&task)); + if (task == nullptr) { HILOG_ERROR("taskpool::ExecuteGroup task is nullptr"); return; } - napi_reference_ref(env, task->taskRef_, nullptr); + + napi_reference_ref(env_, task->taskRef_, nullptr); + task->IncreaseRefCount(); + TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_); + task->taskState_ = ExecuteState::WAITING; Priority priority = currentGroupInfo_->priority; + if (task->IsGroupCommonTask()) { - task->GetTaskInfo(env, napiTask, priority); + task->GetTaskInfo(env_, napiTask, priority); } else { - reinterpret_cast(env)->IncreaseSubEnvCounter(); + reinterpret_cast(env_)->IncreaseSubEnvCounter(); } - task->IncreaseRefCount(); - TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_); - task->taskState_ = ExecuteState::WAITING; TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority); } } -void TaskGroup::CancelPendingGroup(napi_env env) +void TaskGroup::CancelPendingGroup() { - HILOG_DEBUG("taskpool:: CancelPendingGroup"); - std::list deferreds {}; - { - std::lock_guard lock(taskGroupMutex_); - if (pendingGroupInfos_.empty()) { - return; - } - auto pendingIter = pendingGroupInfos_.begin(); - auto engine = reinterpret_cast(env); - for (; pendingIter != pendingGroupInfos_.end(); ++pendingIter) { - for (size_t i = 0; i < taskIds_.size(); i++) { - engine->DecreaseSubEnvCounter(); - } - GroupInfo* info = *pendingIter; - deferreds.push_back(info->deferred); - napi_reference_unref(env, groupRef_, nullptr); - delete info; + if (pendingGroupInfos_.empty()) { + return; + } + std::list deferreds; + auto engine = reinterpret_cast(env_); + for (GroupInfo* info : pendingGroupInfos_) { + // Decrease counter for each task in the group + for (size_t i = 0; i < taskIds_.size(); i++) { + engine->DecreaseSubEnvCounter(); } - pendingIter = pendingGroupInfos_.begin(); - pendingGroupInfos_.erase(pendingIter, pendingGroupInfos_.end()); + deferreds.push_back(info->deferred); + napi_reference_unref(env_, groupRef_, nullptr); + delete info; } - TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: taskGroup has been canceled"); + pendingGroupInfos_.clear(); + TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, "taskpool:: taskGroup has been canceled"); } void TaskGroup::CancelGroupTask(napi_env env, uint32_t taskId) { - TaskGroupManager::GetInstance().CancelGroupTask(env_, taskId, this); + // TaskGroupManager::GetInstance().CancelGroupTask(env_, taskId, this); if (IsSameEnv(env)) { RejectResult(env); return; @@ -261,7 +203,6 @@ void TaskGroup::RejectResult(napi_env env) { std::list deferreds {}; { - std::lock_guard lock(taskGroupMutex_); if (currentGroupInfo_ != nullptr && currentGroupInfo_->finishedTaskNum == taskNum_) { deferreds.push_back(currentGroupInfo_->deferred); napi_delete_reference(env, currentGroupInfo_->resArr); @@ -282,7 +223,6 @@ void TaskGroup::InitHandle(napi_env env) void TaskGroup::TriggerRejectResult() { - std::lock_guard lock(taskGroupMutex_); ConcurrentHelper::UvCheckAndAsyncSend(onRejectResultSignal_); } @@ -290,4 +230,260 @@ bool TaskGroup::IsSameEnv(napi_env env) { return env_ == env; } + +bool TaskGroup::HasExecuted() const +{ + return groupState_ != ExecuteState::NOT_FOUND; +} + +void TaskGroup::AddTask(size_t argc, napi_value* args) +{ + if (HasExecuted()) { + std::string_view errMessage = "taskpool:: executed taskGroup cannot addTask"; + HILOG_ERROR("%{public}s", errMessage.data()); + ErrorHelper::ThrowError(env_, ErrorHelper::TYPE_ERROR, errMessage.data()); + return; + } + napi_valuetype type = napi_undefined; + napi_typeof(env_, args[0], &type); + if (type == napi_object) { + Task* task = nullptr; + napi_unwrap(env_, args[0], reinterpret_cast(&task)); + if (task == nullptr) { + ErrorHelper::ThrowError(env_, ErrorHelper::TYPE_ERROR, "the type of the params must be task."); + return; + } + if (!task->CanForTaskGroup(env_)) { + return; + } + task->taskType_ = TaskType::GROUP_COMMON_TASK; + task->groupId_ = groupId_; + napi_reference_ref(env_, task->taskRef_, nullptr); + taskRefs_.push_back(task->taskRef_); + taskNum_++; + taskIds_.push_back(task->taskId_); + return; + } + if (type == napi_function) { + napi_value napiTask = NapiHelper::CreateObject(env_); + Task* task = Task::GenerateFunctionTask(env_, args[0], args + 1, argc - 1, TaskType::GROUP_FUNCTION_TASK); + if (task == nullptr) { + return; + } + task->groupId_ = groupId_; + napi_status status = napi_wrap(env_, napiTask, task, Task::TaskDestructor, nullptr, nullptr); + if (status != napi_ok) { + HILOG_ERROR("taskpool::AddTask napi_wrap return value is %{public}d", status); + TaskManager::GetInstance().RemoveTask(task->taskId_); + delete task; + return; + } + napi_create_reference(env_, napiTask, 1, &task->taskRef_); + taskRefs_.push_back(task->taskRef_); + taskNum_++; + taskIds_.push_back(task->taskId_); + return; + } + ErrorHelper::ThrowError(env_, ErrorHelper::TYPE_ERROR, "the type of the first param must be object or function."); +} + +uint64_t TaskGroup::GetGroupId() const +{ + return groupId_; +} + +napi_ref& TaskGroup::GetGroupRef() +{ + return groupRef_; +} + +ExecuteState TaskGroup::GetGroupState() const +{ + return groupState_; +} + +void TaskGroup::SetGroupState(ExecuteState state) +{ + groupState_ = state; +} + +uint32_t TaskGroup::GetGroupTasksNum() const +{ + return taskNum_; +} + +void TaskGroup::ReleaseGroupData() +{ + if (onRejectResultSignal_ != nullptr) { + if (!ConcurrentHelper::IsUvClosing(onRejectResultSignal_)) { + ConcurrentHelper::UvHandleClose(onRejectResultSignal_); + } else { + delete onRejectResultSignal_; + onRejectResultSignal_ = nullptr; + } + } + if (isValid_) { + for (uint32_t taskId : taskIds_) { + Task* task = TaskManager::GetInstance().GetTask(taskId); + if (task == nullptr || !task->IsValid()) { + continue; + } + napi_reference_unref(task->env_, task->taskRef_, nullptr); + } + } + if (currentGroupInfo_ != nullptr) { + delete currentGroupInfo_; + currentGroupInfo_ = nullptr; + } + CancelPendingGroup(); +} + +void TaskGroup::CancelGroup() +{ + if (groupState_ == ExecuteState::CANCELED) { + return; + } + if (currentGroupInfo_ == nullptr || groupState_ == ExecuteState::NOT_FOUND || + groupState_ == ExecuteState::FINISHED) { + std::string_view errMsg = "taskpool:: taskGroup is not executed or has been executed"; + HILOG_ERROR("%{public}s", errMsg.data()); + ErrorHelper::ThrowError(env_, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.data()); + return; + } + ExecuteState groupState = groupState_.exchange(ExecuteState::CANCELED); + CancelPendingGroup(); + if (currentGroupInfo_->finishedTaskNum != taskNum_) { + for (uint32_t taskId : taskIds_) { + CancelAllTasks(taskId); + } + if (currentGroupInfo_->finishedTaskNum == taskNum_) { + napi_value error = ErrorHelper::NewError(env_, 0, "taskpool:: taskGroup has been canceled"); + RejectResult(env_, error); + return; + } + } + if (groupState == ExecuteState::WAITING && currentGroupInfo_ != nullptr) { + auto engine = reinterpret_cast(env_); + for (size_t i = 0; i < taskIds_.size(); i++) { + engine->DecreaseSubEnvCounter(); + } + napi_value error = ErrorHelper::NewError(env_, 0, "taskpool:: taskGroup has been canceled"); + RejectResult(env_, error); + } +} + +void TaskGroup::CancelAllTasks(uint32_t taskId) +{ + auto task = TaskManager::GetInstance().GetTask(taskId); + if (task == nullptr) { + HILOG_ERROR("taskpool:: CancelGroupTask task is nullptr"); + return; + } + if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr && + TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) { + reinterpret_cast(env_)->DecreaseSubEnvCounter(); + task->DecreaseTaskLifecycleCount(); + TaskManager::GetInstance().DecreaseSendDataRefCount(env_, taskId); + delete task->currentTaskInfo_; + task->currentTaskInfo_ = nullptr; + if (currentGroupInfo_ != nullptr) { + currentGroupInfo_->finishedTaskNum++; + } + } + task->taskState_ = ExecuteState::CANCELED; +} + +napi_value TaskGroup::Execute(napi_value napiTaskGroup, Priority priority) +{ + napi_reference_ref(env_, groupRef_, nullptr); + if (groupState_ == ExecuteState::NOT_FOUND || groupState_ == ExecuteState::FINISHED || + groupState_ == ExecuteState::CANCELED) { + groupState_ = ExecuteState::WAITING; + } + GroupInfo* groupInfo = new GroupInfo(); + groupInfo->priority = priority; + napi_value resArr; + napi_create_array_with_length(env_, taskNum_, &resArr); + napi_ref arrRef = NapiHelper::CreateReference(env_, resArr, 1); + groupInfo->resArr = arrRef; + napi_value promise = NapiHelper::CreatePromise(env_, &groupInfo->deferred); + if (taskNum_ == 0) { + napi_resolve_deferred(env_, groupInfo->deferred, resArr); + groupState_ = ExecuteState::FINISHED; + napi_delete_reference(env_, groupInfo->resArr); + napi_reference_unref(env_, groupRef_, nullptr); + delete groupInfo; + currentGroupInfo_ = nullptr; + return promise; + } + if (currentGroupInfo_ == nullptr) { + currentGroupInfo_ = groupInfo; + for (auto iter = taskRefs_.begin(); iter != taskRefs_.end(); iter++) { + napi_value napiTask = NapiHelper::GetReferenceValue(env_, *iter); + Task* task = nullptr; + napi_unwrap(env_, napiTask, reinterpret_cast(&task)); + if (task == nullptr) { + HILOG_ERROR("taskpool::ExecuteGroup task is nullptr"); + return nullptr; + } + napi_reference_ref(env_, task->taskRef_, nullptr); + if (task->IsGroupCommonTask()) { + task->GetTaskInfo(env_, napiTask, static_cast(priority)); + } + TaskPool::ExecuteTask(env_, task, static_cast(priority)); + } + } else { + pendingGroupInfos_.push_back(groupInfo); + } + return promise; +} + +void TaskGroup::UpdateGroupResult(bool success, Task* task, napi_value res) +{ + if (currentGroupInfo_ == nullptr) { + return; + } + uint32_t index = GetTaskIndex(task->taskId_); + auto groupInfo = currentGroupInfo_; + napi_ref arrRef = groupInfo->resArr; + napi_value resArr = NapiHelper::GetReferenceValue(env_, arrRef); + napi_set_element(env_, resArr, index, res); + groupInfo->finishedTaskNum++; + // store the index when the first exception occurs + if (!success && !groupInfo->HasException()) { + groupInfo->SetFailedIndex(index); + } + // we will not handle the result until all tasks are finished + if (groupInfo->finishedTaskNum < taskNum_) { + return; + } + // if there is no exception, just resolve + if (!groupInfo->HasException()) { + HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str()); + napi_resolve_deferred(env_, groupInfo->deferred, resArr); + for (uint32_t taskId : taskIds_) { + auto task = TaskManager::GetInstance().GetTask(taskId); + if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) { + task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_); + } + } + } else { + napi_value res = nullptr; + napi_get_element(env_, resArr, groupInfo->GetFailedIndex(), &res); + napi_reject_deferred(env_, groupInfo->deferred, res); + auto iter = taskIds_.begin(); + std::advance(iter, groupInfo->GetFailedIndex()); + auto task = iter != taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr; + if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) { + task->onExecutionFailedCallBackInfo_->taskError_ = res; + task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_); + } + } + groupState_ = ExecuteState::FINISHED; + napi_delete_reference(env_, groupInfo->resArr); + napi_reference_unref(env_, groupRef_, nullptr); + delete groupInfo; + currentGroupInfo_ = nullptr; + NotifyGroupTask(); +} } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/task_group.h b/js_concurrent_module/taskpool/task_group.h index e5dab8d4ff1132613af4d5145e11e46aff5ccd0b..3a3b5c62a5c08adafd34133b0249a25b151d73bb 100644 --- a/js_concurrent_module/taskpool/task_group.h +++ b/js_concurrent_module/taskpool/task_group.h @@ -50,7 +50,7 @@ private: class TaskGroup { public: - explicit TaskGroup(napi_env env) : env_(env) {} + explicit TaskGroup(napi_env env) : env_(env), groupId_(reinterpret_cast(this)) {} TaskGroup() = default; ~TaskGroup() = default; @@ -60,14 +60,27 @@ public: static void StartRejectResult(const uv_async_t* req); uint32_t GetTaskIndex(uint32_t taskId); - void NotifyGroupTask(napi_env env); - void CancelPendingGroup(napi_env env); - void CancelGroupTask(napi_env env, uint32_t taskId); + void NotifyGroupTask(); + void CancelPendingGroup(); void RejectResult(napi_env env, napi_value res); void RejectResult(napi_env env); void InitHandle(napi_env env); void TriggerRejectResult(); bool IsSameEnv(napi_env env); + void Execute(); + bool HasExecuted() const; + void AddTask(size_t argc, napi_value* args); + uint64_t GetGroupId() const; + napi_ref& GetGroupRef(); + ExecuteState GetGroupState() const; + void SetGroupState(ExecuteState state); + uint32_t GetGroupTasksNum() const; + void ReleaseGroupData(); + void CancelGroup(); + void CancelAllTasks(uint32_t taskId); + napi_value Execute(napi_value napiTaskGroup, Priority priority); + void CancelGroupTask(napi_env env, uint32_t taskId); + void UpdateGroupResult(bool success, Task* task, napi_value res); private: TaskGroup(const TaskGroup &) = delete; @@ -78,7 +91,7 @@ private: static void TaskGroupDestructor(napi_env env, void* data, void* hint); friend class NativeEngineTest; -public: + napi_env env_ = nullptr; uint64_t groupId_ {}; GroupInfo* currentGroupInfo_ {}; @@ -88,7 +101,6 @@ public: uint32_t taskNum_ {}; std::atomic groupState_ {ExecuteState::NOT_FOUND}; napi_ref groupRef_ {}; - std::recursive_mutex taskGroupMutex_ {}; uv_async_t* onRejectResultSignal_ = nullptr; std::atomic isValid_ {true}; }; diff --git a/js_concurrent_module/taskpool/task_group_manager.cpp b/js_concurrent_module/taskpool/task_group_manager.cpp index c5d1e0d4609563e48c1d4f4f3bf74fc3f1aecfff..cf2dbb1c8a591bff31c4b5e514368f979631324b 100644 --- a/js_concurrent_module/taskpool/task_group_manager.cpp +++ b/js_concurrent_module/taskpool/task_group_manager.cpp @@ -24,56 +24,13 @@ TaskGroupManager& TaskGroupManager::GetInstance() return groupManager; } -void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint32_t taskId) -{ - std::lock_guard lock(taskGroupsMutex_); - auto groupIter = taskGroups_.find(groupId); - if (groupIter == taskGroups_.end()) { - HILOG_DEBUG("taskpool:: taskGroup has been released"); - return; - } - auto taskGroup = reinterpret_cast(groupIter->second); - if (taskGroup == nullptr) { - HILOG_ERROR("taskpool:: taskGroup is null"); - return; - } - taskGroup->taskRefs_.push_back(taskRef); - taskGroup->taskNum_++; - taskGroup->taskIds_.push_back(taskId); -} - void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group) { - HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group"); - TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_); - { - std::lock_guard lock(group->taskGroupMutex_); - if (group->onRejectResultSignal_ != nullptr) { - if (!ConcurrentHelper::IsUvClosing(group->onRejectResultSignal_)) { - ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_); - } else { - delete group->onRejectResultSignal_; - group->onRejectResultSignal_ = nullptr; - } - } - if (group->isValid_) { - for (uint32_t taskId : group->taskIds_) { - Task* task = TaskManager::GetInstance().GetTask(taskId); - if (task == nullptr || !task->IsValid()) { - continue; - } - napi_reference_unref(task->env_, task->taskRef_, nullptr); - } - } - if (group->currentGroupInfo_ != nullptr) { - delete group->currentGroupInfo_; - group->currentGroupInfo_ = nullptr; - } - } - group->CancelPendingGroup(env); + TaskGroupManager::GetInstance().RemoveTaskGroup(group->GetGroupId()); + group->ReleaseGroupData(); } -void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId) +void TaskGroupManager::CancelGroup(uint64_t groupId) { std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId); HITRACE_HELPER_METER_NAME(strTrace); @@ -83,64 +40,7 @@ void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId) HILOG_ERROR("taskpool:: CancelGroup group is nullptr"); return; } - if (taskGroup->groupState_ == ExecuteState::CANCELED) { - return; - } - { - std::lock_guard lock(taskGroup->taskGroupMutex_); - if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND || - taskGroup->groupState_ == ExecuteState::FINISHED) { - std::string errMsg = "taskpool:: taskGroup is not executed or has been executed"; - HILOG_ERROR("%{public}s", errMsg.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str()); - return; - } - } - ExecuteState groupState = taskGroup->groupState_; - taskGroup->groupState_ = ExecuteState::CANCELED; - taskGroup->CancelPendingGroup(env); - std::lock_guard lock(taskGroup->taskGroupMutex_); - if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) { - for (uint32_t taskId : taskGroup->taskIds_) { - CancelGroupTask(env, taskId, taskGroup); - } - if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) { - napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled"); - taskGroup->RejectResult(env, error); - return; - } - } - if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) { - auto engine = reinterpret_cast(env); - for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) { - engine->DecreaseSubEnvCounter(); - } - napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled"); - taskGroup->RejectResult(env, error); - } -} - -void TaskGroupManager::CancelGroupTask(napi_env env, uint32_t taskId, TaskGroup* group) -{ - HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str()); - auto task = TaskManager::GetInstance().GetTask(taskId); - if (task == nullptr) { - HILOG_INFO("taskpool:: CancelGroupTask task is nullptr"); - return; - } - std::lock_guard lock(task->taskMutex_); - if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr && - TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) { - reinterpret_cast(env)->DecreaseSubEnvCounter(); - task->DecreaseTaskLifecycleCount(); - TaskManager::GetInstance().DecreaseSendDataRefCount(env, taskId); - delete task->currentTaskInfo_; - task->currentTaskInfo_ = nullptr; - if (group->currentGroupInfo_ != nullptr) { - group->currentGroupInfo_->finishedTaskNum++; - } - } - task->taskState_ = ExecuteState::CANCELED; + taskGroup->CancelGroup(); } void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup) @@ -176,11 +76,11 @@ bool TaskGroupManager::UpdateGroupState(uint64_t groupId) return false; } TaskGroup* group = reinterpret_cast(groupIter->second); - if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) { + if (group == nullptr || group->GetGroupState() == ExecuteState::CANCELED) { HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled"); return false; } - group->groupState_ = ExecuteState::RUNNING; + group->SetGroupState(ExecuteState::RUNNING); return true; } } // namespace Commonlibrary::Concurrent::TaskPoolModule \ No newline at end of file diff --git a/js_concurrent_module/taskpool/task_group_manager.h b/js_concurrent_module/taskpool/task_group_manager.h index f1c70a3aa584feb0b5db020f26a22560b96ce847..a866450bceef30cf7742073c6814dd780de36d7e 100644 --- a/js_concurrent_module/taskpool/task_group_manager.h +++ b/js_concurrent_module/taskpool/task_group_manager.h @@ -23,14 +23,12 @@ class TaskGroupManager { public: static TaskGroupManager& GetInstance(); - void AddTask(uint64_t groupId, napi_ref taskRef, uint32_t taskId); void StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup); void RemoveTaskGroup(uint64_t groupId); TaskGroup* GetTaskGroup(uint64_t groupId); - void CancelGroup(napi_env env, uint64_t groupId); - void CancelGroupTask(napi_env env, uint32_t taskId, TaskGroup* group); void ReleaseTaskGroupData(napi_env env, TaskGroup* group); bool UpdateGroupState(uint64_t groupId); + void CancelGroup(uint64_t groupId); private: TaskGroupManager() = default; diff --git a/js_concurrent_module/taskpool/task_manager.cpp b/js_concurrent_module/taskpool/task_manager.cpp index 77188f5d850e73db4f802b2da4cfce06f8285f07..f27ac335491b6e6e993fe43148811052f075e844 100644 --- a/js_concurrent_module/taskpool/task_manager.cpp +++ b/js_concurrent_module/taskpool/task_manager.cpp @@ -190,7 +190,7 @@ napi_value TaskManager::GetTaskInfos(napi_env env) std::lock_guard lock(tasksMutex_); int32_t i = 0; for (const auto& [_, task] : tasks_) { - if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED || + if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) { continue; } @@ -201,7 +201,7 @@ napi_value TaskManager::GetTaskInfos(napi_env env) napi_set_named_property(env, taskInfoValue, "name", name); ExecuteState state = task->taskState_; uint64_t duration = 0; - if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) { + if (state == ExecuteState::RUNNING) { duration = ConcurrentHelper::GetMilliseconds() - task->startTime_; } napi_value stateValue = NapiHelper::CreateUint32(env, static_cast(state)); @@ -629,10 +629,6 @@ void TaskManager::RunTaskManager() void TaskManager::CancelTask(napi_env env, uint32_t taskId) { - // 1. Cannot find taskInfo by executeId, throw error - // 2. Find executing taskInfo, skip it - // 3. Find waiting taskInfo, cancel it - // 4. Find canceled taskInfo, skip it std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId); HILOG_INFO("taskpool:: %{public}s", strTrace.c_str()); HITRACE_HELPER_METER_NAME(strTrace); @@ -643,70 +639,17 @@ void TaskManager::CancelTask(napi_env env, uint32_t taskId) ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); return; } - if (task->taskState_ == ExecuteState::CANCELED) { - HILOG_DEBUG("taskpool:: task has been canceled"); + if (task->IsCanceled()) { return; } - if (task->IsGroupCommonTask()) { - // when task is a group common task, still check the state - if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND || - task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) { - std::string errMsg = "taskpool:: task is not executed or has been executed"; - HILOG_ERROR("%{public}s", errMsg.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); - return; - } - TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_); - if (taskGroup == nullptr) { - return; - } - return taskGroup->CancelGroupTask(env, task->taskId_); - } - if (task->IsPeriodicTask()) { - task->taskState_.exchange(ExecuteState::CANCELED); - return; - } - if (task->IsSeqRunnerTask()) { - CancelSeqRunnerTask(env, task); - return; - } - if (task->IsAsyncRunnerTask()) { - AsyncRunnerManager::GetInstance().CancelAsyncRunnerTask(env, task); - return; - } - ExecuteState state = ExecuteState::NOT_FOUND; - { - std::lock_guard lock(task->taskMutex_); - if (task->taskState_ == ExecuteState::CANCELED) { - HILOG_DEBUG("taskpool:: task has been canceled"); - return; - } - if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) || - task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED || - task->taskState_ == ExecuteState::ENDING) { - std::string errMsg = "taskpool:: task is not executed or has been executed"; - HILOG_ERROR("%{public}s", errMsg.c_str()); - ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); - return; - } - state = task->taskState_.exchange(ExecuteState::CANCELED); - } - if (task->IsSameEnv(env)) { - task->CancelInner(state); - return; - } - CancelTaskMessage* message = new CancelTaskMessage(state, task->taskId_); - task->TriggerCancel(message); + task->Cancel(env); } void TaskManager::CancelSeqRunnerTask(napi_env env, Task* task) { - { - std::lock_guard lock(task->taskMutex_); - if (task->taskState_ != ExecuteState::FINISHED) { - task->taskState_ = ExecuteState::CANCELED; - return; - } + if (task->taskState_ != ExecuteState::FINISHED) { + task->taskState_ = ExecuteState::CANCELED; + return; } std::string errMsg = "taskpool:: sequenceRunner task has been executed"; HILOG_ERROR("%{public}s", errMsg.c_str()); @@ -1053,7 +996,6 @@ void TaskManager::ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task void TaskManager::NotifyDependencyTaskInfo(uint32_t taskId) { - HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str()); HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); std::unique_lock lock(dependentTaskInfosMutex_); auto iter = dependentTaskInfos_.find(taskId); @@ -1108,12 +1050,12 @@ bool TaskManager::IsDependentByTaskId(uint32_t dependentTaskId) bool TaskManager::StoreTaskDependency(uint32_t taskId, std::set taskIdSet) { HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str()); - StoreDependentTaskInfo(taskIdSet, taskId); + StoreDependentTaskInfo(taskIdSet, taskId); // 反向依赖记录 std::unique_lock lock(dependTaskInfosMutex_); - auto iter = dependTaskInfos_.find(taskId); - if (iter == dependTaskInfos_.end()) { - for (const auto& dependentId : taskIdSet) { - auto idIter = dependTaskInfos_.find(dependentId); + auto iter = dependTaskInfos_.find(taskId); // 查找当前Task的依赖集合 + if (iter == dependTaskInfos_.end()) { // 当前Task无过往的依赖关系 + for (const auto& dependentId : taskIdSet) { // 待添加的依赖集合 + auto idIter = dependTaskInfos_.find(dependentId); // 查找依赖集合 if (idIter == dependTaskInfos_.end()) { continue; } @@ -1144,7 +1086,7 @@ bool TaskManager::CheckCircularDependency(std::set dependentIdSet, std if (id == taskId) { return false; } - auto iter = dependentIdSet.find(id); + auto iter = dependentIdSet.find(id); // 待添加的id集合 if (iter != dependentIdSet.end()) { continue; } diff --git a/js_concurrent_module/taskpool/task_manager.h b/js_concurrent_module/taskpool/task_manager.h index af84072b28f9a28fab08deca8ed28841e8bd9f7b..1869f159ae5f5ead2b1a45e04b1ab8e4fa5d4158 100644 --- a/js_concurrent_module/taskpool/task_manager.h +++ b/js_concurrent_module/taskpool/task_manager.h @@ -39,7 +39,6 @@ using namespace Commonlibrary::Concurrent::Common; static constexpr char ARGUMENTS_STR[] = "arguments"; static constexpr char NAME[] = "name"; static constexpr char FUNCTION_STR[] = "function"; -static constexpr char GROUP_ID_STR[] = "groupId"; static constexpr char TASKID_STR[] = "taskId"; static constexpr char TASKINFO_STR[] = "taskInfo"; static constexpr char TRANSFERLIST_STR[] = "transferList"; diff --git a/js_concurrent_module/taskpool/task_strategy.cpp b/js_concurrent_module/taskpool/task_strategy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..838d85174d76f206a0b442a98512ab95a41448c3 --- /dev/null +++ b/js_concurrent_module/taskpool/task_strategy.cpp @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "task_strategy.h" +#include "task_manager.h" +#include "task_group_manager.h" +#include "sequence_runner_manager.h" +#include "async_runner_manager.h" +#include "taskpool.h" + +#include "task.h" + +namespace Commonlibrary::Concurrent::TaskPoolModule { + +BasicTaskStrategy::BasicTaskStrategy(Task* task) : task(task) {} + +void CommonTaskStrategy::Cancel(napi_env env) +{ + if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) { + std::string errMsg = "taskpool:: task is not executed or has been executed"; + HILOG_ERROR("%{public}s", errMsg.c_str()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); + return; + } + ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED); + if (task->IsSameEnv(env)) { + task->CancelCommonTask(state); + return; + } + CancelTaskMessage* message = new CancelTaskMessage(state, task->taskId_); + task->TriggerCancel(message); +} + +void FunctionTaskStrategy::Cancel(napi_env env) +{ + +} + +void GroupCommonTaskStrategy::Cancel(napi_env env) +{ + if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) { + std::string errMsg = "taskpool:: task is not executed or has been executed"; + HILOG_ERROR("%{public}s", errMsg.c_str()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); + return; + } + TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_); + if (taskGroup == nullptr) { + return; + } + taskGroup->CancelGroupTask(env, task->taskId_); +} + +void GroupFunctionTaskStrategy::Cancel(napi_env env) +{ + +} + +void SeqRunnerTaskStrategy::Cancel(napi_env env) +{ + if (task->taskState_ != ExecuteState::FINISHED) { + task->taskState_ = ExecuteState::CANCELED; + return; + } + std::string errMsg = "taskpool:: sequenceRunner task has been executed"; + HILOG_ERROR("%{public}s", errMsg.c_str()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); +} + +void AsyncRunnerTaskStrategy::Cancel(napi_env env) +{ + AsyncRunnerManager::GetInstance().CancelAsyncRunnerTask(env, task); +} + +void DelayedTaskStrategy::Cancel(napi_env env) +{ + if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) { + std::string errMsg = "taskpool:: task is not executed or has been executed"; + HILOG_ERROR("%{public}s", errMsg.c_str()); + ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str()); + return; + } + ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED); + if (task->IsSameEnv(env)) { + task->CancelDelayedTask(state); + return; + } + CancelTaskMessage* message = new CancelTaskMessage(state, task->taskId_); + task->TriggerCancel(message); +} + +void PeriodicTaskStrategy::Cancel(napi_env env) +{ + task->taskState_.exchange(ExecuteState::CANCELED); +} + +void CommonTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ + task->HandelTaskResult(success, result); + task->NotifyPendingTask(); +} + +void FunctionTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ + task->HandelTaskResult(success, result); + task->ReleaseData(); + TaskManager::GetInstance().RemoveTask(task->taskId_); + delete task; +} + +void GroupCommonTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ + TaskPool::UpdateGroupInfoByResult(task->GetEnv(), task, result, success); +} + +void GroupFunctionTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ + TaskPool::UpdateGroupInfoByResult(task->GetEnv(), task, result, success); +} + +void SeqRunnerTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ + task->HandelTaskResult(success, result); + if (!SequenceRunnerManager::GetInstance().TriggerSeqRunner(task->env_, task)) { + HILOG_ERROR("taskpool:: task %{public}s trigger in seqRunner %{public}s failed", + std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str()); + } +} + +void AsyncRunnerTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ + task->HandelTaskResult(success, result); + if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) { + HILOG_ERROR("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed", + std::to_string(task->taskId_).c_str(), std::to_string(task->asyncRunnerId_).c_str()); + } +} + +void DelayedTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ + task->HandelTaskResult(success, result); + task->NotifyPendingTask(); +} + +void PeriodicTaskStrategy::HandleTaskResult(bool success, napi_value result) +{ +} +} diff --git a/js_concurrent_module/taskpool/task_strategy.h b/js_concurrent_module/taskpool/task_strategy.h new file mode 100644 index 0000000000000000000000000000000000000000..e733c4566edf63574b24182368d52c8fb730b74b --- /dev/null +++ b/js_concurrent_module/taskpool/task_strategy.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef JS_CONCURRENT_MODULE_TASKPOOL_TASK_STRATEGY_H +#define JS_CONCURRENT_MODULE_TASKPOOL_TASK_STRATEGY_H + +#include "napi/native_api.h" + +namespace Commonlibrary::Concurrent::TaskPoolModule { +class Task; + +class BasicTaskStrategy { +public: + virtual void Execute() = 0; + virtual void Cancel(napi_env env) = 0; + virtual void TriggerTask() = 0; + virtual void HandleTaskResult(bool success, napi_value result) = 0; + + BasicTaskStrategy(Task* task); + virtual ~BasicTaskStrategy() {} + +protected: + Task* task = nullptr; +}; + +class CommonTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; + +class FunctionTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; + +class SeqRunnerTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; + +class GroupCommonTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; + +class GroupFunctionTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; + +class AsyncRunnerTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; + +class DelayedTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; + +class PeriodicTaskStrategy : public BasicTaskStrategy { +public: + using BasicTaskStrategy::BasicTaskStrategy; + virtual void Execute() override {} + virtual void Cancel(napi_env env) override; + virtual void TriggerTask() override {} + virtual void HandleTaskResult(bool success, napi_value result) override; +}; +} +#endif \ No newline at end of file diff --git a/js_concurrent_module/taskpool/taskpool.cpp b/js_concurrent_module/taskpool/taskpool.cpp index e26372937c927a592276c726e7048389cb9aad44..b29803da7269223fe60206409e0c7685909c915e 100644 --- a/js_concurrent_module/taskpool/taskpool.cpp +++ b/js_concurrent_module/taskpool/taskpool.cpp @@ -178,7 +178,7 @@ napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo) { HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo); - if (argc < 1) { + if (UNLIKELY(argc < 1)) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one."); return nullptr; } @@ -196,11 +196,11 @@ napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo) } priority = NapiHelper::GetUint32Value(env, args[1]); if (priority >= Priority::NUMBER) { - ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error"); + ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is invalid"); return nullptr; } } - if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) { + if (NapiHelper::HasNameProperty(env, args[0], "groupId")) { return ExecuteGroup(env, args[0], static_cast(priority)); } Task* task = nullptr; @@ -212,13 +212,10 @@ napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo) if (!task->CanExecute(env)) { return nullptr; } - napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK, - static_cast(priority)); - if (promise == nullptr) { - return nullptr; - } + TaskInfo* taskInfo = task->GenerateTaskInfo(env, args[0], static_cast(priority)); + task->UpdateTaskType(TaskType::COMMON_TASK); ExecuteTask(env, task, static_cast(priority)); - return promise; + return NapiHelper::CreatePromise(env, &taskInfo->deferred); } if (type != napi_function) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, @@ -230,9 +227,9 @@ napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo) HILOG_ERROR("taskpool:: GenerateFunctionTask failed"); return nullptr; } - napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred); + task->UpdateTaskType(TaskType::FUNCTION_TASK); ExecuteTask(env, task); - return promise; + return NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);; } void TaskPool::DelayTask(uv_timer_t* handle) @@ -241,9 +238,8 @@ void TaskPool::DelayTask(uv_timer_t* handle) auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId); napi_status status = napi_ok; if (task == nullptr) { - HILOG_DEBUG("taskpool:: task is nullptr"); - } else if (task->taskState_ == ExecuteState::CANCELED) { - HILOG_DEBUG("taskpool:: DelayTask task has been canceled"); + HILOG_ERROR("taskpool:: task is nullptr"); + } else if (task->IsCanceled()) { HandleScope scope(task->env_, status); if (status != napi_ok) { HILOG_ERROR("taskpool:: napi_open_handle_scope failed"); @@ -276,14 +272,10 @@ void TaskPool::DelayTask(uv_timer_t* handle) } } } - if (task != nullptr) { - std::lock_guard lock(task->taskMutex_); - task->delayedTimers_.erase(handle); - } + task->delayedTimers_.erase(handle); uv_timer_stop(handle); ConcurrentHelper::UvHandleClose(handle); delete taskMessage; - taskMessage = nullptr; } napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo) @@ -296,11 +288,10 @@ napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo) return nullptr; } - if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED || - task->taskState_ == ExecuteState::FINISHED) { + if (!task->IsExecuted() || task->IsCanceled() || task->IsFinished()) { task->taskState_ = ExecuteState::DELAYED; } - task->UpdateTaskType(TaskType::COMMON_TASK); + task->UpdateTaskType(TaskType::DELAYED_TASK); uv_loop_t* loop = NapiHelper::GetLibUV(env); uv_update_time(loop); @@ -309,7 +300,6 @@ napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo) TaskMessage* taskMessage = new TaskMessage(); taskMessage->priority = static_cast(priority); taskMessage->taskId = task->taskId_; - napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred); timer->data = taskMessage; std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_); @@ -327,68 +317,22 @@ napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo) if (engine->IsMainThread()) { uv_async_send(&loop->wq_async); } - return promise; + return NapiHelper::CreatePromise(env, &taskMessage->deferred); } napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority) { - napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR); - uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId); - HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str()); - auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId); + TaskGroup* taskGroup = nullptr; + napi_unwrap(env, napiTaskGroup, reinterpret_cast(&taskGroup)); if (taskGroup == nullptr) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskGroup is nullptr."); return nullptr; } - napi_reference_ref(env, taskGroup->groupRef_, nullptr); - if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED || - taskGroup->groupState_ == ExecuteState::CANCELED) { - taskGroup->groupState_ = ExecuteState::WAITING; - } - GroupInfo* groupInfo = new GroupInfo(); - groupInfo->priority = priority; - napi_value resArr; - napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr); - napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1); - groupInfo->resArr = arrRef; - napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred); - { - std::lock_guard lock(taskGroup->taskGroupMutex_); - if (taskGroup->taskNum_ == 0) { - napi_resolve_deferred(env, groupInfo->deferred, resArr); - taskGroup->groupState_ = ExecuteState::FINISHED; - napi_delete_reference(env, groupInfo->resArr); - napi_reference_unref(env, taskGroup->groupRef_, nullptr); - delete groupInfo; - taskGroup->currentGroupInfo_ = nullptr; - return promise; - } - if (taskGroup->currentGroupInfo_ == nullptr) { - taskGroup->currentGroupInfo_ = groupInfo; - for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) { - napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter); - Task* task = nullptr; - napi_unwrap(env, napiTask, reinterpret_cast(&task)); - if (task == nullptr) { - HILOG_ERROR("taskpool::ExecuteGroup task is nullptr"); - return nullptr; - } - napi_reference_ref(env, task->taskRef_, nullptr); - if (task->IsGroupCommonTask()) { - task->GetTaskInfo(env, napiTask, static_cast(priority)); - } - ExecuteTask(env, task, static_cast(priority)); - } - } else { - taskGroup->pendingGroupInfos_.push_back(groupInfo); - } - } - return promise; + return taskGroup->Execute(napiTaskGroup, priority); } void TaskPool::HandleTaskResult(Task* task) { - HILOG_DEBUG("taskpool:: HandleTaskResult task"); HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__); if (!task->IsMainThreadTask()) { if (task->ShouldDeleteTask(false)) { @@ -405,20 +349,23 @@ void TaskPool::HandleTaskResult(Task* task) void TaskPool::HandleTaskResultInner(Task* task) { + auto env = task->GetEnv(); + auto taskId = task->GetTaskId(); napi_handle_scope scope = nullptr; - NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope)); + napi_open_handle_scope(env, &scope); napi_value napiTaskResult = nullptr; - napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult); - napi_delete_serialization_data(task->env_, task->result_); + napi_status status = napi_deserialize(env, task->result_, &napiTaskResult); + napi_delete_serialization_data(env, task->result_); + // tag for trace parse: Task PerformTask End - std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_); - std::string taskLog = "Task PerformTask End: " + std::to_string(task->taskId_); - if (task->taskState_ == ExecuteState::CANCELED) { + std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(taskId); + std::string taskLog = "Task PerformTask End: " + std::to_string(taskId); + if (task->IsCanceled()) { strTrace += ", performResult : IsCanceled"; napiTaskResult = task->IsAsyncRunnerTask() ? - ErrorHelper::NewError(task->env_, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED) : - ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled"); + ErrorHelper::NewError(env, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED) : + ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled"); } else if (status != napi_ok) { strTrace += ", performResult : DeserializeFailed"; taskLog += ", DeserializeFailed"; @@ -431,70 +378,19 @@ void TaskPool::HandleTaskResultInner(Task* task) HITRACE_HELPER_METER_NAME(strTrace); HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str()); if (napiTaskResult == nullptr) { - napi_get_undefined(task->env_, &napiTaskResult); + napi_get_undefined(env, &napiTaskResult); } - reinterpret_cast(task->env_)->DecreaseSubEnvCounter(); - bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_); - task->taskState_ = ExecuteState::ENDING; + reinterpret_cast(env)->DecreaseSubEnvCounter(); + bool success = ((status == napi_ok) && (!task->IsCanceled())) && (task->success_); task->isCancelToFinish_ = false; - if (task->IsGroupTask()) { - UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success); - } else if (!task->IsPeriodicTask()) { - if (success) { - napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult); - if (task->onExecutionSucceededCallBackInfo_ != nullptr) { - task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_); - } - } else { - napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult); - if (task->onExecutionFailedCallBackInfo_ != nullptr) { - task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult; - task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_); - } - } - } - NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope)); - TriggerTask(task); -} - -void TaskPool::TriggerTask(Task* task) -{ - HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str()); - if (task->IsGroupTask()) { - return; - } + task->Handle(success, napiTaskResult); + napi_close_handle_scope(env, scope); + // TriggerTask(task); TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_); task->taskState_ = ExecuteState::FINISHED; - // seqRunnerTask will trigger the next - if (task->IsSeqRunnerTask()) { - if (!SequenceRunnerManager::GetInstance().TriggerSeqRunner(task->env_, task)) { - HILOG_ERROR("taskpool:: task %{public}s trigger in seqRunner %{public}s failed", - std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str()); - } - } else if (task->IsCommonTask()) { - task->NotifyPendingTask(); - } else if (task->IsAsyncRunnerTask()) { - if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) { - HILOG_ERROR("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed", - std::to_string(task->taskId_).c_str(), std::to_string(task->asyncRunnerId_).c_str()); - } - } - if (task->IsPeriodicTask()) { - return; - } - if (!task->IsFunctionTask()) { - napi_reference_unref(task->env_, task->taskRef_, nullptr); - return; - } - // function task need release data - task->ReleaseData(); - TaskManager::GetInstance().RemoveTask(task->taskId_); - delete task; } - void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success) { - HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str()); TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_); task->taskState_ = ExecuteState::FINISHED; napi_reference_unref(env, task->taskRef_, nullptr); @@ -503,53 +399,11 @@ void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, task->currentTaskInfo_ = nullptr; } TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_); - if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) { + if (taskGroup == nullptr) { HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled"); return; } - // store the result - uint32_t index = taskGroup->GetTaskIndex(task->taskId_); - auto groupInfo = taskGroup->currentGroupInfo_; - napi_ref arrRef = groupInfo->resArr; - napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef); - napi_set_element(env, resArr, index, res); - groupInfo->finishedTaskNum++; - // store the index when the first exception occurs - if (!success && !groupInfo->HasException()) { - groupInfo->SetFailedIndex(index); - } - // we will not handle the result until all tasks are finished - if (groupInfo->finishedTaskNum < taskGroup->taskNum_) { - return; - } - // if there is no exception, just resolve - if (!groupInfo->HasException()) { - HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str()); - napi_resolve_deferred(env, groupInfo->deferred, resArr); - for (uint32_t taskId : taskGroup->taskIds_) { - auto task = TaskManager::GetInstance().GetTask(taskId); - if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) { - task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_); - } - } - } else { - napi_value res = nullptr; - napi_get_element(env, resArr, groupInfo->GetFailedIndex(), &res); - napi_reject_deferred(env, groupInfo->deferred, res); - auto iter = taskGroup->taskIds_.begin(); - std::advance(iter, groupInfo->GetFailedIndex()); - auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr; - if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) { - task->onExecutionFailedCallBackInfo_->taskError_ = res; - task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_); - } - } - taskGroup->groupState_ = ExecuteState::FINISHED; - napi_delete_reference(env, groupInfo->resArr); - napi_reference_unref(env, taskGroup->groupRef_, nullptr); - delete groupInfo; - taskGroup->currentGroupInfo_ = nullptr; - taskGroup->NotifyGroupTask(env); + } void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority) @@ -594,7 +448,7 @@ napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo) return nullptr; } - if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) { + if (!NapiHelper::HasNameProperty(env, args[0], "groupId")) { napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR); if (napiTaskId == nullptr) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task."); @@ -603,13 +457,13 @@ napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo) uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId); TaskManager::GetInstance().CancelTask(env, taskId); } else { - napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR); + napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], "groupId"); if (napiGroupId == nullptr) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup."); return nullptr; } uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId); - TaskGroupManager::GetInstance().CancelGroup(env, groupId); + TaskGroupManager::GetInstance().CancelGroup(groupId); } return nullptr; } @@ -679,7 +533,7 @@ void TaskPool::PeriodicTaskCallback(uv_timer_t* handle) napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo) { int32_t period = 0; - uint32_t priority = Priority::DEFAULT; + Priority priority = Priority::DEFAULT; Task* periodicTask = nullptr; if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) { return nullptr; @@ -769,7 +623,7 @@ bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint3 } bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period, - uint32_t &priority, Task* &periodicTask) + Priority &priority, Task* &periodicTask) { size_t argc = 3; // 3 : period, task, priority napi_value args[3]; // 3 : period, task, priority @@ -797,7 +651,7 @@ bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority."); return false; } - priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority + priority = static_cast(NapiHelper::GetUint32Value(env, args[2])); // 2 : priority if (priority >= Priority::NUMBER) { ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid."); return false; diff --git a/js_concurrent_module/taskpool/taskpool.h b/js_concurrent_module/taskpool/taskpool.h index 2223d8e83809d22cb503ec954728c346337e92d3..3706f66d2d37e3d5abb576a4cb58435d1f834c68 100644 --- a/js_concurrent_module/taskpool/taskpool.h +++ b/js_concurrent_module/taskpool/taskpool.h @@ -35,6 +35,8 @@ class TaskPool { public: static napi_value InitTaskPool(napi_env env, napi_value exports); static void HandleTaskResult(Task* task); + static void UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success); + static void ExecuteTask(napi_env env, Task* task, Priority priority = Priority::DEFAULT); private: TaskPool() = delete; @@ -55,15 +57,14 @@ private: static void PeriodicTaskCallback(uv_timer_t* handle); static void HandleTaskResultInner(Task* task); - static void UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success); - static void ExecuteTask(napi_env env, Task* task, Priority priority = Priority::DEFAULT); + static napi_value ExecuteGroup(napi_env env, napi_value taskGroup, Priority priority); static void TriggerTask(Task* task); static void TriggerTimer(napi_env env, Task* task, int32_t period); static bool CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t& priority, int32_t& delayTime, Task*& task); - static bool CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t& period, uint32_t& priority, + static bool CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t& period, Priority& priority, Task*& task); static void ExecuteOnReceiveDataCallback(CallbackInfo* callbackInfo, TaskResultInfo* resultInfo); friend class TaskManager;