diff --git a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
index d39604b299ec82036e816b53fcd24d6bb0968803..01111c333312998a41bb067335ddb232cb996148 100644
--- a/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
+++ b/torch_npu/csrc/distributed/ProcessGroupHCCL.cpp
@@ -3762,7 +3762,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce(
     check_npu_tensors_different_devices(tensors);
     std::vector<at::Tensor> tensors_cp = {tensors[0]};
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors_cp,
         tensors_cp,
@@ -3775,9 +3774,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclAllReduce(
                     inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -3816,7 +3815,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::batch_isend_irecv(
     std::vector<uint32_t> remote_rank_list)
 {
     std::vector<at::Tensor> tensors_tmp = {tensors[0]};
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors_tmp,
         tensors_tmp,
@@ -3831,7 +3829,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::batch_isend_irecv(
                 numel_list.push_back(getNumelForHCCL(tensors[i]));
                 type_list.push_back(getHcclDataType(tensors[i].scalar_type()));
             }
-            auto hccl_call = [tensor_ptr_list, numel_list, type_list, remote_rank_list, op_type, itemNum, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [tensor_ptr_list, numel_list, type_list, remote_rank_list, op_type, itemNum, comm, stream, is_dispatched]() -> int {
                 HcclSendRecvItem sendRecvInfo[itemNum];
                 HcclSendRecvType currType;
                 for (size_t i = 0; i < op_type.size(); ++i) {
@@ -3850,7 +3848,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::batch_isend_irecv(
                     };
                 }
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclBatchSendRecv", sendRecvInfo[0].count, sendRecvInfo[0].dataType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclBatchSendRecv", sendRecvInfo[0].count, sendRecvInfo[0].dataType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclBatchIsendIrecv(sendRecvInfo, itemNum, comm, stream.stream(false));
                 *is_dispatched = true;
@@ -3881,7 +3879,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::broadcast(
     const c10d::BroadcastOptions& opts)
 {
     check_npu_tensors_different_devices(tensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors,
         tensors,
@@ -3892,9 +3889,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::broadcast(
             auto inputDataPtr = input.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto hcclType = getHcclDataType(input.scalar_type());
-            auto hccl_call = [inputDataPtr, numel, hcclType, root, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, numel, hcclType, root, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclBroadcast(inputDataPtr, numel, hcclType, root, comm, stream.stream(false));
                 *is_dispatched = true;
@@ -3928,7 +3925,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce_coalesced(
     check_npu_tensors_same_device(tensors);
     std::vector<at::Tensor> tensors_cp = tensors;
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collectiveCoalesced(
         tensors_cp,
         tensors_cp,
@@ -3941,9 +3937,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allreduce_coalesced(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclAllreduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclAllReduce(
                     inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -3991,7 +3987,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce(
     std::string functionName = __FUNCTION__;
     uint64_t rank = opts.rootRank;
     std::vector<at::Tensor> tensors_cp = {tensors[0]};
-    auto streamId = getStreamId(false, -1);
     return collective(
         tensors_cp,
         tensors_cp,
@@ -4004,9 +3999,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto reduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclReduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclReduce(
                     inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream.stream(false));
@@ -4051,7 +4046,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_oop(
     std::vector<at::Tensor> inputTensors = {inputTensor};
     std::vector<at::Tensor> outputTensors = {outputTensor};
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors,
         outputTensors,
@@ -4064,9 +4058,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_oop(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto reduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduce", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclReduce", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclReduce(
                     inputDataPtr, outputDataPtr, numel, hcclType, reduceOp, rank, comm, stream.stream(false));
@@ -4163,7 +4157,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base_uneven(
 
     auto inputTensors_ = cast_to_origin_format(inputTensors);
     auto outputTensors_ = cast_to_origin_format(outputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors_,
@@ -4186,10 +4179,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base_uneven(
                 numel,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclReduceScatterV(
                     inputDataPtr,
@@ -4251,7 +4243,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base_uneven(
 
     auto inputTensors_ = cast_to_origin_format(inputTensors);
     auto outputTensors_ = cast_to_origin_format(outputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors_,
@@ -4272,10 +4263,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base_uneven(
                 numel,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAllGatherV(
                     inputDataPtr,
@@ -4325,7 +4315,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
     auto outputFlattened =
         flatten_for_scatter_gather(byte_alignment_outputTensors, byte_alignment_inputTensors_, size_);
     check_npu_tensors_different_devices(outputFlattened);
-    auto streamId = getStreamId(false, -1);
     return collective(
         byte_alignment_inputTensors_,
         outputFlattened,
@@ -4339,9 +4328,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto hcclType = getHcclDataType(input.scalar_type());
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
                 *is_dispatched = true;
@@ -4396,7 +4385,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
     }
     std::vector<at::Tensor> inputFlattened = {at::flatten(inputTensors[0])};
     std::vector<at::Tensor> outputFlattened = {at::cat(flattenedOutputTensors, 0)};
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputFlattened,
         outputFlattened,
@@ -4417,10 +4405,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
                 numel,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclAllGatherV", numel, hcclType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAllGatherV(
                     inputDataPtr,
@@ -4471,7 +4458,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
     const auto num_devices = outputTensors.size();
     const auto num_reduces = outputTensors[0].size();
     std::vector<c10::intrusive_ptr<c10d::Work>> works;
-    auto streamId = getStreamId(false, -1);
     // Need to add a method like startCoalescing();
     for (const auto i : c10::irange(num_reduces)) {
         std::vector<at::Tensor> inputs_multi_dev(num_devices);
@@ -4496,7 +4482,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather(
             auto numel = getNumelForHCCL(input);
             auto hcclType = getHcclDataType(input.scalar_type());
             torch_npu::profiler::MstxRange range(
-                getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                getMstxHcclMsg("HcclBroadcast", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                 torch_npu::profiler::DOMAIN_COMMUNICATION);
             auto hccl_result = HcclBroadcast(inputDataPtr, numel, hcclType, root, comm, stream.stream());
             *is_dispatched = true;
@@ -4521,7 +4507,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_into_tensor_coalesced(
     const c10d::AllgatherOptions& opts)
 {
     auto inputTensors_ = cast_to_origin_format(inputs);
-    auto streamId = getStreamId(false, -1);
     return collectiveCoalesced(
         inputTensors_,
         outputs,
@@ -4534,9 +4519,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_into_tensor_coalesced(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto hcclType = getHcclDataType(input.scalar_type());
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
                 *is_dispatched = true;
@@ -4562,7 +4547,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_togather(
     check_npu_tensors_different_devices(inputTensors);
     check_npu_tensors_different_devices(outputTensors);
     auto inputTensors_ = cast_to_origin_format(inputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors,
@@ -4575,9 +4559,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::allgather_togather(
            auto outputDataPtr = output.data_ptr();
            auto numel = getNumelForHCCL(input);
            auto hcclType = getHcclDataType(input.scalar_type());
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
                 *is_dispatched = true;
@@ -4608,7 +4592,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base(
     check_npu_tensors_different_devices(inputTensors);
     check_npu_tensors_different_devices(outputTensors);
     auto inputTensors_ = cast_to_origin_format(inputTensors);
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputTensors_,
         outputTensors,
@@ -4621,9 +4604,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_allgather_base(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto hcclType = getHcclDataType(input.scalar_type());
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclAllGather", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclAllGather(inputDataPtr, outputDataPtr, numel, hcclType, comm, stream.stream(false));
                 *is_dispatched = true;
@@ -4644,7 +4627,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter(
     const c10d::ReduceScatterOptions& opts)
 {
     check_npu_tensors_different_devices(outputTensors);
-    auto streamId = getStreamId(false, -1);
     bool same_size = check_same_size(inputTensors.back());
     if (same_size) {
         auto inputFlattened = flatten_for_scatter_gather(inputTensors, outputTensors, size_);
@@ -4664,9 +4646,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(output);
             auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclReduceScatter(
                     inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -4746,10 +4728,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter(
                 numel,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclReduceScatterV", numel, hcclType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclReduceScatterV(
                     inputDataPtr,
@@ -4840,7 +4821,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base(
 
     auto inputs = std::vector<at::Tensor>{inputTensor};
     auto outputs = std::vector<at::Tensor>{outputTensor};
-    auto streamId = getStreamId(false, -1);
     std::string functionName = __FUNCTION__;
     return collective(
         inputs,
@@ -4856,9 +4836,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::_reduce_scatter_base(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(output);
             auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclReduceScatter(
                     inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -4887,7 +4867,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter_tensor_coalesced(
     const c10d::ReduceScatterOptions& opts)
 {
     std::string functionName = __FUNCTION__;
-    auto streamId = getStreamId(false, -1);
     return collectiveCoalesced(
         inputTensors,
         outputTensors,
@@ -4902,9 +4881,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::reduce_scatter_tensor_coalesced(
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(output);
             auto hcclReduceOp = getHcclReduceOp(opts.reduceOp, input);
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclReduceScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclReduceScatter(
                     inputDataPtr, outputDataPtr, numel, hcclType, hcclReduceOp, comm, stream.stream(false));
@@ -5020,7 +4999,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::scatter(
         inputTensors.push_back(empty);
         inputFlattened = flatten_for_scatter_gather(inputTensors, outputTensors, size_);
     }
-    auto streamId = getStreamId(false, -1);
     return collective(
         inputFlattened,
         outputTensors,
@@ -5034,9 +5012,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::scatter(
            auto outputDataPtr = output.data_ptr();
            auto numel = getNumelForHCCL(output);
            auto hcclType = getHcclDataType(input.scalar_type());
-            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclScatter", numel, hcclType, comm, streamId, -1, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclScatter", numel, hcclType, comm, stream.id(), -1, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclScatter(inputDataPtr, outputDataPtr, numel, hcclType, root, comm, stream.stream(false));
                 *is_dispatched = true;
@@ -5073,7 +5051,6 @@
 c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::send(std::vector<at::Tensor>& tensors, int dstRank, int tag)
 {
     check_npu_tensors_different_devices(tensors);
-    auto streamId = getStreamId(true, dstRank);
     auto tensors_ = cast_to_origin_format(tensors);
     auto ret = pointToPoint(
         tensors_,
@@ -5082,9 +5059,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::send(std::vector<at::Tensor>& tensors, int dstRank, int tag)
             auto inputDataPtr = input.data_ptr();
             auto numel = getNumelForHCCL(input);
             auto hcclType = getHcclDataType(input.scalar_type());
-            auto hccl_call = [inputDataPtr, numel, hcclType, dst_rank, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [inputDataPtr, numel, hcclType, dst_rank, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclSend", numel, hcclType, comm, streamId, -1, dst_rank), stream.stream(false),
+                    getMstxHcclMsg("HcclSend", numel, hcclType, comm, stream.id(), -1, dst_rank), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclSend(inputDataPtr, numel, hcclType, static_cast<uint32_t>(dst_rank), comm, stream.stream(false));
                 *is_dispatched = true;
@@ -5101,7 +5078,6 @@
 c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::recv(std::vector<at::Tensor>& tensors, int srcRank, int tag)
 {
     check_npu_tensors_different_devices(tensors);
-    auto streamId = getStreamId(true, srcRank);
     auto tensors_ = create_base_format_tensors(tensors);
     auto ret = pointToPoint(
         tensors_,
@@ -5113,9 +5089,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::recv(std::vector<at::Tensor>& tensors, int srcRank, int tag)
             auto outputDataPtr = output.data_ptr();
             auto numel = getNumelForHCCL(output);
             auto hcclType = getHcclDataType(output.scalar_type());
-            auto hccl_call = [outputDataPtr, numel, hcclType, src_rank, comm, stream, is_dispatched, streamId]() -> int {
+            auto hccl_call = [outputDataPtr, numel, hcclType, src_rank, comm, stream, is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclRecv", numel, hcclType, comm, streamId, src_rank, -1), stream.stream(false),
+                    getMstxHcclMsg("HcclRecv", numel, hcclType, comm, stream.id(), src_rank, -1), stream.stream(false),
                     torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = HcclRecv(outputDataPtr, numel, hcclType, static_cast<uint32_t>(src_rank), comm, stream.stream(false));
                 *is_dispatched = true;
@@ -5182,7 +5158,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
         DIST_ERROR(ErrCode::PARAM));
     uint64_t output_counts = static_cast<uint64_t>(outputTensor.numel() / ranks);
     uint64_t input_counts = static_cast<uint64_t>(inputTensor.numel() / ranks);
-    auto streamId = getStreamId(false, -1);
     check_npu_tensors_different_devices(inputTensors);
     check_npu_tensors_different_devices(outputTensors);
     return collective(
@@ -5205,10 +5180,9 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
                 outputhcclDataType,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
-                    getMstxHcclMsg("HcclAlltoAll", input_counts, inputhcclDataType, comm, streamId, -1, -1),
+                    getMstxHcclMsg("HcclAlltoAll", input_counts, inputhcclDataType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAlltoAll(
                     inputDataPtr,
@@ -5282,7 +5256,6 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
             inputSpl.push_back(inputSpl[i - 1] + inputCounts[i - 1]);
         }
     }
-    auto streamId = getStreamId(false, -1);
     check_npu_tensors_different_devices(inputTensors);
     check_npu_tensors_different_devices(outputTensors);
     return collective(
@@ -5304,11 +5277,10 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall_base(
                 outputhcclDataType,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
                     getMstxHcclMsg("HcclAlltoAllV", static_cast<uint64_t>(inputCounts.size()),
-                                   inputhcclDataType, comm, streamId, -1, -1),
+                                   inputhcclDataType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAlltoAllV(
                     inputDataPtr,
@@ -5406,7 +5378,7 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall(
     check_npu_tensors_different_devices(in_tensors);
     check_npu_tensors_different_devices(out_tensors);
 
-    auto streamId = getStreamId(false, -1);
+
     return collective(
         input_tensors_,
         output_tensors_,
@@ -5426,11 +5398,10 @@ c10::intrusive_ptr<c10d::Work> ProcessGroupHCCL::alltoall(
                 outputhcclDataType,
                 comm,
                 stream,
-                is_dispatched,
-                streamId]() -> int {
+                is_dispatched]() -> int {
                 torch_npu::profiler::MstxRange range(
                     getMstxHcclMsg("HcclAlltoAllV", static_cast<uint64_t>(input_counts.size()),
-                                   inputhcclDataType, comm, streamId, -1, -1),
+                                   inputhcclDataType, comm, stream.id(), -1, -1),
                     stream.stream(false), torch_npu::profiler::DOMAIN_COMMUNICATION);
                 auto hccl_result = hcclAlltoAllV(
                     inputDataPtr,