From d95a22945fb314ac42da9d86be4605beb6076970 Mon Sep 17 00:00:00 2001
From: l30044004
Date: Mon, 21 Jul 2025 17:01:51 +0800
Subject: [PATCH] [Feature] Hierarchical visualization: support VPP graph
 merging
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../msprobe/docs/21.visualization_PyTorch.md  |  24 ++-
 .../docs/22.visualization_MindSpore.md        |  24 ++-
 .../builder/test_graph_merger.py              |   2 +-
 .../visualization/builder/graph_merger.py     | 162 +++++++++++++++++-
 .../msprobe/visualization/utils.py            |  15 +-
 5 files changed, 207 insertions(+), 20 deletions(-)

diff --git a/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md b/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md
index 7cf43c0bd9..0e5b377f25 100644
--- a/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md
+++ b/debug/accuracy_tools/msprobe/docs/21.visualization_PyTorch.md
@@ -318,28 +318,42 @@ For the dump configuration, see the [dump configuration examples](./03.config_examples.md#16-task-配置为-
 
 #### 3.2.5 Graph merging under different partitioning strategies
 
-Applicable scenario: two models trained with different Tensor Parallelism (TP) or Pipeline Parallelism (PP) partitioning strategies show an accuracy difference and a full-network data comparison is needed, but the partitioned data is spread across multiple ranks, so the per-rank data must be merged before it can be compared.
+Applicable scenario: two models trained with different model-parallel partitioning strategies show an accuracy difference and a full-network data comparison is needed, but the partitioned data or model structure is spread across multiple ranks and cannot be compared directly, so the per-rank data or model structure must be merged before it can be compared.
 
 Restrictions:
 
+- Model-parallel partitioning strategies currently supported: Tensor Parallelism (TP), Pipeline Parallelism (PP), and Virtual Pipeline Parallelism (VPP); Context Parallelism (CP) and Expert Parallelism (EP) are not yet supported.
 - Graph merging currently supports models built on the Megatron and MindSpeed-LLM suites; its effectiveness for models from other suites has not been verified;
 - Only statistics data dumped by the msprobe tool is supported at present;
-- When comparing merged graphs, make sure the DP partitioning matches. For example, a rank=8 tp=1 pp=8 configuration gives dp=1 and merging yields one graph, while rank=8 tp=1 pp=4 gives dp=2 and merging yields two graphs; comparing differing numbers of graphs is not yet supported.
+- When comparing merged graphs, make sure the Data Parallelism (DP) partitioning matches. For example, a rank=8 tp=1 pp=8 configuration gives dp=1 and merging yields one graph, while rank=8 tp=1 pp=4 gives dp=2 and merging yields two graphs; comparing differing numbers of graphs is not yet supported.
 
 How to enable:
 
-Add a parallel_merge entry to compare.json and set the rank_size, tp, and pp parameters to the actual values;
+Add a parallel_merge entry to compare.json and set the rank_size, tp, pp, and vpp parameters to the actual values.
+
+Parameter description:
+
+The required tp, pp, and vpp values come from the actual training-script configuration of the Megatron or MindSpeed-LLM suite.
+
+| Parameter | Description | Required |
+|-----------|-------------|----------|
+| rank_size | Number of accelerator cards actually used for training, int. `rank_size=tp*pp*cp*dp`; since CP merging is not yet supported, graph merging assumes cp=1. | Yes |
+| tp        | Tensor-parallel size, int. The training script specifies `--tensor-model-parallel-size T`, where `T` is the tensor model parallel size, i.e. **the tp parameter needed for graph merging**: `tp=T`. | Yes |
+| pp        | Number of pipeline-parallel stages, int. The training script specifies `--pipeline-model-parallel-size P`, where `P` is the number of pipeline stages, i.e. **the pp parameter needed for graph merging**: `pp=P`. | Yes |
+| vpp       | Number of virtual pipeline-parallel stages, int. Virtual pipeline parallelism depends on pipeline parallelism. The training script specifies `--num-layers-per-virtual-pipeline-stage V`, where `V` is the number of layers per virtual pipeline stage, and `--num-layers L`, where `L` is the total number of model layers; **the vpp parameter needed for graph merging** is `L/V/P`. vpp may be omitted; the default vpp=1 means virtual pipeline parallelism is disabled. | No |
 
 For the npu_path and bench_path configuration and the commands to run, see [3.2.3 Batch building or comparison](#323-批量构建或比对)
 
+If you only build graphs without comparing, "bench_path" and the "bench" entry under "parallel_merge" may be omitted.
+
 ```
 {
   "npu_path": "./npu_dump",
-  "bench_path": "./bench_dump", # may be omitted when only building graphs
+  "bench_path": "./bench_dump",
   "is_print_compare_log": true,
   "parallel_merge": {
     "npu": {"rank_size": 8, "tp": 8, "pp": 1},
-    "bench": {"rank_size": 8, "tp": 1, "pp": 8} # may be omitted when only building graphs
+    "bench": {"rank_size": 8, "tp": 1, "pp": 8}
   }
 }
 ```
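To make the parameter arithmetic above concrete, here is a minimal sketch (a hypothetical helper, not part of msprobe; msprobe reads these values from compare.json rather than computing them) that derives vpp and the expected number of merged graphs from Megatron-style training arguments:

```python
def derive_merge_params(rank_size, tp, pp, num_layers=None, layers_per_virtual_stage=None):
    """Derive parallel_merge parameters from Megatron-style training args (illustrative only)."""
    # vpp = L / V / P when --num-layers-per-virtual-pipeline-stage is set, otherwise 1
    vpp = num_layers // layers_per_virtual_stage // pp if layers_per_virtual_stage else 1
    # rank_size = tp * pp * cp * dp, with cp assumed to be 1 (CP merging is unsupported)
    dp = rank_size // (tp * pp)
    # graph merging produces one merged graph per data-parallel replica
    return {"rank_size": rank_size, "tp": tp, "pp": pp, "vpp": vpp}, dp

# rank=8, tp=1, pp=4: dp=2, so merging yields two graphs, matching the example above
params, graph_count = derive_merge_params(8, 1, 4)
assert graph_count == 2 and params["vpp"] == 1
```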
diff --git a/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md b/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md
index 21343e15fc..eac7a81910 100644
--- a/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md
+++ b/debug/accuracy_tools/msprobe/docs/22.visualization_MindSpore.md
@@ -319,28 +319,42 @@ For the dump configuration, see the [dump configuration examples](./03.config_examples.md#35-task-配置为-
 
 #### 3.2.5 Graph merging under different partitioning strategies
 
-Applicable scenario: two models trained with different Tensor Parallelism (TP) or Pipeline Parallelism (PP) partitioning strategies show an accuracy difference and a full-network data comparison is needed, but the partitioned data is spread across multiple ranks, so the per-rank data must be merged before it can be compared.
+Applicable scenario: two models trained with different model-parallel partitioning strategies show an accuracy difference and a full-network data comparison is needed, but the partitioned data or model structure is spread across multiple ranks and cannot be compared directly, so the per-rank data or model structure must be merged before it can be compared.
 
 Restrictions:
 
+- Model-parallel partitioning strategies currently supported: Tensor Parallelism (TP), Pipeline Parallelism (PP), and Virtual Pipeline Parallelism (VPP); Context Parallelism (CP) and Expert Parallelism (EP) are not yet supported.
 - Graph merging currently supports models built on the Megatron and MindSpeed-LLM suites; its effectiveness for models from other suites has not been verified;
 - Only statistics data dumped by the msprobe tool is supported at present;
-- When comparing merged graphs, make sure the DP partitioning matches. For example, a rank=8 tp=1 pp=8 configuration gives dp=1 and merging yields one graph, while rank=8 tp=1 pp=4 gives dp=2 and merging yields two graphs; comparing differing numbers of graphs is not yet supported.
+- When comparing merged graphs, make sure the Data Parallelism (DP) partitioning matches. For example, a rank=8 tp=1 pp=8 configuration gives dp=1 and merging yields one graph, while rank=8 tp=1 pp=4 gives dp=2 and merging yields two graphs; comparing differing numbers of graphs is not yet supported.
 
 How to enable:
 
-Add a parallel_merge entry to compare.json and set the rank_size, tp, and pp parameters to the actual values;
+Add a parallel_merge entry to compare.json and set the rank_size, tp, pp, and vpp parameters to the actual values.
+
+Parameter description:
+
+The required tp, pp, and vpp values come from the actual training-script configuration of the Megatron or MindSpeed-LLM suite.
+
+| Parameter | Description | Required |
+|-----------|-------------|----------|
+| rank_size | Number of accelerator cards actually used for training, int. `rank_size=tp*pp*cp*dp`; since CP merging is not yet supported, graph merging assumes cp=1. | Yes |
+| tp        | Tensor-parallel size, int. The training script specifies `--tensor-model-parallel-size T`, where `T` is the tensor model parallel size, i.e. **the tp parameter needed for graph merging**: `tp=T`. | Yes |
+| pp        | Number of pipeline-parallel stages, int. The training script specifies `--pipeline-model-parallel-size P`, where `P` is the number of pipeline stages, i.e. **the pp parameter needed for graph merging**: `pp=P`. | Yes |
+| vpp       | Number of virtual pipeline-parallel stages, int. Virtual pipeline parallelism depends on pipeline parallelism. The training script specifies `--num-layers-per-virtual-pipeline-stage V`, where `V` is the number of layers per virtual pipeline stage, and `--num-layers L`, where `L` is the total number of model layers; **the vpp parameter needed for graph merging** is `L/V/P`. vpp may be omitted; the default vpp=1 means virtual pipeline parallelism is disabled. | No |
 
 For the npu_path and bench_path configuration and the commands to run, see [3.2.3 Batch building or comparison](#323-批量构建或比对)
 
+If you only build graphs without comparing, "bench_path" and the "bench" entry under "parallel_merge" may be omitted.
+
 ```
 {
   "npu_path": "./npu_dump",
-  "bench_path": "./bench_dump", # may be omitted when only building graphs
+  "bench_path": "./bench_dump",
   "is_print_compare_log": true,
   "parallel_merge": {
     "npu": {"rank_size": 8, "tp": 8, "pp": 1},
-    "bench": {"rank_size": 8, "tp": 1, "pp": 8} # may be omitted when only building graphs
+    "bench": {"rank_size": 8, "tp": 1, "pp": 8}
   }
 }
 ```
diff --git a/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py b/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py
index 6ad0de9fcd..7c1033de54 100644
--- a/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py
+++ b/debug/accuracy_tools/msprobe/test/visualization_ut/builder/test_graph_merger.py
@@ -380,7 +380,7 @@ class TestTPPPMerger(unittest.TestCase):
 class TestFullMerger(unittest.TestCase):
     def setUp(self):
         self.build_graph_results = [MagicMock(rank=i) for i in range(8)]
-        self.parallel_param = MagicMock(tp=2, pp=4, rank_size=8)
+        self.parallel_param = MagicMock(tp=2, pp=4, rank_size=8, vpp=1)
         self.is_bench = False
         self.merger = FullMerger(self.build_graph_results, self.parallel_param, self.is_bench)
 
diff --git a/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py b/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py
index 2fa74ffda5..ceff2dffb2 100644
--- a/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py
+++ b/debug/accuracy_tools/msprobe/visualization/builder/graph_merger.py
@@ -41,11 +41,11 @@ class GraphMerger:
         elif param.tp == param.rank_size:
             return TPMerger(results, param, is_bench)
         elif param.pp == param.rank_size:
-            return PPMerger(results, param, is_bench)
+            return PPMerger(results, param, is_bench) if param.vpp == 1 else VPPMerger(results, param, is_bench)
         elif param.pp == 1:
             return TPMerger(results, param, is_bench)
         elif param.tp == 1:
-            return PPMerger(results, param, is_bench)
+            return PPMerger(results, param, is_bench) if param.vpp == 1 else VPPMerger(results, param, is_bench)
         elif param.tp * param.pp == param.rank_size:
             return TPPPMerger(results, param, is_bench)
         else:
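For review convenience, a standalone sketch of the dispatch rules in the hunk above (ParallelParam is stubbed with SimpleNamespace; the branches above the hunk, such as the single-rank case, are omitted, and the final else is assumed to fall through to FullMerger):

```python
from types import SimpleNamespace

def pick_merger(param):
    """Mirror the visible dispatch branches of GraphMerger (sketch, not the real factory)."""
    if param.tp == param.rank_size:
        return "TPMerger"
    if param.pp == param.rank_size:
        return "PPMerger" if param.vpp == 1 else "VPPMerger"
    if param.pp == 1:
        return "TPMerger"
    if param.tp == 1:
        return "PPMerger" if param.vpp == 1 else "VPPMerger"
    if param.tp * param.pp == param.rank_size:
        return "TPPPMerger"
    return "FullMerger"  # assumed content of the else branch

assert pick_merger(SimpleNamespace(rank_size=8, tp=1, pp=8, vpp=2)) == "VPPMerger"
assert pick_merger(SimpleNamespace(rank_size=8, tp=2, pp=4, vpp=1)) == "TPPPMerger"
```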
@@ -804,7 +804,8 @@ class NoParallelMerger(BaseGraphMerger):
 class TPPPMerger(BaseGraphMerger):
     def merge_graphs(self):
         tp_merger = TPMerger(self.build_graph_results, self.parallel_param, self.is_bench)
-        pp_merger = PPMerger(self.build_graph_results, self.parallel_param, self.is_bench)
+        pp_merger = PPMerger(self.build_graph_results, self.parallel_param, self.is_bench) \
+            if self.parallel_param.vpp == 1 else VPPMerger(self.build_graph_results, self.parallel_param, self.is_bench)
         pp_groups = pp_merger.get_groups()
         tp_groups = tp_merger.get_groups()
         # Entering the TP+PP mixed handler: PP and TP are both necessarily greater than 1
@@ -826,7 +827,8 @@ class TPPPMerger(BaseGraphMerger):
 class FullMerger(BaseGraphMerger):
     def merge_graphs(self):
         tp_merger = TPMerger(self.build_graph_results, self.parallel_param, self.is_bench)
-        pp_merger = PPMerger(self.build_graph_results, self.parallel_param, self.is_bench)
+        pp_merger = PPMerger(self.build_graph_results, self.parallel_param, self.is_bench) \
+            if self.parallel_param.vpp == 1 else VPPMerger(self.build_graph_results, self.parallel_param, self.is_bench)
         pp_groups = pp_merger.get_groups()
         tp_groups = tp_merger.get_groups()
         tp_merge_mapping = {}
@@ -860,3 +862,155 @@ class FullMerger(BaseGraphMerger):
         self.sort_merged_api_collection(tp_merged_result[0].graph)
         tp_results.extend(tp_merged_result)
         return tp_results
+
+
+class VPPMerger(PPMerger):
+    LAYERS_NUM_PATTERN = re.compile(r"(layers\.|layer\.)(\d+)(\.)")
+    FORWARD_PATTERN = re.compile(r'\.forward\.\d+$')
+
+    @staticmethod
+    def _replace_vpp_id(s, vpp_id):
+        parts = s.split(Const.SEP)
+        if len(parts) < 2 or not parts[1].isdigit():
+            return s
+        parts[1] = str(vpp_id)
+        return Const.SEP.join(parts)
+
+    def merge_pp_graphs(self, results):
+        if not results or len(results) < 2:
+            return results
+        graphs = [x.graph for x in results]
+        main_graph_result = results[0]
+        for main_node in main_graph_result.graph.root.subnodes:
+            if main_node.op == NodeOp.module and main_node.id not in self.unmerged_module:
+                self._merge_nodes(main_graph_result.graph, main_node, graphs[1:])
+                self._sort_nodes(main_graph_result.graph, main_node)
+        self._merge_vpp_data(main_graph_result.graph)
+        self._merge_vpp_chunks(main_graph_result.graph)
+        return [main_graph_result]
+
+    def _merge_vpp_data(self, graph):
+        """
+        Merge every chunk's data into chunk 0: forward nodes in chunk 0 take the last chunk's output, backward nodes in chunk 0 take the last chunk's input.
+        """
+        module_list = []
+        for node in reversed(graph.root.subnodes):
+            parts = node.id.split(Const.SEP)
+            if len(parts) < 2:
+                continue
+            if parts[1] in [GraphConst.VPP_CHUNK_0, str(self.parallel_param.vpp - 1)]:
+                module_list.append(node)
+        if not module_list:
+            return
+        stack = module_list[:]
+        while stack:
+            current_node = stack.pop()
+            if hasattr(current_node, 'is_pp_merged') or hasattr(current_node, 'pp_index') \
+                    or current_node.op != NodeOp.module:
+                continue
+            is_forward = self.FORWARD_PATTERN.search(current_node.id)
+            stack.extend(reversed(current_node.subnodes))
+            target_id = self._replace_vpp_id(current_node.id, self.parallel_param.vpp - 1)
+            target_node = graph.node_map.get(target_id)
+            if not target_node:
+                continue
+            if is_forward:
+                current_node.output_data = self._update_node_data_key(target_node.id, current_node.id,
+                                                                      target_node.output_data)
+            else:
+                current_node.input_data = self._update_node_data_key(target_node.id, current_node.id,
+                                                                     target_node.input_data)
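The chunk rewiring above keys off node ids of the form `Module.<chunk>.<name>...`. A standalone restatement of `_replace_vpp_id` (assuming `Const.SEP` is `'.'`, which the id format suggests) shows how a chunk-0 node computes its peer id in the last chunk:

```python
SEP = "."  # assumed value of Const.SEP

def replace_vpp_id(node_id: str, vpp_id: int) -> str:
    """Rewrite the chunk index (the second dot-separated field) of a node id."""
    parts = node_id.split(SEP)
    if len(parts) < 2 or not parts[1].isdigit():
        return node_id
    parts[1] = str(vpp_id)
    return SEP.join(parts)

# With vpp=2, a chunk-0 forward node looks up its peer in the last chunk (chunk 1):
assert replace_vpp_id("Module.0.module.forward.0", 1) == "Module.1.module.forward.0"
```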
+
+    def _merge_vpp_chunks(self, graph):
+        """
+        Merge every chunk into chunk 0: move the other chunks' layers under chunk 0 and renumber them.
+        """
+        chunk_id_list = [i for i in range(1, self.parallel_param.vpp)]
+        chunk_0_list = []
+        for node in reversed(graph.root.subnodes):
+            parts = node.id.split(Const.SEP)
+            if len(parts) < 2:
+                continue
+            if parts[1] == GraphConst.VPP_CHUNK_0:
+                chunk_0_list.append(node)
+        if not chunk_0_list:
+            return
+        stack = chunk_0_list[:]
+        layers_need_merge_dict = {}
+        while stack:
+            current_node = stack.pop()
+            if hasattr(current_node, 'is_pp_merged') or (hasattr(current_node, 'pp_index')
+                                                         and current_node.upnode.id not in layers_need_merge_dict):
+                layers_need_merge_dict[current_node.upnode.id] = current_node.upnode
+                continue
+            stack.extend(reversed(current_node.subnodes))
+        for node in layers_need_merge_dict.values():
+            is_forward = self.FORWARD_PATTERN.search(node.id)
+            for vpp_id in chunk_id_list:
+                target_node = graph.node_map.get(self._replace_vpp_id(node.id, vpp_id))
+                if not target_node:
+                    continue
+                # Move the other chunks' layers under chunk 0: forward is appended at the end, backward is prepended at the front
+                if is_forward:
+                    node.subnodes.extend(target_node.subnodes)
+                else:
+                    node.subnodes = target_node.subnodes + node.subnodes
+                for sub_node in target_node.subnodes:
+                    sub_node.upnode = node
+                # Fetch the merged chunk's ancestor chain and drop all of its parents, so already-merged chunk nodes are not shown on the front end
+                ancestors = target_node.get_ancestors()
+                if len(ancestors) < 2:
+                    continue
+                for module_id in ancestors[1:]:
+                    graph.node_map.pop(module_id, None)
+                graph.root.subnodes = [node for node in graph.root.subnodes if node.id != ancestors[1]]
+            # Renumber the layer indices
+            self._sort_layers(node.subnodes, graph, is_forward)
+
+    def _sort_layers(self, node_list, graph, is_forward):
+        if not is_forward:
+            node_list = list(reversed(node_list))
+        index = -1
+        for node in node_list:
+            match = self.LAYERS_NUM_PATTERN.search(node.id)
+            if match:
+                index += 1
+            parts = node.id.split(Const.SEP)
+            # Module.0.xxx belongs to the first chunk and needs no renumbering
+            if len(parts) < 2 or parts[1] == GraphConst.VPP_CHUNK_0:
+                continue
+            # Layer nodes get both the chunk id and the layer index rewritten; non-layer nodes only get the chunk id rewritten
+            new_node_id_prefix = ''
+            if match:
+                prefix, number, dot = match.groups()
+                new_string = prefix + str(index) + dot
+                start, end = match.span()
+                new_node_id_prefix = node.id[:start] + new_string
+                new_node_id_prefix = self._replace_vpp_id(new_node_id_prefix, GraphConst.VPP_CHUNK_0)
+                new_node_id = new_node_id_prefix + node.id[end:]
+            else:
+                new_node_id = self._replace_vpp_id(node.id, GraphConst.VPP_CHUNK_0)
+            graph.node_map.pop(node.id, None)
+            node.input_data = self._update_node_data_key(node.id, new_node_id, node.input_data)
+            node.output_data = self._update_node_data_key(node.id, new_node_id, node.output_data)
+            node.id = new_node_id
+            graph.node_map[new_node_id] = node
+            stack = node.subnodes[:]
+            while stack:
+                current_node = stack.pop()
+                if current_node.op != NodeOp.module:
+                    continue
+                stack.extend(reversed(current_node.subnodes))
+                match = self.LAYERS_NUM_PATTERN.search(current_node.id)
+                if match:
+                    _, e = match.span()
+                    new_current_node_id = new_node_id_prefix + current_node.id[e:]
+                else:
+                    new_current_node_id = self._replace_vpp_id(current_node.id, GraphConst.VPP_CHUNK_0)
+                current_node.input_data = self._update_node_data_key(current_node.id, new_current_node_id,
+                                                                     current_node.input_data)
+                current_node.output_data = self._update_node_data_key(current_node.id, new_current_node_id,
+                                                                      current_node.output_data)
+                graph.node_map.pop(current_node.id, None)
+                current_node.id = new_current_node_id
+                graph.node_map[new_current_node_id] = current_node
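To illustrate the renumbering `_sort_layers` performs, a small sketch using the same `LAYERS_NUM_PATTERN` regex (the node ids are hypothetical examples of the `...layers.<n>...` shape; the real method additionally rewrites the chunk id and the node_map):

```python
import re

LAYERS_NUM_PATTERN = re.compile(r"(layers\.|layer\.)(\d+)(\.)")

def renumber_layer(node_id: str, new_index: int) -> str:
    """Replace the layer number embedded in a node id with a new running index."""
    match = LAYERS_NUM_PATTERN.search(node_id)
    if not match:
        return node_id
    prefix, _old_index, dot = match.groups()
    start, end = match.span()
    return node_id[:start] + prefix + str(new_index) + dot + node_id[end:]

# After another chunk's layers are appended behind chunk 0's, they receive running indices:
assert renumber_layer("Module.1.module.layers.0.forward.0", 2) == "Module.1.module.layers.2.forward.0"
```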
diff --git a/debug/accuracy_tools/msprobe/visualization/utils.py b/debug/accuracy_tools/msprobe/visualization/utils.py
index 52a5a52fc3..15c7052f79 100644
--- a/debug/accuracy_tools/msprobe/visualization/utils.py
+++ b/debug/accuracy_tools/msprobe/visualization/utils.py
@@ -130,8 +130,8 @@ def load_parallel_param(input_param):
     config_n = parallel_merge.get('npu', {})
     config_b = parallel_merge.get('bench', {})
-    return (ParallelParam(config_n.get('rank_size'), config_n.get('tp'), config_n.get('pp')),) if not config_b else \
-        (ParallelParam(config_n.get('rank_size'), config_n.get('tp'), config_n.get('pp')),
-         ParallelParam(config_b.get('rank_size'), config_b.get('tp'), config_b.get('pp')))
+    return (ParallelParam(config_n.get('rank_size'), config_n.get('tp'), config_n.get('pp'), config_n.get('vpp', 1)),) if not config_b else \
+        (ParallelParam(config_n.get('rank_size'), config_n.get('tp'), config_n.get('pp'), config_n.get('vpp', 1)),
+         ParallelParam(config_b.get('rank_size'), config_b.get('tp'), config_b.get('pp'), config_b.get('vpp', 1)))
 
 
 def validate_parallel_param(parallel_param, dump_path, log_prefix='[NPU]'):
@@ -145,7 +145,7 @@ def validate_parallel_param(parallel_param, dump_path, log_prefix='[NPU]'):
         logger.error(f'{log_prefix} The parallel params "tp/pp/rank_size" must not be null!')
         raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR)
     if any(x <= 0 for x in params):
-        logger.error(f'{log_prefix} The parallel params "tp/pp/rank_size" must be greater than 0!')
+        logger.error(f'{log_prefix} The parallel params "tp/pp/vpp/rank_size" must be greater than 0!')
         raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR)
     if parallel_param.tp > parallel_param.rank_size:
         logger.error(f'{log_prefix} The parallel param "tp" must be less than or equal to "rank_size"!')
@@ -162,13 +162,17 @@ def validate_parallel_param(parallel_param, dump_path, log_prefix='[NPU]'):
     if parallel_param.tp * parallel_param.pp > parallel_param.rank_size:
         logger.error(f'{log_prefix} The parallel params "tp * pp" must be less than or equal to "rank_size"!')
         raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR)
+    if parallel_param.vpp > 1 and parallel_param.pp < 2:
+        logger.error(f'{log_prefix} When configuring the parallel param "vpp", the "pp" param must be greater than 1!')
+        raise MsprobeException(MsprobeException.INVALID_PARAM_ERROR)
 
 
 class ParallelParam:
-    def __init__(self, rank_size, tp, pp):
+    def __init__(self, rank_size, tp, pp, vpp=1):
         self.rank_size = rank_size
         self.tp = tp
         self.pp = pp
+        self.vpp = vpp
 
 
 class ToolTip:
@@ -260,6 +264,7 @@ class GraphConst:
     IGNORE_PRECISION_INDEX = {'empty', 'empty_like', 'empty_with_format',
                               'new_empty_strided', 'new_empty', 'empty_strided'}
+    VPP_CHUNK_0 = '0'
 
 
 def is_serializable(obj):
-- 
Gitee
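As a closing illustration of the configuration flow above, a standalone sketch (ParallelParam is re-declared locally so the example runs on its own; the configs are hypothetical but keep dp identical on both sides, as the docs require) of how a parallel_merge entry maps onto ParallelParam objects, with vpp defaulting to 1:

```python
import json

class ParallelParam:  # local mirror of the class patched above
    def __init__(self, rank_size, tp, pp, vpp=1):
        self.rank_size, self.tp, self.pp, self.vpp = rank_size, tp, pp, vpp

compare_cfg = json.loads('''
{
  "parallel_merge": {
    "npu":   {"rank_size": 8, "tp": 8, "pp": 1},
    "bench": {"rank_size": 8, "tp": 1, "pp": 8, "vpp": 2}
  }
}
''')

# Mirror load_parallel_param: vpp falls back to 1 when omitted
params = {
    side: ParallelParam(cfg.get("rank_size"), cfg.get("tp"), cfg.get("pp"), cfg.get("vpp", 1))
    for side, cfg in compare_cfg["parallel_merge"].items()
}
assert params["npu"].vpp == 1 and params["bench"].vpp == 2
```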