From 7f8f35e1e5d5692bc380c4302e70942d4ac01d86 Mon Sep 17 00:00:00 2001
From: wangchao
Date: Sat, 13 Sep 2025 10:14:40 +0800
Subject: [PATCH 1/2] [IPC] Add the ipc_collect interface; Synchronize with
 unmap; LazySetDevice; Remove clearIpcHandles

---
 .../csrc/core/npu/NPUCachingAllocator.cpp     | 18 +++++-------------
 torch_npu/csrc/core/npu/NPUCachingAllocator.h |  6 ------
 torch_npu/csrc/ipc/StorageSharing.cpp         |  2 ++
 torch_npu/csrc/npu/Module.cpp                 | 10 ++++++++++
 torch_npu/csrc/npu/NPUPluggableAllocator.cpp  |  6 ------
 torch_npu/csrc/npu/NPUPluggableAllocator.h    |  1 -
 torch_npu/npu/__init__.py                     |  5 +++--
 torch_npu/npu/utils.py                        | 15 ++++++++++++++-
 8 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp b/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp
index 40105094de..4eaf09b131 100644
--- a/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp
+++ b/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp
@@ -576,9 +576,7 @@ private:
         // cannot call c10::npu::stream_synchronize because
         // it might grab the GIL which can lead to a deadlock
         // Locking order must be GIL -> Allocator Lock
-        if (stream_) {
-            NPU_CHECK_ERROR(aclrtSynchronizeStream(*stream_));
-        } else {
+        {
             c10_npu::NPUGuard device_guard(device_);
             c10_npu::npuSynchronizeDevice(true);
         }
@@ -3331,15 +3329,6 @@ public:
         ASCEND_LOGD("End empty cache with check_error = %d", check_error);
     }
 
-    void clearIpcHandles() override
-    {
-        std::lock_guard<std::mutex> lock(ipcHandleMutex);
-        for (auto &handle : ipcHandles) {
-            NPU_CHECK_ERROR(c10_npu::acl::AclrtFreePhysical(handle));
-        }
-        ipcHandles.clear();
-    }
-
     void *getBaseAllocation(void *ptr, size_t *outSize) override
     {
         Block *block = get_allocated_block(ptr);
@@ -3652,7 +3641,10 @@ public:
     void clear()
     {
         if (npu_ipc_ptr_) {
-            c10_npu::NPUGuard device_guard(device_);
+            {
+                c10_npu::NPUGuard device_guard(device_);
+                c10_npu::npuSynchronizeDevice(true);
+            }
             NPU_CHECK_ERROR(c10_npu::acl::AclrtIpcMemClose(handle_s.c_str()));
             npu_ipc_ptr_ = nullptr;
         }
diff --git a/torch_npu/csrc/core/npu/NPUCachingAllocator.h b/torch_npu/csrc/core/npu/NPUCachingAllocator.h
index 30a42035ec..a5368b705d 100644
--- a/torch_npu/csrc/core/npu/NPUCachingAllocator.h
+++ b/torch_npu/csrc/core/npu/NPUCachingAllocator.h
@@ -203,7 +203,6 @@ public:
     virtual bool initialized() = 0;
    virtual void setMemoryFraction(double fraction, int device) = 0;
    virtual void emptyCache(bool check_error) = 0;
-   virtual void clearIpcHandles() = 0;
    virtual void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock) = 0;
    virtual void* getBaseAllocation(void* ptr, size_t* size) = 0;
    virtual void recordStream(const c10::DataPtr& ptr, c10_npu::NPUStream stream) = 0;
@@ -311,11 +310,6 @@ C10_NPU_API inline void emptyCache(bool check_error = true)
     return get()->emptyCache(check_error);
 }
 
-inline void clearIpcHandles()
-{
-    return get()->clearIpcHandles();
-}
-
 inline void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock)
 {
     return get()->cacheInfo(dev_id, cachedAndFree, largestBlock);
 }
diff --git a/torch_npu/csrc/ipc/StorageSharing.cpp b/torch_npu/csrc/ipc/StorageSharing.cpp
index 18fdd4c5e0..1de9d10dee 100644
--- a/torch_npu/csrc/ipc/StorageSharing.cpp
+++ b/torch_npu/csrc/ipc/StorageSharing.cpp
@@ -47,6 +47,7 @@ static PyObject* THNPStorage_shareNpu(PyObject* self, PyObject* args)
     }
 
     at::DeviceGuard device_guard(storage.device());
+    c10_npu::LazySetDevice(storage.device().index());
     THPObjectPtr tuple(PyTuple_New(8));
     THPObjectPtr device(THPUtils_packInt32(storage.device().index()));
     THPObjectPtr _handle(Py_None);
@@ -193,6 +194,7 @@ static PyObject* THNPStorage_newSharedNpu(PyObject* _unused, PyObject* args)
     const auto device = c10::checked_convert<c10::DeviceIndex>(
         THPUtils_unpackLong(_device), "c10::DeviceIndex");
     c10_npu::NPUGuard device_guard(device);
+    c10_npu::LazySetDevice(device);
 
     if (PyObject_IsTrue(_event_sync_required)) {
         // TO BE DONE
diff --git a/torch_npu/csrc/npu/Module.cpp b/torch_npu/csrc/npu/Module.cpp
index 6f76b83047..7778a01095 100644
--- a/torch_npu/csrc/npu/Module.cpp
+++ b/torch_npu/csrc/npu/Module.cpp
@@ -47,6 +47,7 @@
 #include "torch_npu/csrc/core/npu/interface/OpInterface.h"
 #include "torch_npu/csrc/core/npu/GetCANNInfo.h"
 #include "torch_npu/csrc/core/npu/NPUWorkspaceAllocator.h"
+#include "torch_npu/csrc/ipc/NPUIPCTypes.h"
 #include "op_plugin/utils/custom_functions/opapi/FFTCommonOpApi.h"
 #include "torch_npu/csrc/aten/common/from_blob.h"
 #include "torch_npu/csrc/profiler/combined_traceback.h"
@@ -994,6 +995,14 @@ PyObject* THNPModule_emptyCache(PyObject *_unused, PyObject *noargs)
     Py_RETURN_NONE;
 }
 
+PyObject* THNPModule_npu_ipc_collect(PyObject *_unused, PyObject *noargs)
+{
+    HANDLE_TH_ERRORS
+    torch_npu::ipc::NpuIPCCollect();
+    END_HANDLE_TH_ERRORS
+    Py_RETURN_NONE;
+}
+
 PyObject* THNPModule_memoryStats(PyObject *_unused, PyObject *arg)
 {
     HANDLE_TH_ERRORS
@@ -1941,6 +1950,7 @@ static struct PyMethodDef THNPModule_methods[] = {
     {"_npu_is_jit_compile_false", (PyCFunction)THNPModule_is_jit_compile_false_wrap, METH_NOARGS, nullptr},
     {"_npu_setMemoryFraction", (PyCFunction) THNPModule_setMemoryFraction, METH_VARARGS, nullptr},
     {"_npu_emptyCache", (PyCFunction) THNPModule_emptyCache, METH_NOARGS, nullptr},
+    {"_npu_ipc_collect", (PyCFunction) THNPModule_npu_ipc_collect, METH_NOARGS, nullptr},
     {"_npu_memoryStats", (PyCFunction) THNPModule_memoryStats, METH_O, nullptr},
     {"_npu_resetAccumulatedMemoryStats", (PyCFunction) THNPModule_resetAccumulatedMemoryStats, METH_O, nullptr},
     {"_npu_resetPeakMemoryStats", (PyCFunction) THNPModule_resetPeakMemoryStats, METH_O, nullptr},
diff --git a/torch_npu/csrc/npu/NPUPluggableAllocator.cpp b/torch_npu/csrc/npu/NPUPluggableAllocator.cpp
index 7610374a3b..14ea0ce7e7 100644
--- a/torch_npu/csrc/npu/NPUPluggableAllocator.cpp
+++ b/torch_npu/csrc/npu/NPUPluggableAllocator.cpp
@@ -189,12 +189,6 @@ void NPUPluggableAllocator::emptyCache(bool check_error)
     }
 }
 
-void NPUPluggableAllocator::clearIpcHandles()
-{
-    TORCH_NPU_WARN("NPUPluggableAllocator does not yet support clearIpcHandles. "
-                   "If you need it, please file an issue describing your use case.");
-}
-
 void NPUPluggableAllocator::cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock)
 {
     TORCH_NPU_WARN("NPUPluggableAllocator does not yet support cacheInfo. "
" diff --git a/torch_npu/csrc/npu/NPUPluggableAllocator.h b/torch_npu/csrc/npu/NPUPluggableAllocator.h index 266db02a60..a3691d48ee 100644 --- a/torch_npu/csrc/npu/NPUPluggableAllocator.h +++ b/torch_npu/csrc/npu/NPUPluggableAllocator.h @@ -60,7 +60,6 @@ struct NPUPluggableAllocator bool initialized() override; void setMemoryFraction(double fraction, int device) override; void emptyCache(bool check_error) override; - void clearIpcHandles() override; void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock) override; void* getBaseAllocation(void* ptr, size_t* size) override; void recordStream(const c10::DataPtr&, streamType stream) override; diff --git a/torch_npu/npu/__init__.py b/torch_npu/npu/__init__.py index 313ea8ea5e..5506ae48ef 100644 --- a/torch_npu/npu/__init__.py +++ b/torch_npu/npu/__init__.py @@ -119,7 +119,8 @@ __all__ = [ "get_device_limit", "set_stream_limit", "reset_stream_limit", - "get_stream_limit" + "get_stream_limit", + "ipc_collect" ] from typing import Tuple, Union, List, cast, Optional @@ -139,7 +140,7 @@ from .utils import (synchronize, can_device_access_peer, set_device, current_dev device, device_of, StreamContext, stream, set_stream, current_stream, default_stream, set_sync_debug_mode, get_sync_debug_mode, init_dump, current_blas_handle, is_bf16_supported, utilization, finalize_dump, set_dump, get_npu_overflow_flag, clear_npu_overflow_flag, mem_get_info, - check_uce_in_memory, stress_detect, _get_uce_addr) + check_uce_in_memory, stress_detect, _get_uce_addr, ipc_collect) from ._recovery import restart_device, stop_device from .streams import Stream, Event, SyncLaunchStream, ExternalEvent from .mstx import mstx diff --git a/torch_npu/npu/utils.py b/torch_npu/npu/utils.py index 114664a7ce..cf4b325ade 100644 --- a/torch_npu/npu/utils.py +++ b/torch_npu/npu/utils.py @@ -17,7 +17,7 @@ __all__ = ["synchronize", "device_count", "can_device_access_peer", "set_device" "stream", "set_stream", "current_stream", "default_stream", "set_sync_debug_mode", "get_sync_debug_mode", "init_dump", "set_dump", "finalize_dump", "is_support_inf_nan", "is_bf16_supported", "get_npu_overflow_flag", "npu_check_overflow", "clear_npu_overflow_flag", "current_blas_handle", - "check_uce_in_memory", "stress_detect", "get_cann_version"] + "check_uce_in_memory", "stress_detect", "get_cann_version", "ipc_collect"] def get_cann_version(module="CANN"): @@ -58,6 +58,19 @@ def synchronize(device=None): return torch_npu._C._npu_synchronize() +def ipc_collect(): + r"""Force collects NPU memory after it has been released by NPU IPC. + + .. note:: + Checks if any sent NPU tensors could be cleaned from the memory. Force + closes shared memory file used for reference counting if there is no + active counters. Useful when the producer process stopped actively sending + tensors and want to release unused memory. 
+ """ + torch_npu.npu._lazy_init() + return torch_npu._C._npu_ipc_collect() + + def device_count() -> int: return torch_npu.npu.device_count() -- Gitee From 013654fdad8222195574e2a96df4a81f2b3ef48c Mon Sep 17 00:00:00 2001 From: wangchao Date: Sat, 13 Sep 2025 10:18:27 +0800 Subject: [PATCH 2/2] cleancode --- test/torch_npu_schema.json | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/torch_npu_schema.json b/test/torch_npu_schema.json index 945194ba07..e6548382e6 100644 --- a/test/torch_npu_schema.json +++ b/test/torch_npu_schema.json @@ -1079,6 +1079,9 @@ "torch_npu.npu.check_uce_in_memory": { "signature": "(device_id)" }, + "torch_npu.npu.ipc_collect": { + "signature": "()" + }, "torch_npu.npu.clear_npu_overflow_flag": { "signature": "()" }, @@ -1640,6 +1643,9 @@ "torch_npu.npu.utils.check_uce_in_memory": { "signature": "(device_id)" }, + "torch_npu.npu.utils.ipc_collect": { + "signature": "()" + }, "torch_npu.npu.utils.current_blas_handle": { "signature": "()" }, -- Gitee