From ad0af3407b230dc0ce2ec4f515432e75c68dab55 Mon Sep 17 00:00:00 2001
From: chen_liqing
Date: Mon, 21 Jul 2025 20:16:08 +0800
Subject: [PATCH] Add timeout for task queue

---
Review notes (ignored by git am):
  * TASK_QUEUE_ENQUEUE_TIMEOUT / TASK_QUEUE_DEQUEUE_TIMEOUT are now validated:
    a value that is not a number, is negative, or exceeds UINT32_MAX falls
    back to the default of 50000 us instead of becoming 0 or wrapping around.
  * The end-of-enqueue/dequeue branch is skipped until a start time has been
    recorded on the calling thread, and std::chrono::steady_clock is used for
    the interval so wall-clock adjustments cannot distort it.
  * This patch was re-flowed from a whitespace-collapsed copy. Context-line
    indentation and the two leading #include context lines of NpuUtils.cpp
    were reconstructed and should be checked against the tree; the "index"
    post-image hashes below predate the review changes.

 .../csrc/core/npu/register/OptionsManager.cpp | 34 +++++++++++++++++
 .../csrc/core/npu/register/OptionsManager.h   |  2 ++
 torch_npu/csrc/framework/utils/NpuUtils.cpp   | 39 +++++++++++++++++++
 3 files changed, 75 insertions(+)

diff --git a/torch_npu/csrc/core/npu/register/OptionsManager.cpp b/torch_npu/csrc/core/npu/register/OptionsManager.cpp
index d58b186ce27..9655402fe31 100644
--- a/torch_npu/csrc/core/npu/register/OptionsManager.cpp
+++ b/torch_npu/csrc/core/npu/register/OptionsManager.cpp
@@ -529,6 +529,40 @@ uint32_t OptionsManager::GetTaskQueueEnable()
     return task_queue_enable;
 }
 
+// Default enqueue/dequeue timeout threshold, in microseconds.
+static constexpr uint32_t kDefaultTaskQueueTimeoutUs = 50000;
+
+// Reads a timeout threshold from the environment variable `env_name`. Returns
+// kDefaultTaskQueueTimeoutUs when the variable is unset, does not start with a
+// base-10 integer, is negative, or exceeds UINT32_MAX. Without these checks a
+// non-numeric value parsed as 0 (every task reported as timed out) and a
+// negative value wrapped around to a huge threshold.
+static uint32_t ReadTaskQueueTimeoutUs(const char* env_name)
+{
+    const char* env_val = std::getenv(env_name);
+    if (env_val == nullptr) {
+        return kDefaultTaskQueueTimeoutUs;
+    }
+    char* parse_end = nullptr;
+    const long long parsed = strtoll(env_val, &parse_end, 10);
+    if (parse_end == env_val || parsed < 0 || parsed > UINT32_MAX) {
+        return kDefaultTaskQueueTimeoutUs;
+    }
+    return static_cast<uint32_t>(parsed);
+}
+
+// Enqueue timeout threshold in microseconds, from TASK_QUEUE_ENQUEUE_TIMEOUT.
+uint32_t OptionsManager::GetTaskQueueEnqueueTimeout()
+{
+    return ReadTaskQueueTimeoutUs("TASK_QUEUE_ENQUEUE_TIMEOUT");
+}
+
+// Dequeue timeout threshold in microseconds, from TASK_QUEUE_DEQUEUE_TIMEOUT.
+uint32_t OptionsManager::GetTaskQueueDequeueTimeout()
+{
+    return ReadTaskQueueTimeoutUs("TASK_QUEUE_DEQUEUE_TIMEOUT");
+}
+
 bool OptionsManager::CheckForceUncached()
 {
     const static bool force_uncached = []() -> bool {
diff --git a/torch_npu/csrc/core/npu/register/OptionsManager.h b/torch_npu/csrc/core/npu/register/OptionsManager.h
index 8fd9f0446db..5257229271c 100644
--- a/torch_npu/csrc/core/npu/register/OptionsManager.h
+++ b/torch_npu/csrc/core/npu/register/OptionsManager.h
@@ -126,6 +126,8 @@ public:
     static uint32_t GetHcclBufferSize();
     static uint32_t GetP2PBufferSize();
     static uint32_t GetTaskQueueEnable();
+    static uint32_t GetTaskQueueEnqueueTimeout();
+    static uint32_t GetTaskQueueDequeueTimeout();
     static uint32_t GetAclOpInitMode();
     static uint32_t GetStreamsPerDevice();
     static char* GetCpuAffinityConf();
diff --git a/torch_npu/csrc/framework/utils/NpuUtils.cpp b/torch_npu/csrc/framework/utils/NpuUtils.cpp
index 6202f3d0de5..51dcf74568b 100644
--- a/torch_npu/csrc/framework/utils/NpuUtils.cpp
+++ b/torch_npu/csrc/framework/utils/NpuUtils.cpp
@@ -1,5 +1,6 @@
 #include <mutex>
 #include <set>
+#include <chrono>
 
 #include "torch_npu/csrc/aten/CustomFunctions.h"
 #include "torch_npu/csrc/aten/NPUNativeFunctions.h"
@@ -269,6 +270,10 @@ void NpuUtils::check_1d(const at::Tensor &t, const char *arg, const char *fn)
 
 #ifndef BUILD_LIBTORCH
 
+// Time recorded by the most recent category-0 report on this thread. The
+// default-constructed (epoch) value means no such report has been seen yet.
+static thread_local std::chrono::steady_clock::time_point enqueue_start;
+
 void NpuUtils::ProfReportMarkDataToNpuProfiler(uint32_t category, const std::string &data, uint64_t correlation_id)
 {
     if (data.empty()) {
@@ -277,6 +282,21 @@ void NpuUtils::ProfReportMarkDataToNpuProfiler(uint32_t category, const std::str
     if (torch_npu::profiler::profDataReportEnable().load(std::memory_order_relaxed)) {
         torch_npu::profiler::reportMarkDataToNpuProfiler(category, data, correlation_id);
     }
+
+    // Category 0 records a start time; category 1 logs an error when the time elapsed
+    // since then reaches the threshold (microseconds), which is read once on first use.
+    static const auto enqueue_timeout = c10_npu::option::OptionsManager::GetTaskQueueEnqueueTimeout();
+    if (category == 0) {
+        enqueue_start = std::chrono::steady_clock::now();
+    } else if (category == 1 && enqueue_start != std::chrono::steady_clock::time_point{}) {
+        // Measure only once a start time exists; otherwise the interval would be taken
+        // from the clock epoch and always be reported as a timeout.
+        const auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+            std::chrono::steady_clock::now() - enqueue_start).count();
+        if (duration >= enqueue_timeout) {
+            ASCEND_LOGE("Enqueue (duration = %lld us) is timeout for %s", static_cast<long long>(duration), data.c_str());
+        }
+    }
 }
 
 void NpuUtils::DqueueCompileExcute(c10_npu::queue::QueueParas* para, uint32_t category)
@@ -303,6 +323,10 @@ void NpuUtils::DqueueAnyncMemcpy(c10_npu::queue::QueueParas* para, uint32_t cate
         category, c10_npu::queue::CopyParas::COPY_PARAS_MAP[param_val->kind], para->correlation_id);
 }
 
+// Time recorded by the most recent category-2 report on this thread. The
+// default-constructed (epoch) value means no such report has been seen yet.
+static thread_local std::chrono::steady_clock::time_point dequeue_start;
+
 void NpuUtils::ProfReportMarkDataToNpuProfiler(uint32_t category, void *data, size_t offset)
 {
     if (C10_UNLIKELY(!data)) {
@@ -326,6 +350,21 @@ void NpuUtils::ProfReportMarkDataToNpuProfiler(uint32_t category, void *data, si
             entry->second(cur_param, category);
         }
     }
+
+    // Category 2 records a start time; category 3 logs an error when the time elapsed
+    // since then reaches the threshold (microseconds), which is read once on first use.
+    static const auto dequeue_timeout = c10_npu::option::OptionsManager::GetTaskQueueDequeueTimeout();
+    if (category == 2) {
+        dequeue_start = std::chrono::steady_clock::now();
+    } else if (category == 3 && dequeue_start != std::chrono::steady_clock::time_point{}) {
+        // Measure only once a start time exists; otherwise the interval would be taken
+        // from the clock epoch and always be reported as a timeout.
+        const auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+            std::chrono::steady_clock::now() - dequeue_start).count();
+        if (duration >= dequeue_timeout) {
+            ASCEND_LOGE("Dequeue (duration = %lld us) is timeout.", static_cast<long long>(duration));
+        }
+    }
 }
 
 #endif
-- 
Gitee