From 42847feb4a1505e2a2b1d50e8040b1e181822b3f Mon Sep 17 00:00:00 2001
From: Gallium
Date: Mon, 17 Mar 2025 18:03:07 +0800
Subject: [PATCH 1/2] dynolog lazy init

---
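Note: with this change the IPCMonitor proxy is created lazily on the first
get_proxy() call instead of in __init__, and a failed import is cached in
_load_success so it is never retried; callers now receive None when the
IPCMonitor module is unavailable, and the old import-failure warning is no
longer logged. The config-context half of the patch maps the string value of
PROFILE_ANALYSE through BOOL_MAP and only converts PROFILE_GC_DETECT_THRESHOLD
when it arrives as a usable string. A minimal standalone sketch of the
lazy-init pattern (illustrative only, not the torch_npu sources; LazyProxy is
an invented name):

    class LazyProxy:
        def __init__(self):
            self._proxy = None          # built on first get_proxy() call
            self._load_success = True   # set to False after a failed import

        def _load_proxy(self):
            if not self._proxy and self._load_success:
                try:
                    # Optional dependency; absent on machines without dynolog.
                    from IPCMonitor import PyDynamicMonitorProxy
                except Exception:
                    self._load_success = False  # cache the failure, never retry
                    return
                self._proxy = PyDynamicMonitorProxy()

        def get_proxy(self):
            self._load_proxy()
            return self._proxy  # None when IPCMonitor cannot be imported

Calling get_proxy() twice on a machine without IPCMonitor attempts the import
only once and returns None both times.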
 .../_dynamic_profiler/_dynamic_monitor_proxy.py    | 11 ++++++-----
 .../_dynamic_profiler_config_context.py            |  5 +++--
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/torch_npu/profiler/_dynamic_profiler/_dynamic_monitor_proxy.py b/torch_npu/profiler/_dynamic_profiler/_dynamic_monitor_proxy.py
index 18e0e51a5d7..d86de245cef 100644
--- a/torch_npu/profiler/_dynamic_profiler/_dynamic_monitor_proxy.py
+++ b/torch_npu/profiler/_dynamic_profiler/_dynamic_monitor_proxy.py
@@ -6,16 +6,17 @@ from ._dynamic_profiler_utils import DynamicProfilerUtils
 class PyDynamicMonitorProxySingleton():
     def __init__(self):
         self._proxy = None
-        self._load_proxy()
+        self._load_success = True
 
     def _load_proxy(self):
-        if not self._proxy:
+        if not self._proxy and self._load_success:
             try:
                 from IPCMonitor import PyDynamicMonitorProxy
-                self._proxy = PyDynamicMonitorProxy()
             except Exception as e:
-                dynamic_profiler_utils.stdout_log(f"Import IPCMonitro module failed :{e}!",
-                                                  dynamic_profiler_utils.LoggerLevelEnum.WARNING)
+                self._load_success = False
+                return
+            self._proxy = PyDynamicMonitorProxy()
 
     def get_proxy(self):
+        self._load_proxy()
         return self._proxy
\ No newline at end of file
diff --git a/torch_npu/profiler/_dynamic_profiler/_dynamic_profiler_config_context.py b/torch_npu/profiler/_dynamic_profiler/_dynamic_profiler_config_context.py
index 27060c49580..627f220393a 100644
--- a/torch_npu/profiler/_dynamic_profiler/_dynamic_profiler_config_context.py
+++ b/torch_npu/profiler/_dynamic_profiler/_dynamic_profiler_config_context.py
@@ -185,7 +185,8 @@ class ConfigContext:
         if not self._is_dyno:
             self._analyse = json_data.get("analyse", False)
         else:
-            self._analyse = json_data.get("PROFILE_ANALYSE", False)
+            self._analyse = json_data.get("PROFILE_ANALYSE", 'false')
+            self._analyse = self.BOOL_MAP.get(self._analyse.lower(), False)
 
     def _parse_dyno_exp_cfg(self, json_data: dict):
         profiler_level = json_data.get('PROFILE_PROFILER_LEVEL', 'Level0')
@@ -197,7 +198,7 @@ class ConfigContext:
         op_attr = json_data.get('PROFILE_OP_ATTR', 'false')
         op_attr = self.BOOL_MAP.get(op_attr.lower(), False)
         gc_detect_threshold = json_data.get('PROFILE_GC_DETECT_THRESHOLD', None)
-        if gc_detect_threshold is not None:
+        if isinstance(gc_detect_threshold, str) and gc_detect_threshold != "None":
             gc_detect_threshold = float(gc_detect_threshold)
         data_simplification = json_data.get('PROFILE_DATA_SIMPLIFICATION', 'true')
         data_simplification = self.BOOL_MAP.get(data_simplification.lower(), True)
-- 
Gitee

From f8af2255c25d7e1081b5948c6b2349b18e7c1691 Mon Sep 17 00:00:00 2001
From: Gallium
Date: Mon, 28 Jul 2025 11:32:57 +0800
Subject: [PATCH 2/2] Get streamId directly from the stream
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
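Note: every collective used to resolve a stream id up front (getStreamId(false, -1),
or getStreamId(true, rank) on the send/recv paths) and capture that extra
streamId in the HCCL callback. The callback now reads stream.id() from the
stream it already captures, so the MSTX profiling message always reports the
stream the HCCL call is actually issued on, and one capture per lambda
disappears. A standalone sketch of the capture change (stub Stream type with
an invented id(); not the real NPUStream or getStreamId API):

    #include <cstdint>
    #include <iostream>

    struct Stream {                       // stand-in for the NPU stream type
        int64_t id() const { return 42; } // the real stream exposes an id too
    };

    int main()
    {
        Stream stream;

        // Before: the id was computed eagerly and captured as one more
        // lambda variable, which could disagree with the stream used later.
        int64_t streamId = stream.id();
        auto before = [streamId]() { std::cout << streamId << '\n'; };

        // After: only the stream is captured; the id is read when the
        // callback runs, so it always matches the captured stream.
        auto after = [stream]() { std::cout << stream.id() << '\n'; };

        before();
        after();
        return 0;
    }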
 .../csrc/distributed/ProcessGroupHCCL.cpp | 130 +++++++-----------
 1 file changed, 50 insertions(+), 80 deletions(-)

diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
index 244f6ef211f..19b86456e81 100644
--- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
+++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
@@ -3773,7 +3773,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce(
 
     std::vector<at::Tensor> tensors_cp = {tensors[0]};
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors_cp,
         tensors_cp,
@@ -3786,9 +3785,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclAllReduce(
                 inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -3831,7 +3830,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::batch_isend_irecv(
     }
 
     std::vector<at::Tensor> tensors_tmp = {tensors[0]};
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors_tmp,
         tensors_tmp,
@@ -3846,7 +3844,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::batch_isend_irecv(
             numel_list.push_back(getNumelForHCCL(tensors[i]));
            type_list.push_back(getHcclDataType(tensors[i].scalar_type()));
         }
-        auto hccl_call = [tensor_ptr_list, numel_list, type_list, remote_rank_list, op_type, itemNum, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [tensor_ptr_list, numel_list, type_list, remote_rank_list, op_type, itemNum, comm, stream, is_dispatched]() -> int {
             HcclSendRecvItem sendRecvInfo[itemNum];
             HcclSendRecvType currType;
             for (size_t i = 0; i < op_type.size(); ++i) {
@@ -3865,7 +3863,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::batch_isend_irecv(
                 };
             }
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclBatchSendRecv", sendRecvInfo[0].count, sendRecvInfo[0].dataType, comm, streamId, -1, -1),
+                getMstxHcclMsg("HcclBatchSendRecv", sendRecvInfo[0].count, sendRecvInfo[0].dataType, comm, stream.id(), -1, -1),
                 stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = hcclBatchIsendIrecv(sendRecvInfo, itemNum, comm, stream.stream(false));
             *is_dispatched = true;
@@ -3900,7 +3898,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::broadcast(
     if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) {
         at_npu::native::OpHook::GetInstance().PreHook("broadcast", tensors);
     }
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors,
         tensors,
@@ -3911,9 +3908,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::broadcast(
         auto inputDataPtr = input.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclType = getHcclDataType(input.scalar_type());
-        auto hccl_call = [inputDataPtr, numel, hcclType, root, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, numel, hcclType, root, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclBroadcast(inputDataPtr, numel, hcclType, root, comm, stream.stream(false));
             *is_dispatched = true;
@@ -3947,7 +3944,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce_coalesced(
     check_npu_tensors_same_device(tensors);
     std::vector<at::Tensor> tensors_cp = tensors;
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collectiveCoalesced(
         tensors_cp,
         tensors_cp,
@@ -3960,9 +3956,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce_coalesced(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclAllReduce(
                 inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -4015,7 +4011,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce(
     std::string functionName = __FUNCTION__;
     uint64_t rank = opts.rootRank;
     std::vector<at::Tensor> tensors_cp = {tensors[0]};
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors_cp,
         tensors_cp,
@@ -4028,9 +4023,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto reduceOp = getHcclReduceOp(opts.reduceOp, input);
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclReduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclReduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = hcclReduce(
                 inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream.stream(false));
@@ -4075,7 +4070,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_oop(
     std::vector<at::Tensor> inputTensors = {inputTensor};
     std::vector<at::Tensor> outputTensors = {outputTensor};
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors,
         outputTensors,
@@ -4088,9 +4082,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_oop(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto reduceOp = getHcclReduceOp(opts.reduceOp, input);
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclReduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclReduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = hcclReduce(
                 inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream.stream(false));
@@ -4187,7 +4181,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base_uneven(
 
     auto inputTensors_ = cast_to_origin_format(inputTensors);
     auto outputTensors_ = cast_to_origin_format(outputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors_,
@@ -4210,10 +4203,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base_uneven(
                 numel,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclReduceScatterV(
                     inputDataPtr,
@@ -4275,7 +4267,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base_uneven(
 
     auto inputTensors_ = cast_to_origin_format(inputTensors);
     auto outputTensors_ = cast_to_origin_format(outputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors_,
@@ -4296,10 +4287,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base_uneven(
                 numel,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAllGatherV(
                     inputDataPtr,
@@ -4354,7 +4344,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
     auto outputFlattened =
         flatten_for_scatter_gather(byte_alignment_outputTensors, byte_alignment_inputTensors_, size_);
     check_npu_tensors_different_devices(outputFlattened);
-    auto streamId = getStreamId(false, -1);
     return collective(
         byte_alignment_inputTensors_,
         outputFlattened,
@@ -4368,9 +4357,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclType = getHcclDataType(input.scalar_type());
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
             *is_dispatched = true;
@@ -4425,7 +4414,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
         }
         std::vector<at::Tensor> inputFlattened = {at::flatten(inputTensors[0])};
         std::vector<at::Tensor> outputFlattened = {at::cat(flattenedOutputTensors, 0)};
-        auto streamId = getStreamId(false, -1);
         return collective(
             inputFlattened,
             outputFlattened,
@@ -4446,10 +4434,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
                     numel,
                     comm,
                     stream,
-                    is_dispatched,
-                    streamId]() -> int {
+                    is_dispatched]() -> int {
                     torch_npu::profiler::MstxRange range(
-                        getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, streamId, -1, -1),
+                        getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, stream.id(), -1, -1),
                         stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                     auto hccl_result = hcclAllGatherV(
                         inputDataPtr,
@@ -4500,8 +4487,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
         const auto num_devices = outputTensors.size();
         const auto num_reduces = outputTensors[0].size();
         std::vector<c10::intrusive_ptr<c10d::Work>> works;
-        auto streamId = getStreamId(false, -1);
-        // Need to add a method like startCoalescing();
+        // Need to add a method like startCoalescing();
         for (const auto i : c10::irange(num_reduces)) {
             std::vector<at::Tensor> inputs_multi_dev(num_devices);
             std::vector<at::Tensor> outputs_multi_dev(num_devices);
@@ -4525,7 +4511,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
                 auto numel = getNumelForHCCL(input);
                 auto hcclType = getHcclDataType(input.scalar_type());
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclBroadcast(inputDataPtr, numel, hcclType, root, comm, stream.stream());
                 *is_dispatched = true;
@@ -4550,7 +4536,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_into_tensor_coalesced(
     const c10d::AllgatherOptions& opts)
 {
     auto inputTensors_ = cast_to_origin_format(inputs);
-    auto streamId = getStreamId(false, -1);
     return collectiveCoalesced(
         inputTensors_,
         outputs,
@@ -4563,9 +4548,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_into_tensor_coalesced(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclType = getHcclDataType(input.scalar_type());
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
             *is_dispatched = true;
@@ -4596,7 +4581,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_togather(
     }
 
     auto inputTensors_ = cast_to_origin_format(inputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors,
@@ -4609,9 +4593,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_togather(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclType = getHcclDataType(input.scalar_type());
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
             *is_dispatched = true;
@@ -4647,7 +4631,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base(
     }
 
     auto inputTensors_ = cast_to_origin_format(inputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors,
@@ -4660,9 +4643,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclType = getHcclDataType(input.scalar_type());
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
             *is_dispatched = true;
@@ -4687,7 +4670,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter(
     if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) {
         at_npu::native::OpHook::GetInstance().PreHook("reduce_scatter", outputTensors, inputTensors);
     }
-    auto streamId = getStreamId(false, -1);
     bool same_size = check_same_size(inputTensors.back());
     if (same_size) {
         auto inputFlattened = flatten_for_scatter_gather(inputTensors, outputTensors, size_);
@@ -4707,9 +4689,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(output);
             auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclReduceScatter(
                     inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -4789,10 +4771,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter(
                     numel,
                     comm,
                     stream,
-                    is_dispatched,
-                    streamId]() -> int {
+                    is_dispatched]() -> int {
                     torch_npu::profiler::MstxRange range(
-                        getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, streamId, -1, -1),
+                        getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, stream.id(), -1, -1),
                         stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                     auto hccl_result = hcclReduceScatterV(
                         inputDataPtr,
@@ -4887,7 +4868,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base(
     if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) {
         at_npu::native::OpHook::GetInstance().PreHook("_reduce_scatter_base", outputs, inputs);
     }
-    auto streamId = getStreamId(false, -1);
     std::string functionName = __FUNCTION__;
     return collective(
         inputs,
@@ -4903,9 +4883,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(output);
         auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclReduceScatter(
                 inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -4934,7 +4914,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter_tensor_coalesced(
     const c10d::ReduceScatterOptions& opts)
 {
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collectiveCoalesced(
         inputTensors,
         outputTensors,
@@ -4949,9 +4928,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter_tensor_coalesced(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(output);
         auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclReduceScatter(
                 inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -5071,7 +5050,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::scatter(
         inputTensors.push_back(empty);
         inputFlattened = flatten_for_scatter_gather(inputTensors, outputTensors, size_);
     }
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputFlattened,
         outputTensors,
@@ -5085,9 +5063,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::scatter(
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(output);
         auto hcclType = getHcclDataType(input.scalar_type());
-        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = hcclScatter(inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream.stream(false));
             *is_dispatched = true;
@@ -5128,7 +5106,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::send(std::vector<at::Tensor>& tensors,
     if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) {
         at_npu::native::OpHook::GetInstance().PreHook("send", tensors);
     }
-    auto streamId = getStreamId(true, dstRank);
     auto tensors_ = cast_to_origin_format(tensors);
     auto ret = pointToPoint(
         tensors_,
@@ -5137,9 +5114,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::send(std::vector<at::Tensor>& tensors,
         auto inputDataPtr = input.data_ptr();
         auto numel = getNumelForHCCL(input);
         auto hcclType = getHcclDataType(input.scalar_type());
-        auto hccl_call = [inputDataPtr, numel, hcclType, dst_rank, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [inputDataPtr, numel, hcclType, dst_rank, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclSend", numel, hcclType, comm, streamId, -1, dst_rank), stream.stream(false),
+                getMstxHcclMsg("HcclSend", numel, hcclType, comm, stream.id(), -1, dst_rank), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclSend(inputDataPtr, numel, hcclType, static_cast<uint32_t>(dst_rank), comm, stream.stream(false));
             *is_dispatched = true;
@@ -5160,7 +5137,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::recv(std::vector<at::Tensor>& tensors,
     if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) {
         at_npu::native::OpHook::GetInstance().PreHook("recv", tensors);
     }
-    auto streamId = getStreamId(true, srcRank);
     auto tensors_ = create_base_format_tensors(tensors);
     auto ret = pointToPoint(
         tensors_,
@@ -5172,9 +5148,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::recv(std::vector<at::Tensor>& tensors,
         auto outputDataPtr = output.data_ptr();
         auto numel = getNumelForHCCL(output);
         auto hcclType = getHcclDataType(output.scalar_type());
-        auto hccl_call = [outputDataPtr, numel, hcclType, src_rank, comm, stream, is_dispatched, streamId]() -> int {
+        auto hccl_call = [outputDataPtr, numel, hcclType, src_rank, comm, stream, is_dispatched]() -> int {
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclRecv", numel, hcclType, comm, streamId, src_rank, -1), stream.stream(false),
+                getMstxHcclMsg("HcclRecv", numel, hcclType, comm, stream.id(), src_rank, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclRecv(outputDataPtr, numel, hcclType, static_cast<uint32_t>(src_rank), comm, stream.stream(false));
             *is_dispatched = true;
@@ -5246,8 +5222,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
             DIST_ERROR(ErrCode::PARAM));
         uint64_t output_counts = static_cast<uint64_t>(outputTensor.numel() / ranks);
         uint64_t input_counts = static_cast<uint64_t>(inputTensor.numel() / ranks);
-        auto streamId = getStreamId(false, -1);
-        check_npu_tensors_different_devices(inputTensors);
+        check_npu_tensors_different_devices(inputTensors);
         check_npu_tensors_different_devices(outputTensors);
         return collective(
             inputTensors_,
@@ -5269,10 +5244,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
                 outputhcclDataType,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAlltoAll", input_counts, inputhcclDataType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclAlltoAll", input_counts, inputhcclDataType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAlltoAll(
                     inputDataPtr,
@@ -5346,8 +5320,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
             inputSpl.push_back(inputSpl[i - 1] + inputCounts[i - 1]);
         }
     }
-    auto streamId = getStreamId(false, -1);
-    check_npu_tensors_different_devices(inputTensors);
+    check_npu_tensors_different_devices(inputTensors);
     check_npu_tensors_different_devices(outputTensors);
     return collective(
         inputTensors_,
@@ -5368,10 +5341,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
                 outputhcclDataType,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
                     getMstxHcclMsg("HcclAlltoAllV", static_cast<uint64_t>(inputCounts.size()),
-                        inputhcclDataType, comm, streamId, -1, -1),
+                        inputhcclDataType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAlltoAllV(
                     inputDataPtr,
@@ -5475,7 +5447,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall(
 
     check_npu_tensors_different_devices(in_tensors);
     check_npu_tensors_different_devices(out_tensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         input_tensors_,
         output_tensors_,
@@ -5495,11 +5466,10 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall(
                 outputhcclDataType,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
                     getMstxHcclMsg("HcclAlltoAllV", static_cast<uint64_t>(input_counts.size()),
-                        inputhcclDataType, comm, streamId, -1, -1),
+                        inputhcclDataType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAlltoAllV(
                     inputDataPtr,
-- 
Gitee