diff --git a/debug/accuracy_tools/msprobe/core/common/parallel_state.py b/debug/accuracy_tools/msprobe/core/common/parallel_state.py new file mode 100644 index 0000000000000000000000000000000000000000..d4e48b53c309ccea6c5f3c430f474f957bf5171f --- /dev/null +++ b/debug/accuracy_tools/msprobe/core/common/parallel_state.py @@ -0,0 +1,193 @@ +# Copyright (c) 2024-2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List + +from msprobe.core.common.log import logger +from msprobe.core.common.exceptions import MsprobeException + + +class RankGroupGenerator(object): + def __init__(self, tensor_parallel: int, expert_parallel: int, data_parallel: int, + pipeline_parallel: int, context_parallel: int, order: str) -> None: + self.tensor_parallel = tensor_parallel + self.expert_parallel = expert_parallel + self.data_parallel = data_parallel + self.pipeline_parallel = pipeline_parallel + self.context_parallel = context_parallel + self.total_size = tensor_parallel * data_parallel * pipeline_parallel * context_parallel + + self.parallel_sizes = { + "tp": self.tensor_parallel, + "pp": self.pipeline_parallel, + "dp": self.data_parallel, + "ep": self.expert_parallel, + "cp": self.context_parallel, + } + self.original_order = order + normalized_order = order.lower() + + # 检查ep和dp是否相邻 + if 'ep' in normalized_order: + if 'ep-dp' not in normalized_order and 'dp-ep' not in normalized_order: + logger.error(f"The ep and dp must be adjacent in order ({self.original_order}).") + raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) + + # 检查所有非1的并行维度是否都在order中 + for name in self.parallel_sizes.keys(): + size = self.parallel_sizes[name] + if name not in normalized_order: + if size != 1: + logger.error(f"The parallel size ({name}) is ({size}), " + f"but it's not specified in order ({self.original_order}).") + raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) + else: + normalized_order += '-' + name + + self.order_with_ep = normalized_order + self.order_without_ep = '-'.join([item for item in normalized_order.split('-') if item != 'ep']) + + self.size_list_with_ep = [] + self.size_list_without_ep = [] + + for item in normalized_order.split('-'): + if item == 'dp': + self.size_list_with_ep.append(self.data_parallel // self.expert_parallel) + self.size_list_without_ep.append(self.data_parallel) + elif item == 'ep': + self.size_list_with_ep.append(self.expert_parallel) + else: + self.size_list_with_ep.append(self.parallel_sizes[item]) + self.size_list_without_ep.append(self.parallel_sizes[item]) + + @staticmethod + def create_mask(order_str: str, target_tokens: str) -> List[bool]: + order_elements = order_str.split('-') + target_elements = target_tokens.split('-') + mask = [False] * len(order_elements) + for token in target_elements: + mask[order_elements.index(token)] = True + return mask + + @staticmethod + def create_masked_rank_groups( + total_size: int, + parallel_dims: List[int], + mask: List[bool], + ) -> List[List[int]]: + def compute_prefix_products(dimensions: List[int], initial: int = 1) -> List[int]: + products = [initial] + current = initial + for dim in dimensions: + current *= dim + products.append(current) + return products + + def calculate_inner_product(a: List[int], b: List[int]) -> int: + return sum(x * y for x, y in zip(a, b)) + + def decompose_index(index: int, shape: List[int], strides: List[int] = None) -> List[int]: + if strides is None: + strides = compute_prefix_products(shape) + indices = [(index // stride) % dim for dim, stride in zip(shape, strides)] + + # 验证分解是否正确 + if calculate_inner_product(indices, strides[:-1]) != index: + error_msg = f"The index {index} with shape {shape} doesn't match decomposed indices {indices}." + logger.error(error_msg) + raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) + + return indices + + # 分离被掩码和未被掩码的维度 + masked_dims = [dim for dim, is_masked in zip(parallel_dims, mask) if is_masked] + unmasked_dims = [dim for dim, is_masked in zip(parallel_dims, mask) if not is_masked] + + # 计算全局、掩码和未掩码的步长 + global_strides = compute_prefix_products(parallel_dims) + masked_strides = [stride for stride, is_masked in zip(global_strides, mask) if is_masked] + unmasked_strides = [stride for stride, is_masked in zip(global_strides, mask) if not is_masked] + + # 计算组大小和组数 + group_dim = compute_prefix_products(masked_dims)[-1] + group_count = total_size // group_dim + + # 生成所有组的rank + rank_groups = [] + for group_idx in range(group_count): + decomposed_group = decompose_index(group_idx, unmasked_dims) + current_group = [] + for in_group_idx in range(group_dim): + decomposed_rank = decompose_index(in_group_idx, masked_dims) + rank_value = (calculate_inner_product(decomposed_rank, masked_strides) + + calculate_inner_product(decomposed_group, unmasked_strides)) + current_group.append(rank_value) + rank_groups.append(current_group) + + return rank_groups + + def generate_ranks(self, token: str, separate_ep: bool = False) -> List[List[int]]: + if separate_ep: + parallel_dims = self.size_list_with_ep + current_order = self.order_with_ep + else: + parallel_dims = self.size_list_without_ep + current_order = self.order_without_ep + + mask = self.create_mask(current_order, token) + return self.create_masked_rank_groups(self.total_size, parallel_dims, mask) + + def generate_all_ranks(self) -> dict: + result = {} + for token in ["dp", "pp", "tp"]: + result[token] = self.generate_ranks(token) + result[f"{token}_size"] = self.parallel_sizes[token] + return result + + +def get_tp_pp_default_groups( + total_world_size: int, + tensor_parallel_size: int = 1, + pipeline_parallel_size: int = 1, + order: str = "tp-cp-ep-dp-pp", +) -> tuple: + context_parallel_size = 1 + expert_parallel_size = 1 + + # 检查world_size是否可被各并行维度的乘积整除 + product = tensor_parallel_size * pipeline_parallel_size * context_parallel_size + if total_world_size % product != 0: + logger.error(f"The world size ({total_world_size}) is not divisible by " + f"{tensor_parallel_size} x {pipeline_parallel_size} x {context_parallel_size}.") + raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) + + data_parallel_size = total_world_size // product + + # 检查数据并行是否可被专家并行整除 + if data_parallel_size % expert_parallel_size != 0: + logger.error(f"The data parallel size ({data_parallel_size}) is not divisible by expert parallel size.") + raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) + + # 生成rank组 + rank_creator = RankGroupGenerator( + tensor_parallel=tensor_parallel_size, + expert_parallel=expert_parallel_size, + data_parallel=data_parallel_size, + pipeline_parallel=pipeline_parallel_size, + context_parallel=context_parallel_size, + order=order, + ) + + return rank_creator.generate_ranks('tp'), rank_creator.generate_ranks('pp') diff --git a/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md b/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md index 8b0726cf167b2633dd7cfc66929a9318a3810f22..1e0129e946a7800762f49f22132145b91ba5d898 100644 --- a/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md +++ b/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md @@ -324,7 +324,7 @@ dump配置请参考[dump配置示例](./03.config_examples.md#16-task-配置为- - 当前支持的模型并行切分策略:Tensor Parallelism(TP)、Pipeline Parallelism(PP)、Virtual Pipeline Parallelism(VPP),暂不支持Context Parallelism(CP)和Expert Parallelism(EP)。 - 当前支持基于Megatron、MindSpeed-LLM套件的模型进行图合并,其他套件的模型图合并效果有待验证; -- 当前仅支持msprobe工具dump的statistics数据; +- 当前仅支持msprobe工具dump的statistics数据, level需指定L0或者mix; - 图合并比对时要确保Data Parallelism(DP)切分一致,例如rank=8 tp=1 pp=8的配置,dp=1,图合并将得到一张图,rank=8 tp=1 pp=4的配置,dp=2,图合并将得到两张图,暂不支持数量不一致的图进行比对。 使能方式: @@ -341,6 +341,7 @@ dump配置请参考[dump配置示例](./03.config_examples.md#16-task-配置为- | tp | 张量并行大小,int类型。实际训练脚本中需指定`--tensor-model-parallel-size T`,其中`T`表示张量模型并行大小,即**图合并所需的参数tp**, `tp=T`。 | 是 | | pp | 流水线并行的阶段数,int类型。实际训练脚本中需指定`--pipeline-model-parallel-size P`,其中`P`表示流水线并行的阶段数,即**图合并所需的参数pp**, `pp=P`。 | 是 | | vpp | 虚拟流水线并行阶段数,int类型。虚拟流水线并行依赖流水线并行,实际训练脚本中需指定`--num-layers-per-virtual-pipeline-stage V`,其中`V`表示每个虚拟流水线阶段的层数;指定`--num-layers L`,其中`L`表示模型总层数,**图合并所需的参数vpp**=`L/V/P`。vpp参数可以不配置,默认vpp=1代表未开启虚拟流水线并行。 | 否 | +| order | 模型并行维度的排序顺序,str类型。Megatron默认为`tp-cp-ep-dp-pp`。 如果使用msprobe工具dump数据指定level为L0并且实际训练脚本中的order非默认值(例如实际训练脚本中指定`--use-tp-pp-dp-mapping`),请传入修改后的order。dump数据指定level为mix则无需修改。 | 否 | npu_path、bench_path的配置以及执行命令请参考[3.2.3 批量构建或比对](#323-批量构建或比对) diff --git a/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md b/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md index 97997f106573db1857bdb18b5f3df9c01bcc510d..2c67f17a1df6117265c1a8bc047db81900f2883e 100644 --- a/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md +++ b/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md @@ -325,7 +325,7 @@ dump配置请参考[dump配置示例](./03.config_examples.md#35-task-配置为- - 当前支持的模型并行切分策略:Tensor Parallelism(TP)、Pipeline Parallelism(PP)、Virtual Pipeline Parallelism(VPP),暂不支持Context Parallelism(CP)和Expert Parallelism(EP)。 - 当前支持基于Megatron、MindSpeed-LLM套件的模型进行图合并,其他套件的模型图合并效果有待验证; -- 当前仅支持msprobe工具dump的statistics数据; +- 当前仅支持msprobe工具dump的statistics数据, level需指定L0或者mix; - 图合并比对时要确保Data Parallelism(DP)切分一致,例如rank=8 tp=1 pp=8的配置,dp=1,图合并将得到一张图,rank=8 tp=1 pp=4的配置,dp=2,图合并将得到两张图,暂不支持数量不一致的图进行比对。 使能方式: @@ -342,6 +342,7 @@ dump配置请参考[dump配置示例](./03.config_examples.md#35-task-配置为- | tp | 张量并行大小,int类型。实际训练脚本中需指定`--tensor-model-parallel-size T`,其中`T`表示张量模型并行大小,即**图合并所需的参数tp**, `tp=T`。 | 是 | | pp | 流水线并行的阶段数,int类型。实际训练脚本中需指定`--pipeline-model-parallel-size P`,其中`P`表示流水线并行的阶段数,即**图合并所需的参数pp**, `pp=P`。 | 是 | | vpp | 虚拟流水线并行阶段数,int类型。虚拟流水线并行依赖流水线并行,实际训练脚本中需指定`--num-layers-per-virtual-pipeline-stage V`,其中`V`表示每个虚拟流水线阶段的层数;指定`--num-layers L`,其中`L`表示模型总层数,**图合并所需的参数vpp**=`L/V/P`。vpp参数可以不配置,默认vpp=1代表未开启虚拟流水线并行。 | 否 | +| order | 模型并行维度的排序顺序,str类型。Megatron默认为`tp-cp-ep-dp-pp`。 如果使用msprobe工具dump数据指定level为L0并且实际训练脚本中的order非默认值(例如实际训练脚本中指定`--use-tp-pp-dp-mapping`),请传入修改后的order。dump数据指定level为mix则无需修改。 | 否 | npu_path、bench_path的配置以及执行命令请参考[3.2.3 批量构建或比对](#323-批量构建或比对) diff --git a/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py b/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py index 7c1033de54bd2f043b2cb51f04850ce2d423e001..36f551edc8df75b5d1d6c8c0ddf40e4eff0084fc 100644 --- a/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py +++ b/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py @@ -5,7 +5,7 @@ from msprobe.visualization.builder.graph_merger import ( NoParallelMerger, TPPPMerger, FullMerger ) from msprobe.core.common.const import Const -from msprobe.visualization.utils import GraphConst +from msprobe.visualization.utils import GraphConst, ParallelParam from msprobe.visualization.graph.node_op import NodeOp from msprobe.visualization.graph.graph import Graph from msprobe.core.common.exceptions import MsprobeException @@ -14,7 +14,7 @@ from msprobe.core.common.exceptions import MsprobeException class TestGraphMerger(unittest.TestCase): def setUp(self): self.build_graph_results = MagicMock() - self.parallel_param = MagicMock(tp=1, pp=1, rank_size=1) + self.parallel_param = ParallelParam(tp=1, pp=1, rank_size=1) self.is_bench = False def test_select_strategy_no_parallel(self): @@ -57,7 +57,7 @@ class TestGraphMerger(unittest.TestCase): class TestBaseGraphMerger(unittest.TestCase): def setUp(self): self.build_graph_results = [MagicMock(rank=i) for i in range(2)] - self.parallel_param = MagicMock(tp=1, pp=1, rank_size=2) + self.parallel_param = ParallelParam(tp=1, pp=1, rank_size=2) self.is_bench = False self.merger = BaseGraphMerger(self.build_graph_results, self.parallel_param, self.is_bench) @@ -166,7 +166,7 @@ class TestBaseGraphMerger(unittest.TestCase): merger = BaseGraphMerger(self.build_graph_results, self.parallel_param, self.is_bench) tp_groups, pp_groups = merger.get_default_groups() self.assertEqual(tp_groups, [[0, 1], [2, 3], [4, 5], [6, 7]]) - self.assertEqual(pp_groups, [[0, 2], [1, 3], [4, 6], [5, 7]]) + self.assertEqual(pp_groups, [[0, 4], [1, 5], [2, 6], [3, 7]]) self.parallel_param.tp = 2 self.parallel_param.pp = 3 @@ -179,7 +179,7 @@ class TestBaseGraphMerger(unittest.TestCase): class TestPPMerger(unittest.TestCase): def setUp(self): self.build_graph_results = [MagicMock(rank=i) for i in range(4)] - self.parallel_param = MagicMock(tp=1, pp=4, rank_size=4) + self.parallel_param = ParallelParam(tp=1, pp=4, rank_size=4) self.is_bench = False self.merger = PPMerger(self.build_graph_results, self.parallel_param, self.is_bench) @@ -281,7 +281,7 @@ class TestPPMerger(unittest.TestCase): class TestTPMerger(unittest.TestCase): def setUp(self): self.build_graph_results = [MagicMock(rank=i) for i in range(4)] - self.parallel_param = MagicMock(tp=4, pp=1, rank_size=4) + self.parallel_param = ParallelParam(tp=4, pp=1, rank_size=4) self.is_bench = False self.merger = TPMerger(self.build_graph_results, self.parallel_param, self.is_bench) @@ -343,7 +343,7 @@ class TestTPMerger(unittest.TestCase): class TestNoParallelMerger(unittest.TestCase): def setUp(self): self.build_graph_results = [MagicMock()] - self.parallel_param = MagicMock(tp=1, pp=1, rank_size=1) + self.parallel_param = ParallelParam(tp=1, pp=1, rank_size=1) self.is_bench = False self.merger = NoParallelMerger(self.build_graph_results, self.parallel_param, self.is_bench) @@ -357,7 +357,7 @@ class TestNoParallelMerger(unittest.TestCase): class TestTPPPMerger(unittest.TestCase): def setUp(self): self.build_graph_results = [MagicMock(rank=i) for i in range(4)] - self.parallel_param = MagicMock(tp=2, pp=2, rank_size=4) + self.parallel_param = ParallelParam(tp=2, pp=2, rank_size=4) self.is_bench = False self.merger = TPPPMerger(self.build_graph_results, self.parallel_param, self.is_bench) @@ -380,7 +380,7 @@ class TestTPPPMerger(unittest.TestCase): class TestFullMerger(unittest.TestCase): def setUp(self): self.build_graph_results = [MagicMock(rank=i) for i in range(8)] - self.parallel_param = MagicMock(tp=2, pp=4, rank_size=8, vpp=1) + self.parallel_param = ParallelParam(tp=2, pp=4, rank_size=8, vpp=1) self.is_bench = False self.merger = FullMerger(self.build_graph_results, self.parallel_param, self.is_bench) diff --git a/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py b/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py index ceff2dffb2388f725df312d153e435600b117e1d..f1a2a6ca32c94285f7584a72f8106ff3e81b3bdd 100644 --- a/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py +++ b/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py @@ -21,8 +21,8 @@ from msprobe.visualization.graph.graph import Graph, BaseNode from msprobe.visualization.graph.node_op import NodeOp from msprobe.core.common.log import logger from msprobe.visualization.utils import GraphConst -from msprobe.core.common.exceptions import MsprobeException from msprobe.core.common.decorator import recursion_depth_decorator +from msprobe.core.common.parallel_state import get_tp_pp_default_groups MAX_INFO = 'The Max value merging method for ' MIN_INFO = 'The Min value merging method for ' @@ -235,34 +235,8 @@ class BaseGraphMerger: tp_groups: 张量并行组列表,每个元素是一个包含组内rank的列表 pp_groups: 流水线并行组列表,每个元素是一个包含组内rank的列表 """ - rank_size = self.parallel_param.rank_size - tp_size = self.parallel_param.tp - pp_size = self.parallel_param.pp - - if rank_size % (tp_size * pp_size) != 0: - logger.error(f'{self.log_prefix} The parallel param "rank_size" must be divisible by "tp * pp"!') - raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) - dp_size = int(rank_size / tp_size / pp_size) - - # 存储并行组信息 - tp_groups = [] - pp_groups = [] - - # 创建张量并行组 - for dp_rank in range(dp_size): - for pp_rank in range(pp_size): - # 计算当前DP组和PP组组合下的TP组的第一个rank - base_rank = dp_rank * tp_size * pp_size + pp_rank * tp_size - group_ranks = [base_rank + tp_rank for tp_rank in range(tp_size)] - tp_groups.append(group_ranks) - - # 创建流水线并行组 - for dp_rank in range(dp_size): - for tp_rank in range(tp_size): - # 计算当前DP组和TP组组合下的PP组的第一个rank - base_rank = dp_rank * tp_size * pp_size + tp_rank - group_ranks = [base_rank + pp_rank * tp_size for pp_rank in range(pp_size)] - pp_groups.append(group_ranks) + tp_groups, pp_groups = get_tp_pp_default_groups(self.parallel_param.rank_size, self.parallel_param.tp, + self.parallel_param.pp, order=self.parallel_param.order) return tp_groups, pp_groups @@ -397,14 +371,11 @@ class PPMerger(BaseGraphMerger): break if pp_rank is not None: break - if pp_rank is None: - logger.warning(f'{self.log_prefix} Unable to get pp groups because ' - f'the batch_isend_irecv, send, or isend were not found.') - else: + if pp_rank is not None: p2p_mapping[rank] = pp_rank pp_groups = self._trace_p2p_mapping(p2p_mapping) if not pp_groups: - logger.info('Unable to get pp groups based on Distributed Api, ' + logger.info('Unable to get pp groups based on Distributed Api (batch_isend_irecv, send, or isend), ' 'generate pp groups using parallel param "rank_size", "tp" and "pp".') _, pp_groups = self.get_default_groups() logger.info(f'{self.log_prefix} All pp groups is {pp_groups}.') @@ -684,7 +655,7 @@ class TPMerger(BaseGraphMerger): tp_groups.append(group_ranks) break if not tp_groups: - logger.info('Unable to get tp groups based on Distributed Api, ' + logger.info('Unable to get tp groups based on Distributed Api (reduce_scatter or all_reduce), ' 'generate tp groups using parallel param "rank_size", "tp" and "pp".') tp_groups, _ = self.get_default_groups() logger.info(f'{self.log_prefix} All tp groups is {tp_groups}.') diff --git a/debug/accuracy_tools/msprobe/visualization/graph/distributed_analyzer.py b/debug/accuracy_tools/msprobe/visualization/graph/distributed_analyzer.py index a4b709a1ed1e57fd34330e403992d5fdb781c4f5..90ac8dcfed106018789297248c5555554716cbc0 100644 --- a/debug/accuracy_tools/msprobe/visualization/graph/distributed_analyzer.py +++ b/debug/accuracy_tools/msprobe/visualization/graph/distributed_analyzer.py @@ -82,7 +82,7 @@ class DistributedAnalyzer: """ target_rank = node.input_data.get(f'{node.id}{GraphConst.INPUT}{parameter}', {}).get('value') if target_rank is None: - logger.warning(f'The parameter {parameter} of node {node.id} does not exist, {CANNOT_MATCH}{rank}') + logger.debug(f'The parameter {parameter} of node {node.id} does not exist, {CANNOT_MATCH}{rank}') return target_rank @staticmethod @@ -95,15 +95,15 @@ class DistributedAnalyzer: """ group = node.input_data.get(f'{node.id}{GraphConst.INPUT}group', {}) if not group: - logger.warning(f'The kwarg group of node {node.id} does not exist, {CANNOT_MATCH}{rank}') + logger.debug(f'The kwarg group of node {node.id} does not exist, {CANNOT_MATCH}{rank}') return None, None group_ranks = group.get('group_ranks') if not group_ranks: - logger.warning(f'The group_ranks of node {node.id} does not exist, {CANNOT_MATCH}{rank}') + logger.debug(f'The group_ranks of node {node.id} does not exist, {CANNOT_MATCH}{rank}') return None, None group_id = group.get('group_id') if not group_id: - logger.warning(f'The group_id of node {node.id} does not exist, {CANNOT_MATCH}{rank}') + logger.debug(f'The group_id of node {node.id} does not exist, {CANNOT_MATCH}{rank}') return None, None return group_ranks, group_id @@ -183,7 +183,7 @@ class DistributedAnalyzer: op = info_dict.get(GraphConst.OP) target_rank = info_dict.get(GraphConst.PEER) if op is None or target_rank is None: - logger.warning('Cannot get param op or peer.') + logger.debug('Cannot get param op or peer.') continue group_id = op + Const.REPLACEMENT_CHARACTER + Const.RANK + str(target_rank) + \ Const.REPLACEMENT_CHARACTER + info_dict.get(GraphConst.GROUP_ID, '') @@ -215,7 +215,7 @@ class DistributedAnalyzer: """ target_graph = self.graphs.get(target_rank) if not target_graph: - logger.warning(f'Graph data does not exist, {CANNOT_MATCH}{target_rank}') + logger.debug(f'Graph data does not exist, {CANNOT_MATCH}{target_rank}') return None target_group_mapping = self.group_node_mapping.get(target_rank) # p2p通信,想要获取目标节点,需要替换unique_group_id中的rank和api name, @@ -226,7 +226,7 @@ class DistributedAnalyzer: target_node_id = target_group_mapping.get(target_unique_group_id, '') target_node = target_graph.node_map.get(target_node_id) if not target_node: - logger.warning(f'Node {target_node_id} does not exist, {CANNOT_MATCH}{target_rank}') + logger.debug(f'Node {target_node_id} does not exist, {CANNOT_MATCH}{target_rank}') return None return target_node @@ -276,13 +276,13 @@ class DistributedAnalyzer: source_rank = (target_node.input_data.get(f'{target_node.id}{GraphConst.INPUT}{target_config_info[1]}', {}) .get('value')) if source_rank is None: - logger.warning( + logger.debug( f'The kwarg {target_config_info[1]} of node {target_node.id} does not exist, ' f'{CANNOT_MATCH}{source_rank}') return if source_rank != rank: # 点对点通信,待匹配目标节点包含的rank信息要与当前rank一致 - logger.warning( + logger.debug( f'{node.id} of rank{rank} is expected to communicate with {target_node.id} of rank{target_rank}, ' f'but the data shows that {target_node.id} communicates with rank{source_rank}.' f'The rank is inconsistent, cannot match distributed node') @@ -291,7 +291,7 @@ class DistributedAnalyzer: # 点对点通信,两个匹配节点的输出数据要一致 if not DistributedAnalyzer._node_output_all_equal(node.output_data.get(node.id + '.output.0'), target_node.output_data.get(target_node.id + '.output.0')): - logger.warning(f'{node.id} output of rank{rank} is different from the {target_node.id} ' + logger.debug(f'{node.id} output of rank{rank} is different from the {target_node.id} ' f'output of rank{target_rank}, cannot match distributed node') return @@ -332,7 +332,7 @@ class DistributedAnalyzer: if not target_group_id: continue if group_id != target_group_id: - logger.warning( + logger.debug( f'{node.id} of rank{rank} is expected to communicate with {target_node.id} of rank{target_rank}' f', but the data shows that the group id of the two nodes are different, ' f'cannot match distributed node') @@ -368,7 +368,7 @@ class DistributedAnalyzer: target_api_name = self.config.get(api_name)[0] target_rank = int(id_info[1].replace(Const.RANK, '')) except Exception as e: - logger.warning(f'Failed to parse batch p2p parameter with error info: {e}.') + logger.debug(f'Failed to parse batch p2p parameter with error info: {e}.') continue target_node = self._get_target_node(rank, unique_group_id, api_name, target_rank, target_api_name) if not target_node: diff --git a/debug/accuracy_tools/msprobe/visualization/utils.py b/debug/accuracy_tools/msprobe/visualization/utils.py index 461d3f3f765c675639ef02337735dad20605ec35..a20244d936278b993266df63c491e72d5c8c83bc 100644 --- a/debug/accuracy_tools/msprobe/visualization/utils.py +++ b/debug/accuracy_tools/msprobe/visualization/utils.py @@ -129,9 +129,11 @@ def load_parallel_param(input_param): parallel_merge = input_param.get("parallel_merge", {}) config_n = parallel_merge.get('npu', {}) config_b = parallel_merge.get('bench', {}) - return (ParallelParam(config_n.get('rank_size'), config_n.get('tp'), config_n.get('pp')),) if not config_b else \ - (ParallelParam(config_n.get('rank_size'), config_n.get('tp'), config_n.get('pp'), config_n.get('vpp', 1)), - ParallelParam(config_b.get('rank_size'), config_b.get('tp'), config_b.get('pp'), config_b.get('vpp', 1))) + param_n = ParallelParam(config_n.get('rank_size'), config_n.get('tp'), config_n.get('pp'), config_n.get('vpp', 1), + config_n.get('order', 'tp-cp-ep-dp-pp')) + param_b = ParallelParam(config_b.get('rank_size'), config_b.get('tp'), config_b.get('pp'), config_b.get('vpp', 1), + config_b.get('order', 'tp-cp-ep-dp-pp')) + return (param_n,) if not config_b else (param_n, param_b) def validate_parallel_param(parallel_param, dump_path, log_prefix='[NPU]'): @@ -164,14 +166,19 @@ def validate_parallel_param(parallel_param, dump_path, log_prefix='[NPU]'): raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) if parallel_param.vpp > 1 and parallel_param.pp < 2: logger.error(f'{log_prefix} When configuring the parallel param "vpp", the "pp" param must be greater than 1!') + raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) + if not isinstance(parallel_param.order, str): + logger.error(f'{log_prefix} The parallel params "order" must be of string type!') + raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR) class ParallelParam: - def __init__(self, rank_size, tp, pp, vpp=1): + def __init__(self, rank_size, tp, pp, vpp=1, order='tp-cp-ep-dp-pp'): self.rank_size = rank_size self.tp = tp self.pp = pp self.vpp = vpp + self.order = order class ToolTip: @@ -257,10 +264,10 @@ class GraphConst: OP = 'op' PEER = 'peer' GROUP_ID = 'group_id' - + UNCERTAINTY_THRESHOLD = 1e-6 REDUCE_OPERATIONS = ['reduce_scatter', 'all_reduce'] - + IGNORE_PRECISION_INDEX = {'empty', 'empty_like', 'empty_with_format', 'new_empty_strided', 'new_empty', 'empty_strided'} VPP_CHUNK_0 = '0'