diff --git a/test/torch_npu_schema.json b/test/torch_npu_schema.json index 945194ba07f72b616b3f9b5781fc112e16a6d949..e6548382e6b783d068c6bb37d5e94d7a214c01fc 100644 --- a/test/torch_npu_schema.json +++ b/test/torch_npu_schema.json @@ -1079,6 +1079,9 @@ "torch_npu.npu.check_uce_in_memory": { "signature": "(device_id)" }, + "torch_npu.npu.ipc_collect": { + "signature": "()" + }, "torch_npu.npu.clear_npu_overflow_flag": { "signature": "()" }, @@ -1640,6 +1643,9 @@ "torch_npu.npu.utils.check_uce_in_memory": { "signature": "(device_id)" }, + "torch_npu.npu.utils.ipc_collect": { + "signature": "()" + }, "torch_npu.npu.utils.current_blas_handle": { "signature": "()" }, diff --git a/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp b/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp index 40105094de6c19b3d0793ab58de304207cf9c8f9..4eaf09b1311b04526ecaf4de63380acc4a17dc67 100644 --- a/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp +++ b/torch_npu/csrc/core/npu/NPUCachingAllocator.cpp @@ -576,9 +576,7 @@ private: // cannot call c10::npu::stream_synchronize because // it might grab the GIL which can lead to a deadlock // Locking order must be GIL -> Allocator Lock - if (stream_) { - NPU_CHECK_ERROR(aclrtSynchronizeStream(*stream_)); - } else { + { c10_npu::NPUGuard device_guard(device_); c10_npu::npuSynchronizeDevice(true); } @@ -3331,15 +3329,6 @@ public: ASCEND_LOGD("End empty cache with check_error = %d", check_error); } - void clearIpcHandles() override - { - std::lock_guard lock(ipcHandleMutex); - for (auto &handle : ipcHandles) { - NPU_CHECK_ERROR(c10_npu::acl::AclrtFreePhysical(handle)); - } - ipcHandles.clear(); - } - void *getBaseAllocation(void *ptr, size_t *outSize) override { Block *block = get_allocated_block(ptr); @@ -3652,7 +3641,10 @@ public: void clear() { if (npu_ipc_ptr_) { - c10_npu::NPUGuard device_guard(device_); + { + c10_npu::NPUGuard device_guard(device_); + c10_npu::npuSynchronizeDevice(true); + } 
NPU_CHECK_ERROR(c10_npu::acl::AclrtIpcMemClose(handle_s.c_str())); npu_ipc_ptr_ = nullptr; } diff --git a/torch_npu/csrc/core/npu/NPUCachingAllocator.h b/torch_npu/csrc/core/npu/NPUCachingAllocator.h index 30a42035ec1914d44e408e23520e3b593b2b37b6..a5368b705da1cf59e9ed40532283ad66872fec88 100644 --- a/torch_npu/csrc/core/npu/NPUCachingAllocator.h +++ b/torch_npu/csrc/core/npu/NPUCachingAllocator.h @@ -203,7 +203,6 @@ public: virtual bool initialized() = 0; virtual void setMemoryFraction(double fraction, int device) = 0; virtual void emptyCache(bool check_error) = 0; - virtual void clearIpcHandles() = 0; virtual void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock) = 0; virtual void* getBaseAllocation(void* ptr, size_t* size) = 0; virtual void recordStream(const c10::DataPtr& ptr, c10_npu::NPUStream stream) = 0; @@ -311,11 +310,6 @@ C10_NPU_API inline void emptyCache(bool check_error = true) return get()->emptyCache(check_error); } -inline void clearIpcHandles() -{ - return get()->clearIpcHandles(); -} - inline void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock) { return get()->cacheInfo(dev_id, cachedAndFree, largestBlock); diff --git a/torch_npu/csrc/ipc/StorageSharing.cpp b/torch_npu/csrc/ipc/StorageSharing.cpp index 18fdd4c5e0722bcde2133239e3ccf9c0f9ad6ba0..1de9d10dee47eab1625c89cae11b133442eb3c68 100644 --- a/torch_npu/csrc/ipc/StorageSharing.cpp +++ b/torch_npu/csrc/ipc/StorageSharing.cpp @@ -47,6 +47,7 @@ static PyObject* THNPStorage_shareNpu(PyObject* self, PyObject* args) } at::DeviceGuard device_guard(storage.device()); + c10_npu::LazySetDevice(storage.device().index()); THPObjectPtr tuple(PyTuple_New(8)); THPObjectPtr device(THPUtils_packInt32(storage.device().index())); THPObjectPtr _handle(Py_None); @@ -193,6 +194,7 @@ static PyObject* THNPStorage_newSharedNpu(PyObject* _unused, PyObject* args) const auto device = c10::checked_convert( THPUtils_unpackLong(_device), "c10::DeviceIndex"); c10_npu::NPUGuard 
device_guard(device); + c10_npu::LazySetDevice(device); if (PyObject_IsTrue(_event_sync_required)) { // TO BE DONE diff --git a/torch_npu/csrc/npu/Module.cpp b/torch_npu/csrc/npu/Module.cpp index 6f76b830472fa264d00c683a24986ca5ba6dac8a..7778a010955587d785918d09aceef27720668094 100644 --- a/torch_npu/csrc/npu/Module.cpp +++ b/torch_npu/csrc/npu/Module.cpp @@ -47,6 +47,7 @@ #include "torch_npu/csrc/core/npu/interface/OpInterface.h" #include "torch_npu/csrc/core/npu/GetCANNInfo.h" #include "torch_npu/csrc/core/npu/NPUWorkspaceAllocator.h" +#include "torch_npu/csrc/ipc/NPUIPCTypes.h" #include "op_plugin/utils/custom_functions/opapi/FFTCommonOpApi.h" #include "torch_npu/csrc/aten/common/from_blob.h" #include "torch_npu/csrc/profiler/combined_traceback.h" @@ -994,6 +995,14 @@ PyObject* THNPModule_emptyCache(PyObject *_unused, PyObject *noargs) Py_RETURN_NONE; } +PyObject* THNPModule_npu_ipc_collect(PyObject *_unused, PyObject *noargs) +{ + HANDLE_TH_ERRORS + torch_npu::ipc::NpuIPCCollect(); + END_HANDLE_TH_ERRORS + Py_RETURN_NONE; +} + PyObject* THNPModule_memoryStats(PyObject *_unused, PyObject *arg) { HANDLE_TH_ERRORS @@ -1941,6 +1950,7 @@ static struct PyMethodDef THNPModule_methods[] = { {"_npu_is_jit_compile_false", (PyCFunction)THNPModule_is_jit_compile_false_wrap, METH_NOARGS, nullptr}, {"_npu_setMemoryFraction", (PyCFunction) THNPModule_setMemoryFraction, METH_VARARGS, nullptr}, {"_npu_emptyCache", (PyCFunction) THNPModule_emptyCache, METH_NOARGS, nullptr}, + {"_npu_ipc_collect", (PyCFunction) THNPModule_npu_ipc_collect, METH_NOARGS, nullptr}, {"_npu_memoryStats", (PyCFunction) THNPModule_memoryStats, METH_O, nullptr}, {"_npu_resetAccumulatedMemoryStats", (PyCFunction) THNPModule_resetAccumulatedMemoryStats, METH_O, nullptr}, {"_npu_resetPeakMemoryStats", (PyCFunction) THNPModule_resetPeakMemoryStats, METH_O, nullptr}, diff --git a/torch_npu/csrc/npu/NPUPluggableAllocator.cpp b/torch_npu/csrc/npu/NPUPluggableAllocator.cpp index 
7610374a3ba35297c97eac5d17dbd11cc3bba0b9..14ea0ce7e73dbe0b18c255b8678c3a23ad44c5bc 100644 --- a/torch_npu/csrc/npu/NPUPluggableAllocator.cpp +++ b/torch_npu/csrc/npu/NPUPluggableAllocator.cpp @@ -189,12 +189,6 @@ void NPUPluggableAllocator::emptyCache(bool check_error) } } -void NPUPluggableAllocator::clearIpcHandles() -{ - TORCH_NPU_WARN("NPUPluggableAllocator does not yet support clearIpcHandles. " - "If you need it, please file an issue describing your use case."); -} - void NPUPluggableAllocator::cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock) { TORCH_NPU_WARN("NPUPluggableAllocator does not yet support cacheInfo. " diff --git a/torch_npu/csrc/npu/NPUPluggableAllocator.h b/torch_npu/csrc/npu/NPUPluggableAllocator.h index 266db02a604c906f0e5a4abf6e07d0f407504613..a3691d48eefbaf3743f5ce29a304a0dab3560151 100644 --- a/torch_npu/csrc/npu/NPUPluggableAllocator.h +++ b/torch_npu/csrc/npu/NPUPluggableAllocator.h @@ -60,7 +60,6 @@ struct NPUPluggableAllocator bool initialized() override; void setMemoryFraction(double fraction, int device) override; void emptyCache(bool check_error) override; - void clearIpcHandles() override; void cacheInfo(int dev_id, size_t* cachedAndFree, size_t* largestBlock) override; void* getBaseAllocation(void* ptr, size_t* size) override; void recordStream(const c10::DataPtr&, streamType stream) override; diff --git a/torch_npu/npu/__init__.py b/torch_npu/npu/__init__.py index 313ea8ea5e43fdf8dbdd3668ad2c44b28531916e..5506ae48efe802b8fd5bac2a5ef04895ac200629 100644 --- a/torch_npu/npu/__init__.py +++ b/torch_npu/npu/__init__.py @@ -119,7 +119,8 @@ __all__ = [ "get_device_limit", "set_stream_limit", "reset_stream_limit", - "get_stream_limit" + "get_stream_limit", + "ipc_collect" ] from typing import Tuple, Union, List, cast, Optional @@ -139,7 +140,7 @@ from .utils import (synchronize, can_device_access_peer, set_device, current_dev device, device_of, StreamContext, stream, set_stream, current_stream, default_stream, 
set_sync_debug_mode, get_sync_debug_mode, init_dump, current_blas_handle, is_bf16_supported, utilization, finalize_dump, set_dump, get_npu_overflow_flag, clear_npu_overflow_flag, mem_get_info, - check_uce_in_memory, stress_detect, _get_uce_addr) + check_uce_in_memory, stress_detect, _get_uce_addr, ipc_collect) from ._recovery import restart_device, stop_device from .streams import Stream, Event, SyncLaunchStream, ExternalEvent from .mstx import mstx diff --git a/torch_npu/npu/utils.py b/torch_npu/npu/utils.py index 114664a7ce963f10ed0845bb3f273ac33b9f4059..cf4b325adec1ab06f4376e89721839eedfb40892 100644 --- a/torch_npu/npu/utils.py +++ b/torch_npu/npu/utils.py @@ -17,7 +17,7 @@ __all__ = ["synchronize", "device_count", "can_device_access_peer", "set_device", "stream", "set_stream", "current_stream", "default_stream", "set_sync_debug_mode", "get_sync_debug_mode", "init_dump", "set_dump", "finalize_dump", "is_support_inf_nan", "is_bf16_supported", "get_npu_overflow_flag", "npu_check_overflow", "clear_npu_overflow_flag", "current_blas_handle", - "check_uce_in_memory", "stress_detect", "get_cann_version"] + "check_uce_in_memory", "stress_detect", "get_cann_version", "ipc_collect"] def get_cann_version(module="CANN"): @@ -58,6 +58,19 @@ def synchronize(device=None): return torch_npu._C._npu_synchronize() +def ipc_collect(): + r"""Force collects NPU memory after it has been released by NPU IPC. + + .. note:: + Checks if any sent NPU tensors could be cleaned from the memory. Force + closes shared memory file used for reference counting if there are no + active counters. Useful when the producer process stopped actively sending + tensors and you want to release unused memory. + """ + torch_npu.npu._lazy_init() + return torch_npu._C._npu_ipc_collect() + + def device_count() -> int: return torch_npu.npu.device_count()