From f71c6952f10f9ab43976ea565bb59c24c665ea4a Mon Sep 17 00:00:00 2001 From: dongwenbo6 Date: Sat, 17 Feb 2024 16:09:02 +0800 Subject: [PATCH 1/4] optimize plog ERROR --- torch_npu/csrc/core/npu/NPUQueue.cpp | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/torch_npu/csrc/core/npu/NPUQueue.cpp b/torch_npu/csrc/core/npu/NPUQueue.cpp index ec26c9a9526..cb49d26b596 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.cpp +++ b/torch_npu/csrc/core/npu/NPUQueue.cpp @@ -3,6 +3,7 @@ #include "torch_npu/csrc/core/npu/npu_log.h" #include "torch_npu/csrc/framework/utils/NpuUtils.h" #include "torch_npu/csrc/core/npu/NPUFunctions.h" +#include "torch_npu/csrc/framework/OpParamMaker.h" #ifndef BUILD_LIBTORCH #include @@ -300,7 +301,19 @@ void Repository::Enqueue(void* cur_paras) { } if (GetStatus() != RUN && GetStatus() != INIT) { - ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). !!"); + auto queueParam = static_cast(cur_paras); + auto type = queueParam->paramType; + if (type ==c10_npu::queue::COMPILE_AND_EXECUTE) { + auto cur_paras = static_cast(queueParam->paramVal); + auto op_param = cur_paras->opType; + ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). op name is=%s", *op_name); + } else if (type ==c10_npu::queue::ASYNC_MEMCPY) { + auto cur_paras = static_cast(queueParam->paramVal); + ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). src=%p, dst=%p", cur_paras->src, cur_paras->dst); + } else { + auto cur_paras = static_cast(queueParam->paramVal); + ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). event is=%p", cur_paras->event); + } return; } bool ret = false; -- Gitee From 54191db45d58c82df6307f3a0e87eed8c74c76d1 Mon Sep 17 00:00:00 2001 From: dongwenbo6 Date: Sun, 18 Feb 2024 10:46:55 +0800 Subject: [PATCH 2/4] reorganize files --- torch_npu/csrc/core/npu/NPUQueue.cpp | 808 +++++++++++++-------------- 1 file changed, 404 insertions(+), 404 deletions(-) diff --git a/torch_npu/csrc/core/npu/NPUQueue.cpp b/torch_npu/csrc/core/npu/NPUQueue.cpp index cb49d26b596..1c54693e136 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.cpp +++ b/torch_npu/csrc/core/npu/NPUQueue.cpp @@ -24,98 +24,98 @@ namespace { class CallBackManager { public: - CallBackManager() {} - ~CallBackManager() {} - void SetExec(const ACL_EXEC_FUNC& func) { - this->execFunc = func; - } + CallBackManager() {} + ~CallBackManager() {} + void SetExec(const ACL_EXEC_FUNC& func) { + this->execFunc = func; + } - void SetCopy(const ACL_COPY_FUNC& func) { - this->copyFunc = func; - } + void SetCopy(const ACL_COPY_FUNC& func) { + this->copyFunc = func; + } - void SetRelease(const ACL_RELEASE_FUNC& func) { - this->releaseFunc = func; - } + void SetRelease(const ACL_RELEASE_FUNC& func) { + this->releaseFunc = func; + } - void SetCopyReleaseParam(const ACL_COPY_RELEASE_PARM_FUNC& func) { - this->copyReleaseParamFunc = func; - } + void SetCopyReleaseParam(const ACL_COPY_RELEASE_PARM_FUNC& func) { + this->copyReleaseParamFunc = func; + } - void SetReleaseParam(const ACL_RELEASE_PARAM_FUNC& func) { - this->releaseParamFunc = func; - } + void SetReleaseParam(const ACL_RELEASE_PARAM_FUNC& func) { + this->releaseParamFunc = func; + } - void SetNew(const ACL_NEW_FUNC& func) { - this->newFunc = func; - } + void SetNew(const ACL_NEW_FUNC& func) { + this->newFunc = func; + } - void SetDelete(const ACL_DELETE_FUNC& func) { - this->deleteFunc = func; - } + void SetDelete(const ACL_DELETE_FUNC& func) { + this->deleteFunc = func; + } - int Call(void* head, int offset) { - TORCH_CHECK(this->execFunc, "Failed to find execution function."); - auto dstPtr = (uint8_t*)head + sizePerParams * offset; - return this->execFunc(dstPtr); - } + int Call(void* head, int offset) { + TORCH_CHECK(this->execFunc, "Failed to find execution function."); + auto dstPtr = (uint8_t*)head + sizePerParams * offset; + return this->execFunc(dstPtr); + } - void Copy(void* dstHead, int offset, void* src) { - TORCH_CHECK(this->copyFunc, "Failed to find copy function."); - auto dstPtr = (uint8_t*)dstHead + sizePerParams * offset; - return this->copyFunc(dstPtr, src); - } + void Copy(void* dstHead, int offset, void* src) { + TORCH_CHECK(this->copyFunc, "Failed to find copy function."); + auto dstPtr = (uint8_t*)dstHead + sizePerParams * offset; + return this->copyFunc(dstPtr, src); + } - void Release(void* head, int offset, ReleaseQueue& releaseQueue) { - TORCH_CHECK(this->releaseFunc, "Failed to find release function."); - auto ptr = (uint8_t*)head + sizePerParams * offset; - return this->releaseFunc(ptr, releaseQueue); - } + void Release(void* head, int offset, ReleaseQueue& releaseQueue) { + TORCH_CHECK(this->releaseFunc, "Failed to find release function."); + auto ptr = (uint8_t*)head + sizePerParams * offset; + return this->releaseFunc(ptr, releaseQueue); + } - void CopyRealseParam(void* dstHead, int offset, void* src) { - TORCH_CHECK(this->copyReleaseParamFunc, "Failed to find copy release params function."); - auto dstPtr = (uint8_t*)dstHead + sizePerParams * offset; - return this->copyReleaseParamFunc(dstPtr, src); - } + void CopyRealseParam(void* dstHead, int offset, void* src) { + TORCH_CHECK(this->copyReleaseParamFunc, "Failed to find copy release params function."); + auto dstPtr = (uint8_t*)dstHead + sizePerParams * offset; + return this->copyReleaseParamFunc(dstPtr, src); + } - void ReleaseParam(void* head, int offset) { - TORCH_CHECK(this->releaseParamFunc, "Failed to find release params function."); - auto ptr = (uint8_t*)head + sizePerParams * offset; - return this->releaseParamFunc(ptr); - } + void ReleaseParam(void* head, int offset) { + TORCH_CHECK(this->releaseParamFunc, "Failed to find release params function."); + auto ptr = (uint8_t*)head + sizePerParams * offset; + return this->releaseParamFunc(ptr); + } - void* Init(int capacity) { - TORCH_CHECK(this->newFunc, "Failed to find new function."); - void* ptr = this->newFunc(capacity, sizePerParams); // not check as CUDA - return ptr; - } + void* Init(int capacity) { + TORCH_CHECK(this->newFunc, "Failed to find new function."); + void* ptr = this->newFunc(capacity, sizePerParams); // not check as CUDA + return ptr; + } - void DeInit(void* ptr) { - if (ptr != nullptr) { - TORCH_CHECK(this->deleteFunc, "Failed to find delete function."); - this->deleteFunc(ptr); - ptr = nullptr; + void DeInit(void* ptr) { + if (ptr != nullptr) { + TORCH_CHECK(this->deleteFunc, "Failed to find delete function."); + this->deleteFunc(ptr); + ptr = nullptr; + } } - } private: - int sizePerParams = 0; - ACL_EXEC_FUNC execFunc = nullptr; - ACL_COPY_FUNC copyFunc = nullptr; - ACL_RELEASE_FUNC releaseFunc = nullptr; - ACL_NEW_FUNC newFunc = nullptr; - ACL_DELETE_FUNC deleteFunc = nullptr; - ACL_COPY_RELEASE_PARM_FUNC copyReleaseParamFunc = nullptr; - ACL_RELEASE_PARAM_FUNC releaseParamFunc = nullptr; + int sizePerParams = 0; + ACL_EXEC_FUNC execFunc = nullptr; + ACL_COPY_FUNC copyFunc = nullptr; + ACL_RELEASE_FUNC releaseFunc = nullptr; + ACL_NEW_FUNC newFunc = nullptr; + ACL_DELETE_FUNC deleteFunc = nullptr; + ACL_COPY_RELEASE_PARM_FUNC copyReleaseParamFunc = nullptr; + ACL_RELEASE_PARAM_FUNC releaseParamFunc = nullptr; }; // class CallBackManager CallBackManager& manager() { - static CallBackManager instance; - return instance; + static CallBackManager instance; + return instance; } CallBackManager& releaseManager() { - static CallBackManager releaseinstance; - return releaseinstance; + static CallBackManager releaseinstance; + return releaseinstance; } } // namespace @@ -124,15 +124,15 @@ NPUCallBackRegisterBuilder::NPUCallBackRegisterBuilder(const ACL_EXEC_FUNC& exec const ACL_COPY_FUNC& copyFunc, const ACL_RELEASE_FUNC& releaseFunc, const ACL_NEW_FUNC& newFunc, const ACL_DELETE_FUNC& deleteFunc, const ACL_COPY_RELEASE_PARM_FUNC& copyReleaseParamF, const ACL_RELEASE_PARAM_FUNC& releaseParamF) { - manager().SetExec(execFunc); - manager().SetCopy(copyFunc); - manager().SetRelease(releaseFunc); - manager().SetNew(newFunc); - manager().SetDelete(deleteFunc); - releaseManager().SetCopyReleaseParam(copyReleaseParamF); - releaseManager().SetReleaseParam(releaseParamF); - releaseManager().SetNew(newFunc); - releaseManager().SetDelete(deleteFunc); + manager().SetExec(execFunc); + manager().SetCopy(copyFunc); + manager().SetRelease(releaseFunc); + manager().SetNew(newFunc); + manager().SetDelete(deleteFunc); + releaseManager().SetCopyReleaseParam(copyReleaseParamF); + releaseManager().SetReleaseParam(releaseParamF); + releaseManager().SetNew(newFunc); + releaseManager().SetDelete(deleteFunc); } } // namespace register_queue_cb @@ -144,36 +144,36 @@ NPUCallBackRegisterBuilder::NPUCallBackRegisterBuilder(const ACL_EXEC_FUNC& exec static constexpr size_t kQueueCapacity = 4096; RepoStatus Repository::GetStatus() const { - if (initialized == false) { - ASCEND_LOGE("Task queue is not initialized, shouldn't call GetStatus(). !!"); - } + if (initialized == false) { + ASCEND_LOGE("Task queue is not initialized, shouldn't call GetStatus(). !!"); + } - return repo_status.load(); + return repo_status.load(); } void Repository::SetStatus(RepoStatus desired) { - if (initialized == false) { - ASCEND_LOGE("Task queue is not initialized, shouldn't call SetStatus(). !!"); - return; - } + if (initialized == false) { + ASCEND_LOGE("Task queue is not initialized, shouldn't call SetStatus(). !!"); + return; + } - repo_status = desired; + repo_status = desired; } void Repository::ChangeStatus(RepoStatus expected, RepoStatus desired) { - if (initialized == false) { - ASCEND_LOGE("Task queue is not initialized, shouldn't call ChangeStatus(). !!"); - return; - } + if (initialized == false) { + ASCEND_LOGE("Task queue is not initialized, shouldn't call ChangeStatus(). !!"); + return; + } - repo_status.compare_exchange_strong(expected, desired); + repo_status.compare_exchange_strong(expected, desired); } NPUStatus Repository::MakeSureQueueEmpty() { - if (initialized == false) { - ASCEND_LOGE("Task queue is not initialized, shouldn't call MakeSureQueueEmpty(). !!"); - return FAILED; - } + if (initialized == false) { + ASCEND_LOGE("Task queue is not initialized, shouldn't call MakeSureQueueEmpty(). !!"); + return FAILED; + } ASCEND_LOGI("Begin to makesure taskqueue empty."); // While waiting for ACL thread to launch tasks, // the current thread should not hold GIL. @@ -182,292 +182,292 @@ NPUStatus Repository::MakeSureQueueEmpty() { // If the current thread does not release the GIL, a deadlock will // occur. #ifndef BUILD_LIBTORCH - PyThreadState *gilState = nullptr; - if (PyGILState_Check()) { - gilState = PyEval_SaveThread(); - } + PyThreadState *gilState = nullptr; + if (PyGILState_Check()) { + gilState = PyEval_SaveThread(); + } #endif if (consumer.joinable()) { - ssize_t s; - uint64_t u = 1; - while (!IsEmptyQueue()) { - std::lock_guard lock(mu_empty); - need_empty = true; - __sync_synchronize(); - if (!IsEmptyQueue()) { // double-check, very important idea - s = eventfd_read(efd_empty, &u); - if (s != 0) { - if (errno == EINTR) { - continue; - } - ASCEND_LOGE("eventfd_read failed. s=%zd, errno=%s.", s, strerror(errno)); + ssize_t s; + uint64_t u = 1; + while (!IsEmptyQueue()) { + std::lock_guard lock(mu_empty); + need_empty = true; + __sync_synchronize(); + if (!IsEmptyQueue()) { // double-check, very important idea + s = eventfd_read(efd_empty, &u); + if (s != 0) { + if (errno == EINTR) { + continue; + } + ASCEND_LOGE("eventfd_read failed. s=%zd, errno=%s.", s, strerror(errno)); #ifndef BUILD_LIBTORCH - // Get the GIL - if (gilState) { - PyEval_RestoreThread(gilState); - } + // Get the GIL + if (gilState) { + PyEval_RestoreThread(gilState); + } #endif - return INTERNEL_ERROR; - } + return INTERNEL_ERROR; + } + } + need_empty = false; } - need_empty = false; - } } - if (GetStatus() == RepoStatus::ERROR_EXIT) { - // Avoid repeatedly throwing exceptions - SetStatus(CAN_EXIT); - throw std::runtime_error("The Inner error is reported as above.\n "\ - "Since the operator is called asynchronously, the stacktrace may be inaccurate. "\ - "If you want to get the accurate stacktrace, "\ - "pleace set the environment variable ASCEND_LAUNCH_BLOCKING=1."); - } + if (GetStatus() == RepoStatus::ERROR_EXIT) { + // Avoid repeatedly throwing exceptions + SetStatus(CAN_EXIT); + throw std::runtime_error("The Inner error is reported as above.\n "\ + "Since the operator is called asynchronously, the stacktrace may be inaccurate. "\ + "If you want to get the accurate stacktrace, "\ + "pleace set the environment variable ASCEND_LAUNCH_BLOCKING=1."); + } #ifndef BUILD_LIBTORCH - // Get the GIL - if (gilState) { - PyEval_RestoreThread(gilState); - } + // Get the GIL + if (gilState) { + PyEval_RestoreThread(gilState); + } #endif - return SUCCESS; + return SUCCESS; } bool Repository::WriteQueue(void* cur_paras) { - std::lock_guard lock(mu_enqueue); - if (IsFullQueue()) { - return false; - } + std::lock_guard lock(mu_enqueue); + if (IsFullQueue()) { + return false; + } - __sync_synchronize(); - manager().Copy(datas, write_idx.idx, cur_paras); - __sync_synchronize(); + __sync_synchronize(); + manager().Copy(datas, write_idx.idx, cur_paras); + __sync_synchronize(); - write_idx.idx = (write_idx.idx + 1) & (kQueueCapacity - 1); - return true; + write_idx.idx = (write_idx.idx + 1) & (kQueueCapacity - 1); + return true; } bool Repository::ReadQueue() { - if (IsEmptyQueue()) { - return false; - } + if (IsEmptyQueue()) { + return false; + } - __sync_synchronize(); + __sync_synchronize(); #ifndef BUILD_LIBTORCH - at_npu::native::NpuUtils::ProfReportMarkDataToNpuProfiler(2, datas, read_idx.idx); - auto ret = manager().Call(datas, read_idx.idx); - at_npu::native::NpuUtils::ProfReportMarkDataToNpuProfiler(3, datas, read_idx.idx); + at_npu::native::NpuUtils::ProfReportMarkDataToNpuProfiler(2, datas, read_idx.idx); + auto ret = manager().Call(datas, read_idx.idx); + at_npu::native::NpuUtils::ProfReportMarkDataToNpuProfiler(3, datas, read_idx.idx); #else - auto ret = manager().Call(datas, read_idx.idx); + auto ret = manager().Call(datas, read_idx.idx); #endif - if (ret != 0) { - ASCEND_LOGE("---Thread---%llu: device = %d, write_idx = %u, read_idx = %u, status = %d, ret = %d", - std::this_thread::get_id(), device_idx, write_idx.idx, read_idx.idx, GetStatus(), ret); - while (!IsEmptyQueue()) { // ignore other tasks - manager().Release(datas, read_idx.idx, releaseQueue); - read_idx.idx = (read_idx.idx + 1) & (kQueueCapacity - 1); + if (ret != 0) { + ASCEND_LOGE("---Thread---%llu: device = %d, write_idx = %u, read_idx = %u, status = %d, ret = %d", + std::this_thread::get_id(), device_idx, write_idx.idx, read_idx.idx, GetStatus(), ret); + while (!IsEmptyQueue()) { // ignore other tasks + manager().Release(datas, read_idx.idx, releaseQueue); + read_idx.idx = (read_idx.idx + 1) & (kQueueCapacity - 1); + } + + SetStatus(ERROR_EXIT); + read_idx.idx = write_idx.idx; + __sync_synchronize(); + eventfd_write(efd_empty, 1); + eventfd_write(efd_write, 1); + return false; } - SetStatus(ERROR_EXIT); - read_idx.idx = write_idx.idx; + manager().Release(datas, read_idx.idx, releaseQueue); __sync_synchronize(); - eventfd_write(efd_empty, 1); - eventfd_write(efd_write, 1); - return false; - } - - manager().Release(datas, read_idx.idx, releaseQueue); - __sync_synchronize(); - read_idx.idx = (read_idx.idx + 1) & (kQueueCapacity - 1); + read_idx.idx = (read_idx.idx + 1) & (kQueueCapacity - 1); - return true; + return true; } void Repository::Enqueue(void* cur_paras) { - if (initialized == false) { - ASCEND_LOGE("Task queue is not initialized, shouldn't call Enqueue(). !!"); - return; - } - - if (GetStatus() == RepoStatus::ERROR_EXIT) { - // Avoid repeatedly throwing exceptions - SetStatus(CAN_EXIT); - throw std::runtime_error("The Inner error is reported as above.\n "\ - "Since the operator is called asynchronously, the stacktrace may be inaccurate. "\ - "If you want to get the accurate stacktrace, "\ - "pleace set the environment variable ASCEND_LAUNCH_BLOCKING=1."); - } + if (initialized == false) { + ASCEND_LOGE("Task queue is not initialized, shouldn't call Enqueue(). !!"); + return; + } - if (GetStatus() != RUN && GetStatus() != INIT) { - auto queueParam = static_cast(cur_paras); - auto type = queueParam->paramType; - if (type ==c10_npu::queue::COMPILE_AND_EXECUTE) { - auto cur_paras = static_cast(queueParam->paramVal); - auto op_param = cur_paras->opType; - ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). op name is=%s", *op_name); - } else if (type ==c10_npu::queue::ASYNC_MEMCPY) { - auto cur_paras = static_cast(queueParam->paramVal); - ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). src=%p, dst=%p", cur_paras->src, cur_paras->dst); - } else { - auto cur_paras = static_cast(queueParam->paramVal); - ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). event is=%p", cur_paras->event); + if (GetStatus() == RepoStatus::ERROR_EXIT) { + // Avoid repeatedly throwing exceptions + SetStatus(CAN_EXIT); + throw std::runtime_error("The Inner error is reported as above.\n "\ + "Since the operator is called asynchronously, the stacktrace may be inaccurate. "\ + "If you want to get the accurate stacktrace, "\ + "pleace set the environment variable ASCEND_LAUNCH_BLOCKING=1."); } - return; - } - bool ret = false; - ssize_t s; - uint64_t u = 1; - SetWriteWorking(true); - while (ret == false) { - ret = WriteQueue(cur_paras); - if (ret == false) { - SetWriteWorking(false); - __sync_synchronize(); - if (IsFullQueue()) { -#ifndef BUILD_LIBTORCH - // double check the current thread hold a Gil lock - if (PyGILState_Check()) { - Py_BEGIN_ALLOW_THREADS s = eventfd_read(efd_write, &u); - Py_END_ALLOW_THREADS + if (GetStatus() != RUN && GetStatus() != INIT) { + auto queueParam = static_cast(cur_paras); + auto type = queueParam->paramType; + if (type ==c10_npu::queue::COMPILE_AND_EXECUTE) { + auto cur_paras = static_cast(queueParam->paramVal); + auto op_param = cur_paras->opType; + ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). op name is=%s", *op_name); + } else if (type ==c10_npu::queue::ASYNC_MEMCPY) { + auto cur_paras = static_cast(queueParam->paramVal); + ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). src=%p, dst=%p", cur_paras->src, cur_paras->dst); } else { - s = eventfd_read(efd_write, &u); + auto cur_paras = static_cast(queueParam->paramVal); + ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). event is=%p", cur_paras->event); } + return; + } + bool ret = false; + ssize_t s; + uint64_t u = 1; + + SetWriteWorking(true); + while (ret == false) { + ret = WriteQueue(cur_paras); + if (ret == false) { + SetWriteWorking(false); + __sync_synchronize(); + if (IsFullQueue()) { +#ifndef BUILD_LIBTORCH + // double check the current thread hold a Gil lock + if (PyGILState_Check()) { + Py_BEGIN_ALLOW_THREADS s = eventfd_read(efd_write, &u); + Py_END_ALLOW_THREADS + } else { + s = eventfd_read(efd_write, &u); + } #else - s = eventfd_read(efd_write, &u); + s = eventfd_read(efd_write, &u); #endif - if (s != 0) { - if (errno == EINTR) { + if (s != 0) { + if (errno == EINTR) { + continue; + } + ASCEND_LOGE("waiting dequeue failed. s=%zd, errno=%s.", s, strerror(errno)); + return; + } + SetWriteWorking(true); + } continue; - } - ASCEND_LOGE("waiting dequeue failed. s=%zd, errno=%s.", s, strerror(errno)); - return; } - SetWriteWorking(true); - } - continue; - } - __sync_synchronize(); - while (!IsReadWorking()) { - s = eventfd_write(efd_read, u); - if (s != 0) { - if (errno == EINTR) { - continue; + __sync_synchronize(); + while (!IsReadWorking()) { + s = eventfd_write(efd_read, u); + if (s != 0) { + if (errno == EINTR) { + continue; + } + ASCEND_LOGE("notify consumer failed!! s=%zd, errno=%s", s, strerror(errno)); + return; + } + break; } - ASCEND_LOGE("notify consumer failed!! s=%zd, errno=%s", s, strerror(errno)); - return; - } - break; } - } - SetWriteWorking(false); + SetWriteWorking(false); } void Repository::Dequeue() { - if (initialized == false) { - ASCEND_LOGE("Task queue is not initialized, shouldn't call Dequeue(). !!"); - return; - } - - bool ret = false; - bool notify_empty = false; - ssize_t s; - uint64_t u = 1; - - SetReadWorking(true); - while (ret == false && GetStatus() != RepoStatus::CAN_EXIT) { - ret = ReadQueue(); - if (ret == false) { - if (GetStatus() == RepoStatus::NEED_EXIT) { - ChangeStatus(NEED_EXIT, CAN_EXIT); - break; - } + if (initialized == false) { + ASCEND_LOGE("Task queue is not initialized, shouldn't call Dequeue(). !!"); + return; + } - if (GetStatus() == RepoStatus::ERROR_EXIT) { - break; - } + bool ret = false; + bool notify_empty = false; + ssize_t s; + uint64_t u = 1; - SetReadWorking(false); - __sync_synchronize(); - if (IsEmptyQueue()) { - s = eventfd_read(efd_read, &u); - if (s != 0) { - if (errno == EINTR) { + SetReadWorking(true); + while (ret == false && GetStatus() != RepoStatus::CAN_EXIT) { + ret = ReadQueue(); + if (ret == false) { + if (GetStatus() == RepoStatus::NEED_EXIT) { + ChangeStatus(NEED_EXIT, CAN_EXIT); + break; + } + + if (GetStatus() == RepoStatus::ERROR_EXIT) { + break; + } + + SetReadWorking(false); + __sync_synchronize(); + if (IsEmptyQueue()) { + s = eventfd_read(efd_read, &u); + if (s != 0) { + if (errno == EINTR) { + continue; + } + ASCEND_LOGE("waiting enqueue failed. s=%zd, errno=%s.", s, strerror(errno)); + return; + } + SetReadWorking(true); + } continue; - } - ASCEND_LOGE("waiting enqueue failed. s=%zd, errno=%s.", s, strerror(errno)); - return; } - SetReadWorking(true); - } - continue; - } - __sync_synchronize(); - notify_empty = need_empty && - IsEmptyQueue(); // need_empty && (ret == false || IsEmptyQueue()); - while (notify_empty) { - s = eventfd_write(efd_empty, u); - if (s != 0) { - if (errno == EINTR) { - continue; + __sync_synchronize(); + notify_empty = need_empty && + IsEmptyQueue(); // need_empty && (ret == false || IsEmptyQueue()); + while (notify_empty) { + s = eventfd_write(efd_empty, u); + if (s != 0) { + if (errno == EINTR) { + continue; + } + ASCEND_LOGE("notify make_sure failed. s=%zd, errno=%s.", s, strerror(errno)); + return; + } + break; } - ASCEND_LOGE("notify make_sure failed. s=%zd, errno=%s.", s, strerror(errno)); - return; - } - break; - } - __sync_synchronize(); - while (!IsWriteWorking()) { - s = eventfd_write(efd_write, u); - if (s != 0) { - if (errno == EINTR) { - continue; + __sync_synchronize(); + while (!IsWriteWorking()) { + s = eventfd_write(efd_write, u); + if (s != 0) { + if (errno == EINTR) { + continue; + } + ASCEND_LOGE("notify producer failed. s=%zd, errno=%s.", s, strerror(errno)); + return; + } + break; } - ASCEND_LOGE("notify producer failed. s=%zd, errno=%s.", s, strerror(errno)); - return; - } - break; } - } - SetReadWorking(false); + SetReadWorking(false); } void Repository::ReleaseResource() { - manager().DeInit(datas); - if (efd_read > 0) { - close(efd_read); - efd_read = -1; - } - if (efd_write > 0) { - close(efd_write); - efd_write = -1; - } - if (efd_empty > 0) { - close(efd_empty); - efd_empty = -1; - } + manager().DeInit(datas); + if (efd_read > 0) { + close(efd_read); + efd_read = -1; + } + if (efd_write > 0) { + close(efd_write); + efd_write = -1; + } + if (efd_empty > 0) { + close(efd_empty); + efd_empty = -1; + } } Repository::~Repository() { - if (initialized) { - if (consumer.joinable()) { - SetStatus(NEED_EXIT); - (void)eventfd_write(efd_read, 1); // escape wait - consumer.join(); - } - eventfd_write(efd_empty, 1); - ReleaseResource(); - } + if (initialized) { + if (consumer.joinable()) { + SetStatus(NEED_EXIT); + (void)eventfd_write(efd_read, 1); // escape wait + consumer.join(); + } + eventfd_write(efd_empty, 1); + ReleaseResource(); + } } bool Repository::IsFullQueue() const { - return ((write_idx.idx + 1) & (kQueueCapacity - 1)) == read_idx.idx; + return ((write_idx.idx + 1) & (kQueueCapacity - 1)) == read_idx.idx; } bool Repository::CheckInit() const { - return initialized; + return initialized; } void StartConsume(Repository* repo, c10::DeviceIndex device_id) { @@ -488,146 +488,146 @@ void StartConsume(Repository* repo, c10::DeviceIndex device_id) { } void Repository::InitRepo(c10::DeviceIndex device_id) { - if (datas == nullptr) { - datas = manager().Init(kQueueCapacity); - ASCEND_LOGI("TaskQueue is enable"); - } + if (datas == nullptr) { + datas = manager().Init(kQueueCapacity); + ASCEND_LOGI("TaskQueue is enable"); + } - efd_read = eventfd(0, 0); - efd_write = eventfd(0, 0); - efd_empty = eventfd(0, 0); + efd_read = eventfd(0, 0); + efd_write = eventfd(0, 0); + efd_empty = eventfd(0, 0); - initialized = true; - SetStatus(INIT); - device_idx = device_id; - std::thread cur_consumer(StartConsume, this, device_id); - consumer = std::move(cur_consumer); + initialized = true; + SetStatus(INIT); + device_idx = device_id; + std::thread cur_consumer(StartConsume, this, device_id); + consumer = std::move(cur_consumer); - releaseQueue.InitReleaseQueue(); + releaseQueue.InitReleaseQueue(); } static constexpr size_t kReleaseQueueCapacity = 8192; bool ReleaseQueue::WriteToReleaseQueue(void* cur_paras) { - if (IsFullQueue()) { - return false; - } - __sync_synchronize(); - releaseManager().CopyRealseParam(datas, write_idx.idx, cur_paras); + if (IsFullQueue()) { + return false; + } + __sync_synchronize(); + releaseManager().CopyRealseParam(datas, write_idx.idx, cur_paras); - __sync_synchronize(); - write_idx.idx = (write_idx.idx + 1) & (kReleaseQueueCapacity - 1); - return true; + __sync_synchronize(); + write_idx.idx = (write_idx.idx + 1) & (kReleaseQueueCapacity - 1); + return true; } void ReleaseQueue::PushToReleaseQueue(void* cur_paras) { if (initialized == false) { - ASCEND_LOGE("Release queue is not initialized, shouldn't call PushToReleaseQueue(). !!"); - return; + ASCEND_LOGE("Release queue is not initialized, shouldn't call PushToReleaseQueue(). !!"); + return; } bool ret = false; while (ret == false) { - ret = WriteToReleaseQueue(cur_paras); - if (ret == true) { - break; - } + ret = WriteToReleaseQueue(cur_paras); + if (ret == true) { + break; + } } } bool ReleaseQueue::ReadFromReleaseQueue() { - if (IsEmptyQueue()) { - return false; - } + if (IsEmptyQueue()) { + return false; + } - __sync_synchronize(); - releaseManager().ReleaseParam(datas, read_idx.idx); + __sync_synchronize(); + releaseManager().ReleaseParam(datas, read_idx.idx); - __sync_synchronize(); - read_idx.idx = (read_idx.idx + 1) & (kReleaseQueueCapacity - 1); + __sync_synchronize(); + read_idx.idx = (read_idx.idx + 1) & (kReleaseQueueCapacity - 1); - return true; + return true; } void ReleaseQueue::PopFromReleaseQueue() { - if (initialized == false) { - ASCEND_LOGE("Release queue is not initialized, shouldn't call PopFromReleaseQueue(). !!"); - return; - } + if (initialized == false) { + ASCEND_LOGE("Release queue is not initialized, shouldn't call PopFromReleaseQueue(). !!"); + return; + } - bool ret = false; - while ((ret == false) && (GetStatus() != RepoStatus::CAN_EXIT)) { - ret = ReadFromReleaseQueue(); - if (ret == false) { - if (GetStatus() == RepoStatus::NEED_EXIT) { - ChangeStatus(NEED_EXIT, CAN_EXIT); - break; - } - delay.tv_usec = 1; - select(0, nullptr, nullptr, nullptr, &delay); + bool ret = false; + while ((ret == false) && (GetStatus() != RepoStatus::CAN_EXIT)) { + ret = ReadFromReleaseQueue(); + if (ret == false) { + if (GetStatus() == RepoStatus::NEED_EXIT) { + ChangeStatus(NEED_EXIT, CAN_EXIT); + break; + } + delay.tv_usec = 1; + select(0, nullptr, nullptr, nullptr, &delay); + } } - } } void StartRelease(ReleaseQueue* releaseQue) { - if (prctl(PR_SET_NAME, ("Release_thread")) != 0) { - ASCEND_LOGE("set thread name failed!"); - } + if (prctl(PR_SET_NAME, ("Release_thread")) != 0) { + ASCEND_LOGE("set thread name failed!"); + } - while (releaseQue->GetStatus() != RepoStatus::CAN_EXIT) { - releaseQue->PopFromReleaseQueue(); - } - return; + while (releaseQue->GetStatus() != RepoStatus::CAN_EXIT) { + releaseQue->PopFromReleaseQueue(); + } + return; } void ReleaseQueue::InitReleaseQueue() { - if (datas == nullptr) { - datas = releaseManager().Init(kReleaseQueueCapacity); - } + if (datas == nullptr) { + datas = releaseManager().Init(kReleaseQueueCapacity); + } - initialized = true; - SetStatus(INIT); - std::thread cur_releaser(StartRelease, this); - releaser = std::move(cur_releaser); + initialized = true; + SetStatus(INIT); + std::thread cur_releaser(StartRelease, this); + releaser = std::move(cur_releaser); } ReleaseQueue::~ReleaseQueue() { - if (initialized) { - if (releaser.joinable()) { - SetStatus(NEED_EXIT); - releaser.join(); + if (initialized) { + if (releaser.joinable()) { + SetStatus(NEED_EXIT); + releaser.join(); + } } - } - releaseManager().DeInit(datas); + releaseManager().DeInit(datas); } bool ReleaseQueue::IsFullQueue() const { - return ((write_idx.idx + 1) % kReleaseQueueCapacity) == read_idx.idx; + return ((write_idx.idx + 1) % kReleaseQueueCapacity) == read_idx.idx; } RepoStatus ReleaseQueue::GetStatus() const { - if (initialized == false) { - ASCEND_LOGE("Release queue is not initialized, shouldn't call GetStatus(). !!"); - } + if (initialized == false) { + ASCEND_LOGE("Release queue is not initialized, shouldn't call GetStatus(). !!"); + } - return repo_status.load(); + return repo_status.load(); } void ReleaseQueue::SetStatus(RepoStatus desired) { - if (initialized == false) { - ASCEND_LOGE("Release queue is not initialized, shouldn't call SetStatus(). !!"); - return; - } + if (initialized == false) { + ASCEND_LOGE("Release queue is not initialized, shouldn't call SetStatus(). !!"); + return; + } - repo_status = desired; + repo_status = desired; } void ReleaseQueue::ChangeStatus(RepoStatus expected, RepoStatus desired) { - if (initialized == false) { - ASCEND_LOGE("Release queue is not initialized, shouldn't call ChangeStatus(). !!"); - return; - } + if (initialized == false) { + ASCEND_LOGE("Release queue is not initialized, shouldn't call ChangeStatus(). !!"); + return; + } - repo_status.compare_exchange_strong(expected, desired); + repo_status.compare_exchange_strong(expected, desired); } } // namespace c10_npu -- Gitee From 2ffcd4d2730989c0826387befcb3ed3ee42acfcf Mon Sep 17 00:00:00 2001 From: dongwenbo6 Date: Sun, 18 Feb 2024 11:17:33 +0800 Subject: [PATCH 3/4] fix bug --- torch_npu/csrc/core/npu/NPUQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torch_npu/csrc/core/npu/NPUQueue.cpp b/torch_npu/csrc/core/npu/NPUQueue.cpp index 1c54693e136..382ca74bc45 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.cpp +++ b/torch_npu/csrc/core/npu/NPUQueue.cpp @@ -305,7 +305,7 @@ void Repository::Enqueue(void* cur_paras) { auto type = queueParam->paramType; if (type ==c10_npu::queue::COMPILE_AND_EXECUTE) { auto cur_paras = static_cast(queueParam->paramVal); - auto op_param = cur_paras->opType; + auto op_name = cur_paras->opType; ASCEND_LOGE("Task queue thread is exit, cann't call Enqueue(). op name is=%s", *op_name); } else if (type ==c10_npu::queue::ASYNC_MEMCPY) { auto cur_paras = static_cast(queueParam->paramVal); -- Gitee From 6e2bfa15e76aea7b7b2f0d06da4399367dc543a7 Mon Sep 17 00:00:00 2001 From: dongwenbo6 Date: Sun, 18 Feb 2024 11:53:25 +0800 Subject: [PATCH 4/4] reorganize files --- torch_npu/csrc/core/npu/NPUQueue.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/torch_npu/csrc/core/npu/NPUQueue.cpp b/torch_npu/csrc/core/npu/NPUQueue.cpp index 382ca74bc45..21d59bcdbf4 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.cpp +++ b/torch_npu/csrc/core/npu/NPUQueue.cpp @@ -91,11 +91,11 @@ public: } void DeInit(void* ptr) { - if (ptr != nullptr) { - TORCH_CHECK(this->deleteFunc, "Failed to find delete function."); - this->deleteFunc(ptr); - ptr = nullptr; - } + if (ptr != nullptr) { + TORCH_CHECK(this->deleteFunc, "Failed to find delete function."); + this->deleteFunc(ptr); + ptr = nullptr; + } } private: int sizePerParams = 0; @@ -452,13 +452,13 @@ void Repository::ReleaseResource() { Repository::~Repository() { if (initialized) { - if (consumer.joinable()) { - SetStatus(NEED_EXIT); - (void)eventfd_write(efd_read, 1); // escape wait - consumer.join(); - } - eventfd_write(efd_empty, 1); - ReleaseResource(); + if (consumer.joinable()) { + SetStatus(NEED_EXIT); + (void)eventfd_write(efd_read, 1); // escape wait + consumer.join(); + } + eventfd_write(efd_empty, 1); + ReleaseResource(); } } -- Gitee