From fe9673d690188bc6a5196eec1ce879fe761fd360 Mon Sep 17 00:00:00 2001 From: wangchao Date: Sat, 23 Aug 2025 09:46:04 +0800 Subject: [PATCH 1/2] TaskQueue: Synchronize device with FORCE STOP --- torch_npu/csrc/core/npu/NPUQueue.cpp | 10 ++++++++++ torch_npu/csrc/npu/Module.cpp | 3 ++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/torch_npu/csrc/core/npu/NPUQueue.cpp b/torch_npu/csrc/core/npu/NPUQueue.cpp index 1f8cd124284..9c6938db83d 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.cpp +++ b/torch_npu/csrc/core/npu/NPUQueue.cpp @@ -446,6 +446,11 @@ bool Repository::ReadQueue() } ClearQueue(); c10_npu::NPUEventManager::GetInstance().ClearUnrecordedCount(); + if (GetStatus() == RepoStatus::STOP_EXIT) { + auto acl_ret = c10_npu::acl::AclrtSynchronizeDeviceWithTimeout(); + ASCEND_LOGE("ReadQueue: SynchronizeDevice with FORCE STOP, device = %d, write_idx = %u, read_idx = %u, ret = %d, acl_ret = %d", + device_idx, write_idx.idx, read_idx.idx, GetStatus(), ret, acl_ret); + } return false; } @@ -459,6 +464,11 @@ bool Repository::ReadQueue() read_idx.idx = (read_idx.idx + 1) & (kQueueCapacity - 1); + if (GetStatus() == RepoStatus::STOP_EXIT) { + auto acl_ret = c10_npu::acl::AclrtSynchronizeDeviceWithTimeout(); + ASCEND_LOGE("ReadQueue: SynchronizeDevice with FORCE STOP, device = %d, write_idx = %u, read_idx = %u, ret = %d, acl_ret = %d", + device_idx, write_idx.idx, read_idx.idx, GetStatus(), ret, acl_ret); + } return true; } diff --git a/torch_npu/csrc/npu/Module.cpp b/torch_npu/csrc/npu/Module.cpp index dd0f006a174..d218a480457 100644 --- a/torch_npu/csrc/npu/Module.cpp +++ b/torch_npu/csrc/npu/Module.cpp @@ -636,9 +636,10 @@ PyObject* THNPModule_stopDevice_wrap(PyObject* self, PyObject* arg) { HANDLE_TH_ERRORS int device = THPUtils_unpackLong(arg); + ASCEND_LOGI("NPU stop device start, device is %d.", device); setDefaultStreamsStatus(device, c10_npu::RepoStatus::STOP_EXIT); int ret = c10_npu::acl::AclrtDeviceTaskAbort(device); - ASCEND_LOGI("NPU stop device 
success, device is %d, ret is %d.", device, ret); + ASCEND_LOGI("NPU stop device end, device is %d, ret is %d.", device, ret); if (ret == 0) { return PyLong_FromLong(0); } else { -- Gitee From fc596aecdf409abb128bf45aa0e9dce367445109 Mon Sep 17 00:00:00 2001 From: wangchao Date: Sat, 23 Aug 2025 12:05:15 +0800 Subject: [PATCH 2/2] Add explanatory notes for taskqueue synchronize --- torch_npu/csrc/core/npu/NPUQueue.cpp | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/torch_npu/csrc/core/npu/NPUQueue.cpp b/torch_npu/csrc/core/npu/NPUQueue.cpp index 9c6938db83d..45f821fe96c 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.cpp +++ b/torch_npu/csrc/core/npu/NPUQueue.cpp @@ -447,8 +447,14 @@ bool Repository::ReadQueue() ClearQueue(); c10_npu::NPUEventManager::GetInstance().ClearUnrecordedCount(); if (GetStatus() == RepoStatus::STOP_EXIT) { + // The "stop_device" function will first set the "FORCE STOP" state, and then call the "devicetaskabort" interface. + // In a theoretical scenario, it is possible that before setting the FORCE STOP state, + // the dequeue thread had already got a task and was preparing to dispatch it. + // After calling the "devicetaskabort" interface, the task was finally ready to be dispatched. + // At this point, if the execution of this task fails, there will be an error state in the device, + // and it needs to be handled through the synchronization interface. 
auto acl_ret = c10_npu::acl::AclrtSynchronizeDeviceWithTimeout(); - ASCEND_LOGE("ReadQueue: SynchronizeDevice with FORCE STOP, device = %d, write_idx = %u, read_idx = %u, ret = %d, acl_ret = %d", + ASCEND_LOGI("ReadQueue: SynchronizeDevice with FORCE STOP, device = %d, write_idx = %u, read_idx = %u, status = %d, ret = %d, acl_ret = %d", device_idx, write_idx.idx, read_idx.idx, GetStatus(), ret, acl_ret); } return false; @@ -465,8 +471,14 @@ bool Repository::ReadQueue() read_idx.idx = (read_idx.idx + 1) & (kQueueCapacity - 1); if (GetStatus() == RepoStatus::STOP_EXIT) { + // The "stop_device" function will first set the "FORCE STOP" state, and then call the "devicetaskabort" interface. + // In a theoretical scenario, it is possible that before setting the FORCE STOP state, + // the dequeue thread had already got a task and was preparing to dispatch it. + // After calling the "devicetaskabort" interface, the task was finally ready to be dispatched. + // At this point, if the execution of this task fails, there will be an error state in the device, + // and it needs to be handled through the synchronization interface. auto acl_ret = c10_npu::acl::AclrtSynchronizeDeviceWithTimeout(); - ASCEND_LOGE("ReadQueue: SynchronizeDevice with FORCE STOP, device = %d, write_idx = %u, read_idx = %u, ret = %d, acl_ret = %d", + ASCEND_LOGI("ReadQueue: SynchronizeDevice with FORCE STOP, device = %d, write_idx = %u, read_idx = %u, status = %d, ret = %d, acl_ret = %d", device_idx, write_idx.idx, read_idx.idx, GetStatus(), ret, acl_ret); } return true; -- Gitee