diff --git a/torch_npu/csrc/core/npu/NPUQueue.cpp b/torch_npu/csrc/core/npu/NPUQueue.cpp index a356eb558a819b4f649faa563b9d70bedc6e141d..c41eba08c10837ed44f28eec214803f874da1705 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.cpp +++ b/torch_npu/csrc/core/npu/NPUQueue.cpp @@ -314,17 +314,6 @@ void Repository::Enqueue(void* cur_paras) { continue; } __sync_synchronize(); - while (!IsReadWorking()) { - s = eventfd_write(efd_read, u); - if (s != 0) { - if (errno == EINTR) { - continue; - } - ASCEND_LOGE("notify consumer failed!! s=%zd, errno=%s", s, strerror(errno)); - return; - } - break; - } } SetWriteWorking(false); } @@ -352,14 +341,7 @@ void Repository::Dequeue() { SetReadWorking(false); __sync_synchronize(); if (IsEmptyQueue()) { - s = eventfd_read(efd_read, &u); - if (s != 0) { - if (errno == EINTR) { - continue; - } - ASCEND_LOGE("waiting enqueue failed. s=%zd, errno=%s.", s, strerror(errno)); - return; - } + WaitForTaskArrival();/* The queue was observed empty: wait inside WaitForTaskArrival() until it is non-empty or the status is no longer RUN. This call takes the place of the removed eventfd_read(efd_read, &u) wait. */ SetReadWorking(true); } continue; @@ -394,6 +376,26 @@ void Repository::Dequeue() { SetReadWorking(false); } +/* Waits for the queue to become non-empty. Returns as soon as IsEmptyQueue() is false or GetStatus() != RepoStatus::RUN. During the first TASK_ARRIVAL_LOOP_CNT - 1 iterations it only re-polls the queue; from then on every iteration sleeps through select() with a 1 microsecond timeout and issues __sync_synchronize() before polling again. Returns nothing, so the caller cannot tell from the call which of the two conditions ended the wait. */void Repository::WaitForTaskArrival() { + uint64_t loop_cnt = 0; + struct timeval delay = {0, 0}; + + while (IsEmptyQueue()) { + if (GetStatus() != RepoStatus::RUN) {/* stop waiting once the status is no longer RUN, even though the queue is still empty */ + break; + } + loop_cnt++; + if (loop_cnt < TASK_ARRIVAL_LOOP_CNT) {/* below the threshold: re-check the queue immediately, without sleeping */ + continue; + } + delay.tv_usec = 1;/* tv_usec is reassigned on every pass before select(); NOTE(review): presumably because select() may modify the timeout struct on Linux - confirm */ + select(0, nullptr, nullptr, nullptr, &delay);/* no fd sets are passed, so the call only acts as a timed sleep; its return value is ignored */ + __sync_synchronize(); + } + + return; +} + void Repository::ReleaseResource() { manager().DeInit(datas); if (efd_read > 0) { diff --git a/torch_npu/csrc/core/npu/NPUQueue.h b/torch_npu/csrc/core/npu/NPUQueue.h index 7295fe0448779a1a9a02111765ac31b559416999..8ce46d87a88da60eac6f2391435a350f34d842d4 100644 --- a/torch_npu/csrc/core/npu/NPUQueue.h +++ b/torch_npu/csrc/core/npu/NPUQueue.h @@ -11,6 +11,7 @@ namespace c10_npu { +#define TASK_ARRIVAL_LOOP_CNT (3030000) // about 10ms struct sring_idx { bool working = false; volatile unsigned int idx = 0; @@ -96,6 +97,7 @@ private: bool IsReadWorking() const {return 
read_idx.working;}; bool WriteQueue(void* cur_paras); bool ReadQueue(); + void WaitForTaskArrival();/* polls until the queue is non-empty or the status is not RUN; used by Dequeue() when it finds the queue empty */ private: void* datas = nullptr;