diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp index 7ec49743870720ea41c86335f892f684ff221379..06174edbc0b16893a1a4b7c6dccb1a26c2b98ade 100644 --- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp +++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp @@ -3768,7 +3768,6 @@ c10::intrusive_ptr ProcessGroupHCCL::allreduce( std::vector tensors_cp = {tensors[0]}; std::string functionName = __FUNCTION__; - auto streamId = getStreamId(false, -1); return collective( tensors_cp, tensors_cp, @@ -3781,9 +3780,9 @@ c10::intrusive_ptr ProcessGroupHCCL::allreduce( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclAllReduce( inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false)); @@ -3826,7 +3825,6 @@ c10::intrusive_ptr ProcessGroupHCCL::batch_isend_irecv( } std::vector tensors_tmp = {tensors[0]}; - auto streamId = getStreamId(false, -1); return collective( tensors_tmp, tensors_tmp, @@ -3841,7 +3839,7 @@ c10::intrusive_ptr ProcessGroupHCCL::batch_isend_irecv( numel_list.push_back(getNumelForHCCL(tensors[i])); type_list.push_back(getHcclDataType(tensors[i].scalar_type())); } - auto hccl_call = [tensor_ptr_list, numel_list, type_list, remote_rank_list, op_type, itemNum, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [tensor_ptr_list, 
numel_list, type_list, remote_rank_list, op_type, itemNum, comm, stream, is_dispatched]() -> int { HcclSendRecvItem sendRecvInfo[itemNum]; HcclSendRecvType currType; for (size_t i = 0; i < op_type.size(); ++i) { @@ -3860,7 +3858,7 @@ c10::intrusive_ptr ProcessGroupHCCL::batch_isend_irecv( }; } torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclBatchSendRecv", sendRecvInfo[0].count, sendRecvInfo[0].dataType, comm, streamId, -1, -1), + getMstxHcclMsg("HcclBatchSendRecv", sendRecvInfo[0].count, sendRecvInfo[0].dataType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclBatchIsendIrecv(sendRecvInfo, itemNum, comm, stream.stream(false)); *is_dispatched = true; @@ -3895,7 +3893,6 @@ c10::intrusive_ptr ProcessGroupHCCL::broadcast( if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) { at_npu::native::OpHook::GetInstance().PreHook("broadcast", tensors); } - auto streamId = getStreamId(false, -1); return collective( tensors, tensors, @@ -3906,9 +3903,9 @@ c10::intrusive_ptr ProcessGroupHCCL::broadcast( auto inputDataPtr = input.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclType = getHcclDataType(input.scalar_type()); - auto hccl_call = [inputDataPtr, numel, hcclType, root, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, numel, hcclType, root, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclBroadcast(inputDataPtr, numel, hcclType, root, comm, stream.stream(false)); *is_dispatched = true; @@ -3942,7 +3939,6 @@ c10::intrusive_ptr ProcessGroupHCCL::allreduce_coalesced( check_npu_tensors_same_device(tensors); std::vector tensors_cp = tensors; std::string 
functionName = __FUNCTION__; - auto streamId = getStreamId(false, -1); return collectiveCoalesced( tensors_cp, tensors_cp, @@ -3955,9 +3951,9 @@ c10::intrusive_ptr ProcessGroupHCCL::allreduce_coalesced( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclAllReduce( inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false)); @@ -4010,7 +4006,6 @@ c10::intrusive_ptr ProcessGroupHCCL::reduce( std::string functionName = __FUNCTION__; uint64_t rank = opts.rootRank; std::vector tensors_cp = {tensors[0]}; - auto streamId = getStreamId(false, -1); return collective( tensors_cp, tensors_cp, @@ -4023,9 +4018,9 @@ c10::intrusive_ptr ProcessGroupHCCL::reduce( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto reduceOp = getHcclReduceOp(opts.reduceOp, input); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclReduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclReduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto 
hccl_result = hcclReduce( inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream.stream(false)); @@ -4070,7 +4065,6 @@ c10::intrusive_ptr ProcessGroupHCCL::_reduce_oop( std::vector inputTensors = {inputTensor}; std::vector outputTensors = {outputTensor}; std::string functionName = __FUNCTION__; - auto streamId = getStreamId(false, -1); return collective( inputTensors, outputTensors, @@ -4083,9 +4077,9 @@ c10::intrusive_ptr ProcessGroupHCCL::_reduce_oop( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto reduceOp = getHcclReduceOp(opts.reduceOp, input); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclReduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclReduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclReduce( inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream.stream(false)); @@ -4182,7 +4176,6 @@ c10::intrusive_ptr ProcessGroupHCCL::_reduce_scatter_base_uneven( auto inputTensors_ = cast_to_origin_format(inputTensors); auto outputTensors_ = cast_to_origin_format(outputTensors); - auto streamId = getStreamId(false, -1); return collective( inputTensors_, outputTensors_, @@ -4205,10 +4198,9 @@ c10::intrusive_ptr ProcessGroupHCCL::_reduce_scatter_base_uneven( numel, comm, stream, - is_dispatched, - streamId]() -> int { + is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, streamId, -1, -1), + getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), 
torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclReduceScatterV( inputDataPtr, @@ -4270,7 +4262,6 @@ c10::intrusive_ptr ProcessGroupHCCL::_allgather_base_uneven( auto inputTensors_ = cast_to_origin_format(inputTensors); auto outputTensors_ = cast_to_origin_format(outputTensors); - auto streamId = getStreamId(false, -1); return collective( inputTensors_, outputTensors_, @@ -4291,10 +4282,9 @@ c10::intrusive_ptr ProcessGroupHCCL::_allgather_base_uneven( numel, comm, stream, - is_dispatched, - streamId]() -> int { + is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, streamId, -1, -1), + getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclAllGatherV( inputDataPtr, @@ -4349,7 +4339,6 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather( auto outputFlattened = flatten_for_scatter_gather(byte_alignment_outputTensors, byte_alignment_inputTensors_, size_); check_npu_tensors_different_devices(outputFlattened); - auto streamId = getStreamId(false, -1); return collective( byte_alignment_inputTensors_, outputFlattened, @@ -4363,9 +4352,9 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclType = getHcclDataType(input.scalar_type()); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, 
hcclType, comm, stream.stream(false)); *is_dispatched = true; @@ -4420,7 +4409,6 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather( } std::vector inputFlattened = {at::flatten(inputTensors[0])}; std::vector outputFlattened = {at::cat(flattenedOutputTensors, 0)}; - auto streamId = getStreamId(false, -1); return collective( inputFlattened, outputFlattened, @@ -4441,10 +4429,9 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather( numel, comm, stream, - is_dispatched, - streamId]() -> int { + is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, streamId, -1, -1), + getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclAllGatherV( inputDataPtr, @@ -4495,7 +4482,6 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather( const auto num_devices = outputTensors.size(); const auto num_reduces = outputTensors[0].size(); std::vector> works; - auto streamId = getStreamId(false, -1); // Need to add a method like startCoalescing(); for (const auto i : c10::irange(num_reduces)) { std::vector inputs_multi_dev(num_devices); @@ -4522,7 +4508,7 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather( auto numel = getNumelForHCCL(input); auto hcclType = getHcclDataType(input.scalar_type()); torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclBroadcast(inputDataPtr, numel, hcclType, root, comm, stream.stream()); *is_dispatched = true; @@ -4547,7 +4533,6 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather_into_tensor_coalesced const c10d::AllgatherOptions& opts) { auto inputTensors_ = cast_to_origin_format(inputs); - auto streamId = getStreamId(false, -1); return 
collectiveCoalesced( inputTensors_, outputs, @@ -4560,9 +4545,9 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather_into_tensor_coalesced auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclType = getHcclDataType(input.scalar_type()); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false)); *is_dispatched = true; @@ -4593,7 +4578,6 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather_togather( } auto inputTensors_ = cast_to_origin_format(inputTensors); - auto streamId = getStreamId(false, -1); return collective( inputTensors_, outputTensors, @@ -4606,9 +4590,9 @@ c10::intrusive_ptr ProcessGroupHCCL::allgather_togather( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclType = getHcclDataType(input.scalar_type()); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false)); *is_dispatched = true; @@ -4644,7 +4628,6 @@ 
c10::intrusive_ptr ProcessGroupHCCL::_allgather_base( } auto inputTensors_ = cast_to_origin_format(inputTensors); - auto streamId = getStreamId(false, -1); return collective( inputTensors_, outputTensors, @@ -4657,9 +4640,9 @@ c10::intrusive_ptr ProcessGroupHCCL::_allgather_base( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclType = getHcclDataType(input.scalar_type()); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false)); *is_dispatched = true; @@ -4684,7 +4667,6 @@ c10::intrusive_ptr ProcessGroupHCCL::reduce_scatter( if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) { at_npu::native::OpHook::GetInstance().PreHook("reduce_scatter", outputTensors, inputTensors); } - auto streamId = getStreamId(false, -1); bool same_size = check_same_size(inputTensors.back()); if (same_size) { auto inputFlattened = flatten_for_scatter_gather(inputTensors, outputTensors, size_); @@ -4704,9 +4686,9 @@ c10::intrusive_ptr ProcessGroupHCCL::reduce_scatter( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(output); auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - 
getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclReduceScatter( inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false)); @@ -4786,10 +4768,9 @@ c10::intrusive_ptr ProcessGroupHCCL::reduce_scatter( numel, comm, stream, - is_dispatched, - streamId]() -> int { + is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, streamId, -1, -1), + getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclReduceScatterV( inputDataPtr, @@ -4884,7 +4865,6 @@ c10::intrusive_ptr ProcessGroupHCCL::_reduce_scatter_base( if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) { at_npu::native::OpHook::GetInstance().PreHook("_reduce_scatter_base", outputs, inputs); } - auto streamId = getStreamId(false, -1); std::string functionName = __FUNCTION__; return collective( inputs, @@ -4900,9 +4880,9 @@ c10::intrusive_ptr ProcessGroupHCCL::_reduce_scatter_base( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(output); auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = 
HcclReduceScatter( inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false)); @@ -4931,7 +4911,6 @@ c10::intrusive_ptr ProcessGroupHCCL::reduce_scatter_tensor_coalesced const c10d::ReduceScatterOptions& opts) { std::string functionName = __FUNCTION__; - auto streamId = getStreamId(false, -1); return collectiveCoalesced( inputTensors, outputTensors, @@ -4946,9 +4925,9 @@ c10::intrusive_ptr ProcessGroupHCCL::reduce_scatter_tensor_coalesced auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(output); auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclReduceScatter( inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false)); @@ -5063,7 +5042,6 @@ c10::intrusive_ptr ProcessGroupHCCL::scatter( } else { inputFlattened.push_back(at::empty(0).to(outputTensors[0])); } - auto streamId = getStreamId(false, -1); return collective( inputFlattened, outputTensors, @@ -5077,9 +5055,9 @@ c10::intrusive_ptr ProcessGroupHCCL::scatter( auto outputDataPtr = output.data_ptr(); auto numel = getNumelForHCCL(output); auto hcclType = getHcclDataType(input.scalar_type()); - auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - 
getMstxHcclMsg("HcclScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false), + getMstxHcclMsg("HcclScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclScatter(inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream.stream(false)); *is_dispatched = true; @@ -5120,7 +5098,6 @@ c10::intrusive_ptr ProcessGroupHCCL::send(std::vector& t if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) { at_npu::native::OpHook::GetInstance().PreHook("send", tensors); } - auto streamId = getStreamId(true, dstRank); auto tensors_ = cast_to_origin_format(tensors); auto ret = pointToPoint( tensors_, @@ -5129,9 +5106,9 @@ c10::intrusive_ptr ProcessGroupHCCL::send(std::vector& t auto inputDataPtr = input.data_ptr(); auto numel = getNumelForHCCL(input); auto hcclType = getHcclDataType(input.scalar_type()); - auto hccl_call = [inputDataPtr, numel, hcclType, dst_rank, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [inputDataPtr, numel, hcclType, dst_rank, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclSend", numel, hcclType, comm, streamId, -1, dst_rank), stream.stream(false), + getMstxHcclMsg("HcclSend", numel, hcclType, comm, stream.id(), -1, dst_rank), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclSend(inputDataPtr, numel, hcclType, static_cast(dst_rank), comm, stream.stream(false)); *is_dispatched = true; @@ -5152,7 +5129,6 @@ c10::intrusive_ptr ProcessGroupHCCL::recv(std::vector& t if (C10_UNLIKELY(at_npu::native::env::CheckOpHookEnable())) { at_npu::native::OpHook::GetInstance().PreHook("recv", tensors); } - auto streamId = getStreamId(true, srcRank); auto tensors_ = create_base_format_tensors(tensors); auto ret = pointToPoint( tensors_, @@ -5164,9 +5140,9 @@ c10::intrusive_ptr ProcessGroupHCCL::recv(std::vector& t auto outputDataPtr = 
output.data_ptr(); auto numel = getNumelForHCCL(output); auto hcclType = getHcclDataType(output.scalar_type()); - auto hccl_call = [outputDataPtr, numel, hcclType, src_rank, comm, stream, is_dispatched, streamId]() -> int { + auto hccl_call = [outputDataPtr, numel, hcclType, src_rank, comm, stream, is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclRecv", numel, hcclType, comm, streamId, src_rank, -1), stream.stream(false), + getMstxHcclMsg("HcclRecv", numel, hcclType, comm, stream.id(), src_rank, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = HcclRecv(outputDataPtr, numel, hcclType, static_cast(src_rank), comm, stream.stream(false)); *is_dispatched = true; @@ -5238,7 +5214,6 @@ c10::intrusive_ptr ProcessGroupHCCL::alltoall_base( DIST_ERROR(ErrCode::PARAM)); uint64_t output_counts = static_cast(outputTensor.numel() / ranks); uint64_t input_counts = static_cast(inputTensor.numel() / ranks); - auto streamId = getStreamId(false, -1); check_npu_tensors_different_devices(inputTensors); check_npu_tensors_different_devices(outputTensors); return collective( @@ -5261,10 +5236,9 @@ c10::intrusive_ptr ProcessGroupHCCL::alltoall_base( outputhcclDataType, comm, stream, - is_dispatched, - streamId]() -> int { + is_dispatched]() -> int { torch_npu::profiler::MstxRange range( - getMstxHcclMsg("HcclAlltoAll", input_counts, inputhcclDataType, comm, streamId, -1, -1), + getMstxHcclMsg("HcclAlltoAll", input_counts, inputhcclDataType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclAlltoAll( inputDataPtr, @@ -5338,7 +5312,6 @@ c10::intrusive_ptr ProcessGroupHCCL::alltoall_base( inputSpl.push_back(inputSpl[i - 1] + inputCounts[i - 1]); } } - auto streamId = getStreamId(false, -1); check_npu_tensors_different_devices(inputTensors); check_npu_tensors_different_devices(outputTensors); return collective( @@ -5360,11 +5333,10 @@ 
c10::intrusive_ptr ProcessGroupHCCL::alltoall_base( outputhcclDataType, comm, stream, - is_dispatched, - streamId]() -> int { + is_dispatched]() -> int { torch_npu::profiler::MstxRange range( getMstxHcclMsg("HcclAlltoAllV", static_cast(inputCounts.size()), - inputhcclDataType, comm, streamId, -1, -1), + inputhcclDataType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclAlltoAllV( inputDataPtr, @@ -5467,7 +5439,6 @@ c10::intrusive_ptr ProcessGroupHCCL::alltoall( check_npu_tensors_different_devices(in_tensors); check_npu_tensors_different_devices(out_tensors); - auto streamId = getStreamId(false, -1); return collective( input_tensors_, output_tensors_, @@ -5487,11 +5458,10 @@ c10::intrusive_ptr ProcessGroupHCCL::alltoall( outputhcclDataType, comm, stream, - is_dispatched, - streamId]() -> int { + is_dispatched]() -> int { torch_npu::profiler::MstxRange range( getMstxHcclMsg("HcclAlltoAllV", static_cast(input_counts.size()), - inputhcclDataType, comm, streamId, -1, -1), + inputhcclDataType, comm, stream.id(), -1, -1), stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION); auto hccl_result = hcclAlltoAllV( inputDataPtr,