From 5c5f9a09bb73556955d8fcf5d10015e2b4c456a7 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Mon, 28 Aug 2023 21:44:52 +0800 Subject: [PATCH 1/8] communication matrix --- .../analysis/communication_analysis.py | 191 +++++++++++++++--- .../cluster_analyse/common_func/constant.py | 2 + 2 files changed, 162 insertions(+), 31 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index bc953c4d7f7..e69a37d2ef4 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -13,31 +13,32 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +from abc import abstractmethod + from common_func.constant import Constant from collections import defaultdict from common_func.file_manager import FileManager -class CommunicationAnalysis: - CLUSTER_COMMUNICATION_JSON = "cluster_communication.json" +class BaseCommAnalysis: def __init__(self, param: dict): self.collection_path = param.get(Constant.COLLECTION_PATH) self.data_map = param.get(Constant.DATA_MAP) self.collective_group_dict = param.get(Constant.COLLECTIVE_GROUP) - self.communication_ops = param.get(Constant.COMMUNICATION_OPS) self.comm_ops_struct = {} @staticmethod - def combine_size_distribution(op_dict: dict, total_dict: dict): - for size, size_info in op_dict.items(): - total_dict[size][0] += size_info[0] - total_dict[size][1] += size_info[1] + def compute_ratio(dividend: float, divisor: float): + if not divisor: + return 0 + else: + return round(dividend / divisor, 4) + @abstractmethod def run(self): - self.split_op_by_group() - self.combine_ops_total_info() - self.dump_data() + pass def dump_data(self): if not self.comm_ops_struct: @@ -66,14 +67,36 @@ class CommunicationAnalysis: for step_id, communication_ops in group_dict.items(): self.compute_total_info(communication_ops) + +class CommunicationAnalysis(BaseCommAnalysis): + SAVED_JSON = "cluster_communication.json" + + def __init__(self, param: dict): + super().__init__(param) + self.communication_ops = param.get(Constant.COMMUNICATION_OPS) + + @staticmethod + def combine_size_distribution(op_dict: dict, total_dict: dict): + for size, size_info in op_dict.items(): + total_dict[size][0] += size_info[0] + total_dict[size][1] += size_info[1] + + def run(self): + self.split_op_by_group() + self.combine_ops_total_info() + self.dump_data() + def compute_total_info(self, comm_ops: dict): if not comm_ops: return - total_rank_dict = {} + total_rank_dict = defaultdict(lambda: { + Constant.COMMUNICATION_TIME_INFO: defaultdict(float), + Constant.COMMUNICATION_BANDWIDTH_INFO: {} + }) for communication_op, rank_dict in comm_ops.items(): for rank_id, communication_op_info in rank_dict.items(): - total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float)) - total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {}) + # total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float)) + # total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {}) for com_info, com_info_dict in communication_op_info.items(): if com_info == Constant.COMMUNICATION_TIME_INFO: self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info]) @@ -107,24 +130,130 @@ class CommunicationAnalysis: self.combine_size_distribution(value, 
total_bandwidth_info_dict[transport_type][bandwidth_msg]) def compute_time_ratio(self, total_time_info_dict: dict): - if total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: - total_time_info_dict[Constant.WAIT_TIME_RATIO] = 0 - else: - total_time_info_dict[Constant.WAIT_TIME_RATIO] = \ - round(total_time_info_dict[Constant.WAIT_TIME_MS] / - (total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) - if total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0: - total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = 0 - else: - total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \ - round(total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] / - (total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + - total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4) + total_time_info_dict[Constant.WAIT_TIME_RATIO] = \ + self.compute_ratio(total_time_info_dict.get(Constant.WAIT_TIME_MS, 0), + total_time_info_dict.get(Constant.WAIT_TIME_MS, 0) + + total_time_info_dict.get(Constant.TRANSIT_TIME_MS, 0)) + total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \ + self.compute_ratio(total_time_info_dict.get(Constant.SYNCHRONIZATION_TIME_MS), + total_time_info_dict.get(Constant.SYNCHRONIZATION_TIME_MS) + + total_time_info_dict.get(Constant.TRANSIT_TIME_MS)) def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict): for transport_type, bandwidth_dict in total_bandwidth_info_dict.items(): - if bandwidth_dict[Constant.TRANSIT_TIME_MS] == 0: - bandwidth_dict[Constant.BANDWIDTH_GB_S] = 0 - else: - bandwidth_dict[Constant.BANDWIDTH_GB_S] = \ - round(bandwidth_dict[Constant.TRANSIT_SIZE_MB] / bandwidth_dict[Constant.TRANSIT_TIME_MS], 4) + bandwidth_dict[Constant.BANDWIDTH_GB_S] = \ + self.compute_ratio(bandwidth_dict.get(Constant.TRANSIT_SIZE_MB, 0) / + bandwidth_dict.get(Constant.TRANSIT_TIME_MS, 0)) + + +class CommMatrixAnalysis(BaseCommAnalysis): + SAVED_JSON = "cluster_communication_matrix.json" + + def __init__(self, param: dict): + super().__init__(param) + self.communication_ops = {} + + @staticmethod + def combine_link(link_info_dict: dict, single_link_dict: dict): + link_info_dict[Constant.TRANSPORT_TYPE] = single_link_dict.get(Constant.TRANSPORT_TYPE) + link_info_dict[Constant.TRANSIT_TIME_MS] += single_link_dict.get(Constant.TRANSIT_TIME_MS) + link_info_dict[Constant.TRANSIT_SIZE_MB] += single_link_dict.get(Constant.TRANSIT_SIZE_MB) + + def run(self): + self.load_communication_matrix_data() + self.split_op_by_group() + self.combine_ops_total_info() + self.dump_data() + + def load_communication_matrix_data(self): + rank_comm_dict = {} + for rank_id, profiling_dir_path in self.data_map.items(): + comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_MATRIX_JSON) + rank_comm_dict[rank_id] = FileManager.read_json_file(comm_dir) + if not rank_comm_dict.get(rank_id): + print(f"rank {rank_id} does not have a valid communication_matrix.json.") + self.construct_matrix_data(rank_comm_dict) + + def construct_matrix_data(self, rank_comm_dict: dict): + for rank_id, rank_id_dict in rank_comm_dict.items(): + for step_id, step_id_dict in rank_id_dict.items(): + self.add_comm_ops(rank_id, step_id, step_id_dict) + + def add_comm_ops(self, rank_id: int, step_id: int, step_id_dict: dict): + for comm_op_type, comm_dict in step_id_dict.items(): + if comm_op_type != Constant.COLLECTIVE or comm_op_type != Constant.P2P: + 
print(f"Unknown communication opertors type!") + continue + for op_name, op_link_info in comm_dict.items(): + if op_name.startswith('Total'): + continue + group_name = op_name.split('@')[-1] + self.communication_ops.append({ + Constant.RANK_ID: rank_id, + Constant.STEP_ID: step_id, + Constant.COMM_OP_TYPE: comm_op_type, + Constant.COMM_OP_NAME: op_name, + Constant.GROUP_NAME: group_name, + Constant.COMM_OP_INFO: op_link_info + }) + + def compute_total_info(self, step_dict: dict): + self.merge_same_links(step_dict) + self.combine_link_info(step_dict) + + def merge_same_links(self, step_dict: dict): + def process_link_key(): + for link_key in rank_dict: + if '-' not in link_key: + print(f"{op_name} has an invalid link key {link_key}!") + break + src_rank = link_key.split('-')[0] + dst_rank = link_key.split('-')[1] + if src_rank == dst_rank: + if src_rank not in project_local_global_rank_map: + project_local_global_rank_map[src_rank] = rank_id + else: + print(f"In the same communication group, local ranks projecting to global ranks repeat!") + else: + self.combine_link(link_info[link_key], rank_dict[link_key]) + + def convert_local_to_global_rank(): + tmp_link = {} + for link_key, link_dict in link_info.items(): + src_rank = link_key.split('-')[0] + dst_rank = link_key.split('-')[1] + src_rank = project_local_global_rank_map[src_rank] \ + if src_rank in project_local_global_rank_map else src_rank + dst_rank = project_local_global_rank_map[dst_rank] \ + if dst_rank in project_local_global_rank_map else dst_rank + link_dict[Constant.BANDWIDTH_GB_S] = \ + self.compute_ratio(link_dict.get(Constant.TRANSIT_SIZE_MB, 0), + link_dict.get(Constant.TRANSIT_TIME_MS, 0)) + tmp_link[f"{src_rank}-{dst_rank}"] = link_dict + return tmp_link + + project_local_global_rank_map = dict() + for op_name, op_dict in step_dict.items(): + link_info = defaultdict(lambda: { + self.TRANSPORT_TYPE: '', + self.TRANSIT_TIME_MS: 0, + self.TRANSIT_SIZE_MB: 0 + }) + for rank_id, rank_dict in op_dict.items(): + process_link_key() + step_dict[op_name] = convert_local_to_global_rank() + + def combine_link_info(self, step_dict: dict): + total_op_info = defaultdict(lambda: { + self.TRANSPORT_TYPE: '', + self.TRANSIT_TIME_MS: 0, + self.TRANSIT_SIZE_MB: 0 + }) + for op_name, op_dict in step_dict.items(): + for link_key, link_dict in op_dict.items(): + self.combine_link(total_op_info[link_key], link_dict) + for link_key, link_dict in total_op_info.items(): + link_dict[Constant.BANDWIDTH_GB_S] = \ + self.compute_ratio(link_dict.get(Constant.TRANSIT_SIZE_MB, 0), + link_dict.get(Constant.TRANSIT_TIME_MS, 0)) + diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 84d4a9e0545..8910099c7f8 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -20,6 +20,7 @@ class Constant(object): CLUSTER_ANALYSIS_OUTPUT = "cluster_analysis_output" SINGLE_OUTPUT = "ASCEND_PROFILER_OUTPUT" COMM_JSON = "communication.json" + COMM_MATRIX_JSON = "communication_matrix.json" STEP_TIME_CSV = "step_trace_time.csv" # file authority @@ -60,6 +61,7 @@ class Constant(object): COMMUNICATION_OPS = "communication_ops" COLLECTION_PATH = "collection_path" COMMUNICATION_GROUP = "communication_group" + TRANSPORT_TYPE = "Transport Type" # step time RANK = 'rank' -- Gitee From 18db65a9ad30e98986eaf273b82cb0371e11da1f Mon Sep 17 00:00:00 2001 From: sunboquan Date: Wed, 30 Aug 2023 16:39:41 +0800 Subject: [PATCH 2/8] communication matrix --- 
profiler/cluster_analyse/analysis/analysis_facade.py | 3 ++- profiler/cluster_analyse/analysis/communication_analysis.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py index ff352c94e25..ec0ddd36754 100644 --- a/profiler/cluster_analyse/analysis/analysis_facade.py +++ b/profiler/cluster_analyse/analysis/analysis_facade.py @@ -15,10 +15,11 @@ from analysis.communication_analysis import CommunicationAnalysis from analysis.step_trace_time_analysis import StepTraceTimeAnalysis +from analysis.communication_analysis import CommMatrixAnalysis class AnalysisFacade: - analysis_module = {CommunicationAnalysis, StepTraceTimeAnalysis} + analysis_module = {CommunicationAnalysis, StepTraceTimeAnalysis, CommMatrixAnalysis} def __init__(self, param: dict): self.param = param diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index e69a37d2ef4..b53a76ad1f0 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -47,7 +47,7 @@ class BaseCommAnalysis: output_comm_data = {} for key in self.comm_ops_struct: output_comm_data[str(key)] = self.comm_ops_struct.get(key) - FileManager.create_json_file(self.collection_path, output_comm_data, self.CLUSTER_COMMUNICATION_JSON) + FileManager.create_json_file(self.collection_path, output_comm_data, self.SAVED_JSON) def split_op_by_group(self): for single_op in self.communication_ops: @@ -142,7 +142,7 @@ class CommunicationAnalysis(BaseCommAnalysis): def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict): for transport_type, bandwidth_dict in total_bandwidth_info_dict.items(): bandwidth_dict[Constant.BANDWIDTH_GB_S] = \ - self.compute_ratio(bandwidth_dict.get(Constant.TRANSIT_SIZE_MB, 0) / + self.compute_ratio(bandwidth_dict.get(Constant.TRANSIT_SIZE_MB, 0), bandwidth_dict.get(Constant.TRANSIT_TIME_MS, 0)) -- Gitee From 9bb4dbec44d8ed03c571bed7e485b80faab273a3 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Wed, 30 Aug 2023 16:43:39 +0800 Subject: [PATCH 3/8] communication matrix --- profiler/cluster_analyse/analysis/communication_analysis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index b53a76ad1f0..a750d1e4c3f 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -181,7 +181,7 @@ class CommMatrixAnalysis(BaseCommAnalysis): def add_comm_ops(self, rank_id: int, step_id: int, step_id_dict: dict): for comm_op_type, comm_dict in step_id_dict.items(): - if comm_op_type != Constant.COLLECTIVE or comm_op_type != Constant.P2P: + if comm_op_type != Constant.COLLECTIVE and comm_op_type != Constant.P2P: print(f"Unknown communication opertors type!") continue for op_name, op_link_info in comm_dict.items(): -- Gitee From 85ea2775a06d4b7b8869d71a16b5f7687e8923a3 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Wed, 30 Aug 2023 16:47:59 +0800 Subject: [PATCH 4/8] communication matrix --- .../analysis/communication_analysis.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 
a750d1e4c3f..14e6a769180 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -151,7 +151,7 @@ class CommMatrixAnalysis(BaseCommAnalysis): def __init__(self, param: dict): super().__init__(param) - self.communication_ops = {} + self.communication_ops = [] @staticmethod def combine_link(link_info_dict: dict, single_link_dict: dict): @@ -235,9 +235,9 @@ class CommMatrixAnalysis(BaseCommAnalysis): project_local_global_rank_map = dict() for op_name, op_dict in step_dict.items(): link_info = defaultdict(lambda: { - self.TRANSPORT_TYPE: '', - self.TRANSIT_TIME_MS: 0, - self.TRANSIT_SIZE_MB: 0 + Constant.TRANSPORT_TYPE: '', + Constant.TRANSIT_TIME_MS: 0, + Constant.TRANSIT_SIZE_MB: 0 }) for rank_id, rank_dict in op_dict.items(): process_link_key() @@ -245,9 +245,9 @@ class CommMatrixAnalysis(BaseCommAnalysis): def combine_link_info(self, step_dict: dict): total_op_info = defaultdict(lambda: { - self.TRANSPORT_TYPE: '', - self.TRANSIT_TIME_MS: 0, - self.TRANSIT_SIZE_MB: 0 + Constant.TRANSPORT_TYPE: '', + Constant.TRANSIT_TIME_MS: 0, + Constant.TRANSIT_SIZE_MB: 0 }) for op_name, op_dict in step_dict.items(): for link_key, link_dict in op_dict.items(): -- Gitee From 852887ab0e9815ef41701b2894347df23137c746 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Wed, 30 Aug 2023 16:53:58 +0800 Subject: [PATCH 5/8] communication matrix --- profiler/cluster_analyse/analysis/communication_analysis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 14e6a769180..0bf0933dd55 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -212,7 +212,7 @@ class CommMatrixAnalysis(BaseCommAnalysis): if src_rank == dst_rank: if src_rank not in project_local_global_rank_map: project_local_global_rank_map[src_rank] = rank_id - else: + elif project_local_global_rank_map.get(src_rank) != rank_id: print(f"In the same communication group, local ranks projecting to global ranks repeat!") else: self.combine_link(link_info[link_key], rank_dict[link_key]) -- Gitee From de3dac5050bac80fbf6f94c06ace641c1f37b71a Mon Sep 17 00:00:00 2001 From: sunboquan Date: Thu, 31 Aug 2023 17:46:35 +0800 Subject: [PATCH 6/8] communication matrix --- .../analysis/communication_analysis.py | 3 +- .../test_pytorch_data_preprocessor.py | 46 +++++++++++++++++++ .../test_communication_group_generator.py | 24 ++++++++++ 3 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py create mode 100644 profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 0bf0933dd55..a073c4ac9f2 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -214,8 +214,7 @@ class CommMatrixAnalysis(BaseCommAnalysis): project_local_global_rank_map[src_rank] = rank_id elif project_local_global_rank_map.get(src_rank) != rank_id: print(f"In the same communication group, local ranks projecting to global ranks repeat!") - else: - self.combine_link(link_info[link_key], rank_dict[link_key]) + 
self.combine_link(link_info[link_key], rank_dict[link_key]) def convert_local_to_global_rank(): tmp_link = {} diff --git a/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py b/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py new file mode 100644 index 00000000000..190534824ec --- /dev/null +++ b/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py @@ -0,0 +1,46 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import shutil +import unittest +from unittest import mock + +from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor + + +class TestPytorchDataPreprocessor(unittest.TestCase): + DIR_PATH = os.path.join(os.path.dirname(__file__), 'DT_CLUSTER_PREPROCESS') + + def setUp(self) -> None: + shutil.rmtree(self.DIR_PATH) + os.makedirs(os.path.join(self.DIR_PATH, 'worker1_ascend_pt')) + os.mknod(os.path.join(self.DIR_PATH, 'worker1_ascend_pt', 'profiler_info_1.json')) + os.makedirs(os.path.join(self.DIR_PATH, 'worker2_ascend_pt')) + os.mknod(os.path.join(self.DIR_PATH, 'worker2_ascend_pt', 'profiler_info_2.json')) + os.makedirs(os.path.join(self.DIR_PATH, 'single_worker_ascend_pt')) + os.mknod(os.path.join(self.DIR_PATH, 'single_worker_ascend_pt', 'profiler_info.json')) + + def tearDown(self) -> None: + shutil.rmtree(self.DIR_PATH) + + def test_get_data_map(self): + PytorchDataPreprocessor(self.DIR_PATH) + + def test_get_rank_id_cluster(self): + check = PytorchDataPreprocessor(self.DIR_PATH) + ret = check.get_rank_id('worker1_ascend_pt') + self.assertEqual(ret, 1) diff --git a/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py b/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py new file mode 100644 index 00000000000..742c171c953 --- /dev/null +++ b/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py @@ -0,0 +1,24 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import os +import unittest +from unittest import mock + + +class TestCommunicationGroupGenerator(unittest.TestCase): + DIR_PATH = '' + def test_load_communication_json -- Gitee From 7397392a75114c04f77a539d90318a8f0503663e Mon Sep 17 00:00:00 2001 From: sunboquan Date: Tue, 5 Sep 2023 14:44:00 +0800 Subject: [PATCH 7/8] ut test --- .../communication_group_generator.py | 4 +- .../test_pytorch_data_preprocessor.py | 35 ++++++-- .../analysis/test_step_trace_time_analysis.py | 82 +++++++++++++++++ .../test_communication_group_generator.py | 88 ++++++++++++++++++- 4 files changed, 198 insertions(+), 11 deletions(-) create mode 100644 profiler/cluster_analyse/test/ut/testcase/communication_group/analysis/test_step_trace_time_analysis.py diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py index 6dabcac3b64..64ada8befee 100644 --- a/profiler/cluster_analyse/communication_group/communication_group_generator.py +++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py @@ -83,7 +83,7 @@ class CommunicationGroupGenerator: stage_group[min(rank_set)] = rank_set first_rank_sort_list = sorted([first_rank for first_rank in stage_group]) self.communication_group[Constant.P2P] = \ - [list(stage_group.get(first_rank, {})) for first_rank in first_rank_sort_list] + [list(stage_group.get(first_rank, set())) for first_rank in first_rank_sort_list] def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict): for comm_op in comm_op_dict: @@ -111,7 +111,7 @@ class UnionFind(object): """Disjoint Set Union""" @classmethod def union(cls, p: set, q: set, o: set): - """make p and q the same set""" + """union p,q and o as the same set""" return p | q | o @classmethod diff --git a/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py b/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py index 190534824ec..c6ae8a23571 100644 --- a/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py @@ -26,13 +26,18 @@ class TestPytorchDataPreprocessor(unittest.TestCase): DIR_PATH = os.path.join(os.path.dirname(__file__), 'DT_CLUSTER_PREPROCESS') def setUp(self) -> None: - shutil.rmtree(self.DIR_PATH) - os.makedirs(os.path.join(self.DIR_PATH, 'worker1_ascend_pt')) - os.mknod(os.path.join(self.DIR_PATH, 'worker1_ascend_pt', 'profiler_info_1.json')) - os.makedirs(os.path.join(self.DIR_PATH, 'worker2_ascend_pt')) - os.mknod(os.path.join(self.DIR_PATH, 'worker2_ascend_pt', 'profiler_info_2.json')) - os.makedirs(os.path.join(self.DIR_PATH, 'single_worker_ascend_pt')) - os.mknod(os.path.join(self.DIR_PATH, 'single_worker_ascend_pt', 'profiler_info.json')) + if os.path.exists(self.DIR_PATH): + shutil.rmtree(self.DIR_PATH) + os.makedirs(os.path.join(self.DIR_PATH, 'worker1_11111111_ascend_pt')) + open(os.path.join(self.DIR_PATH, 'worker1_11111111_ascend_pt', 'profiler_info_1.json'), 'w') + os.makedirs(os.path.join(self.DIR_PATH, 'worker2_11111112_ascend_pt')) + open(os.path.join(self.DIR_PATH, 'worker2_11111112_ascend_pt', 'profiler_info_2.json'), 'w') + os.makedirs(os.path.join(self.DIR_PATH, 'single_worker_11111111_ascend_pt')) + open(os.path.join(self.DIR_PATH, 'single_worker_11111111_ascend_pt', 'profiler_info.json'), 'w') + 
os.makedirs(os.path.join(self.DIR_PATH, 'worker1_11111112_ascend_pt')) + open(os.path.join(self.DIR_PATH, 'worker1_11111112_ascend_pt', 'profiler_info_1.json'), 'w') + os.makedirs(os.path.join(self.DIR_PATH, 'worker2_11111113_ascend_pt')) + open(os.path.join(self.DIR_PATH, 'worker2_11111113_ascend_pt', 'profiler_info_2.json'), 'w') def tearDown(self) -> None: shutil.rmtree(self.DIR_PATH) @@ -42,5 +47,19 @@ class TestPytorchDataPreprocessor(unittest.TestCase): def test_get_rank_id_cluster(self): check = PytorchDataPreprocessor(self.DIR_PATH) - ret = check.get_rank_id('worker1_ascend_pt') + ret = check.get_rank_id('worker1_11111111_ascend_pt') self.assertEqual(ret, 1) + + def test_get_rank_id_single(self): + check = PytorchDataPreprocessor(self.DIR_PATH) + ret = check.get_rank_id('single_worker_11111111_ascend_pt') + self.assertEqual(ret, -1) + + def test_get_data_map(self): + check = PytorchDataPreprocessor(self.DIR_PATH) + with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True): + ret = check.get_data_map() + self.assertIn(1, ret.keys()) + self.assertIn(2, ret.keys()) + self.assertIn(os.path.join(self.DIR_PATH, 'worker1_11111111_ascend_pt'), ret.values()) + self.assertIn(os.path.join(self.DIR_PATH, 'worker2_11111112_ascend_pt'), ret.values()) diff --git a/profiler/cluster_analyse/test/ut/testcase/communication_group/analysis/test_step_trace_time_analysis.py b/profiler/cluster_analyse/test/ut/testcase/communication_group/analysis/test_step_trace_time_analysis.py new file mode 100644 index 00000000000..f5dda57d0e8 --- /dev/null +++ b/profiler/cluster_analyse/test/ut/testcase/communication_group/analysis/test_step_trace_time_analysis.py @@ -0,0 +1,82 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import os +import unittest +from unittest import mock + +from analysis.step_trace_time_analysis import StepTraceTimeAnalysis +from prof_bean.step_trace_time_bean import StepTraceTimeBean +from common_func.constant import Constant + + +class TestStepTraceTimeAnalysis(unittest.TestCase): + DIR_PATH = '' + + def test_get_max_data_row(self): + check = StepTraceTimeAnalysis({}) + ls = [ + [1, 3, 5, 7, 10], + [2, 4, 6, 8, 11], + [1000, -1, -1, -1, -1] + ] + ret = check.get_max_data_row(ls) + self.assertEqual([1000, 4, 6, 8, 11], ret) + + def test_get_max_data_row_single_ls(self): + check = StepTraceTimeAnalysis({}) + ls = [ + [1, 3, 5, 7, 10] + ] + ret = check.get_max_data_row(ls) + self.assertEqual([1, 3, 5, 7, 10], ret) + + def test_analyze_step_time_normal(self): + check = StepTraceTimeAnalysis({}) + check.step_time_dict = { + 0: [ + StepTraceTimeBean({"Step": 0, "time1": 1, "time2": 2}), + StepTraceTimeBean({"Step": 1, "time1": 1, "time2": 2}), + ], + 1: [ + StepTraceTimeBean({"Step": 0, "time1": 10, "time2": 20}), + StepTraceTimeBean({"Step": 1, "time1": 10, "time2": 20}) + ] + } + check.communication_group = {Constant.P2P: [[0, 1]]} + check.analyze_step_time() + self.assertIn([0, 'stage', (0, 1), 10.0, 20.0], check.step_data_list) + + def test_analyze_step_time_normal_none_step(self): + check = StepTraceTimeAnalysis({}) + check.step_time_dict = { + 0: [ + StepTraceTimeBean({"Step": None, "time1": 1, "time2": 2}) + ], + 1: [ + StepTraceTimeBean({"Step": None, "time1": 10, "time2": 20}), + ], + 2: [ + StepTraceTimeBean({"Step": None, "time1": 2, "time2": 3}), + ], + 3: [ + StepTraceTimeBean({"Step": None, "time1": 1, "time2": 1}), + ], + } + check.communication_group = {Constant.P2P: [[0, 1], [2, 3]]} + check.analyze_step_time() + self.assertIn([None, 'stage', (2, 3), 2.0, 3.0], check.step_data_list) + self.assertIn([None, 'rank', 0, 1.0, 2.0], check.step_data_list) diff --git a/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py b/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py index 742c171c953..f64daa05398 100644 --- a/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py +++ b/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py @@ -18,7 +18,93 @@ import os import unittest from unittest import mock +from communication_group.communication_group_generator import CommunicationGroupGenerator +from common_func.constant import Constant + class TestCommunicationGroupGenerator(unittest.TestCase): DIR_PATH = '' - def test_load_communication_json + + def test_generate_p2p_communication_group_1p(self): + check = CommunicationGroupGenerator('', {}) + check.collective_group_dict = { + 'group1': {0} + } + with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True): + check.generate_p2p_communication_group() + ret = {0} + self.assertEqual(ret, set(check.communication_group[Constant.P2P][0])) + + def test_generate_p2p_communication_group_8p(self): + check = CommunicationGroupGenerator('', {}) + check.collective_group_dict = { + 'group1': {1, 2, 3, 4}, + 'group2': {5, 6, 7, 8}, + } + with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True): + check.generate_p2p_communication_group() + ret_a = {1, 2, 3, 4} + ret_b = {5, 6, 7, 8} + self.assertEqual(ret_a, set(check.communication_group[Constant.P2P][0])) + 
self.assertEqual(ret_b, set(check.communication_group[Constant.P2P][1])) + + def test_generate_p2p_communication_group_16p(self): + check = CommunicationGroupGenerator('', {}) + check.collective_group_dict = { + 'group1': {0, 1}, + 'group2': {0, 2}, + 'group3': {2, 3}, + 'group4': {3, 1}, + 'group5': {4, 5}, + 'group6': {4, 6}, + 'group7': {5, 7}, + 'group8': {6, 7}, + 'group9': {8, 9}, + 'group10': {8, 10}, + 'group11': {11, 10}, + 'group12': {11, 9}, + 'group13': {12, 13}, + 'group14': {12, 14}, + 'group15': {15, 13}, + 'group16': {15, 14} + } + with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True): + check.generate_p2p_communication_group() + ret_a = {0, 1, 2, 3} + ret_b = {4, 5, 6, 7} + ret_c = {8, 9, 10, 11} + ret_d = {12, 13, 14, 15} + self.assertEqual(ret_a, set(check.communication_group[Constant.P2P][0])) + self.assertEqual(ret_b, set(check.communication_group[Constant.P2P][1])) + self.assertEqual(ret_c, set(check.communication_group[Constant.P2P][2])) + self.assertEqual(ret_d, set(check.communication_group[Constant.P2P][3])) + print(check.communication_group[Constant.P2P]) + + def test_generate_p2p_communication_group_repeat_group(self): + check = CommunicationGroupGenerator('', {}) + check.collective_group_dict = { + 'group1': {0, 1, 2, 3}, + 'group2': {0, 1, 2, 3}, + 'group3': {0, 1, 2, 3}, + 'group4': {0, 1, 2, 3}, + 'group5': {3, 2, 4, 5}, + 'group6': {4, 5, 6, 7}, + 'group7': {4, 5, 6, 7}, + 'group8': {4, 5, 6, 7}, + 'group9': {8, 9, 11, 10}, + 'group10': {8, 9, 11, 10}, + 'group11': {11, 10, 12, 13}, + 'group12': {11, 10, 12, 13}, + 'group13': {11, 10, 12, 13}, + 'group14': {12, 13, 14, 15}, + 'group15': {12, 13, 14, 15}, + 'group16': {12, 13, 14, 15} + } + with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True): + check.generate_p2p_communication_group() + ret_a = {0, 1, 2, 3, 4, 5, 6, 7} + ret_b = {8, 9, 10, 11, 12, 13, 14, 15} + self.assertEqual(ret_a, set(check.communication_group[Constant.P2P][0])) + self.assertEqual(ret_b, set(check.communication_group[Constant.P2P][1])) + print(check.communication_group[Constant.P2P]) + -- Gitee From 34fc0ef076fc075046777fcbc07cc75c90f924b3 Mon Sep 17 00:00:00 2001 From: sunboquan Date: Thu, 21 Dec 2023 21:22:50 +0800 Subject: [PATCH 8/8] ut --- .../test/ut/testcase/test_cluster_analysis.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 profiler/cluster_analyse/test/ut/testcase/test_cluster_analysis.py diff --git a/profiler/cluster_analyse/test/ut/testcase/test_cluster_analysis.py b/profiler/cluster_analyse/test/ut/testcase/test_cluster_analysis.py new file mode 100644 index 00000000000..db071d824e1 --- /dev/null +++ b/profiler/cluster_analyse/test/ut/testcase/test_cluster_analysis.py @@ -0,0 +1,25 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+
+import os
+import unittest
+from unittest import mock
+
+from communication_group.communication_group_generator import CommunicationGroupGenerator
+from common_func.constant import Constant
+
+
+class TestInterface(unittest.TestCase): pass  # placeholder suite; concrete test cases still to be added
\ No newline at end of file
--
Gitee
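Patch 8/8 ends with TestInterface as a bare placeholder. Below is a minimal sketch of the kind of cases it might grow, reusing helpers this series itself adds; the test names, the expected values, and the choice to cover compute_ratio and combine_link here are assumptions for illustration, not part of the patches, and cluster_analyse is assumed to be on the Python path as in the other new tests.

# A sketch of how the empty TestInterface suite from PATCH 8/8 could be filled
# in. The cases exercise compute_ratio and combine_link added earlier in this
# series; names and expected values are illustrative assumptions.
import unittest

from analysis.communication_analysis import CommMatrixAnalysis
from common_func.constant import Constant


class TestInterface(unittest.TestCase):
    def test_compute_ratio_returns_zero_when_divisor_is_zero(self):
        self.assertEqual(CommMatrixAnalysis.compute_ratio(10.0, 0), 0)

    def test_compute_ratio_rounds_to_four_decimal_places(self):
        self.assertEqual(CommMatrixAnalysis.compute_ratio(1.0, 3.0), 0.3333)

    def test_combine_link_accumulates_time_and_size(self):
        total = {Constant.TRANSPORT_TYPE: '',
                 Constant.TRANSIT_TIME_MS: 1.0,
                 Constant.TRANSIT_SIZE_MB: 2.0}
        single = {Constant.TRANSPORT_TYPE: 'HCCS',
                  Constant.TRANSIT_TIME_MS: 3.0,
                  Constant.TRANSIT_SIZE_MB: 4.0}
        CommMatrixAnalysis.combine_link(total, single)
        self.assertEqual(total[Constant.TRANSIT_TIME_MS], 4.0)
        self.assertEqual(total[Constant.TRANSIT_SIZE_MB], 6.0)


if __name__ == '__main__':
    unittest.main()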
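For the link-merging logic touched by patches 1, 4, 5 and 6: CommMatrixAnalysis.merge_same_links uses each same-source/destination link key ("N-N") to record which global rank a local rank belongs to, accumulates transit time and size per link, then rewrites the keys with global ranks and fills in bandwidth. A small worked example of that behaviour, with invented op and link data, assuming the final state of the series:

# Worked example for CommMatrixAnalysis.merge_same_links as it stands after
# PATCH 6/8. The op name, ranks and link figures are invented for illustration,
# and cluster_analyse is assumed to be importable.
from analysis.communication_analysis import CommMatrixAnalysis
from common_func.constant import Constant


def make_link(transport_type, time_ms, size_mb):
    return {Constant.TRANSPORT_TYPE: transport_type,
            Constant.TRANSIT_TIME_MS: time_ms,
            Constant.TRANSIT_SIZE_MB: size_mb}


step_dict = {
    "hcom_allReduce__001@group1": {
        5: {"0-0": make_link("HCCS", 1.0, 8.0),   # rank 5 calls itself local rank 0
            "0-1": make_link("HCCS", 2.0, 16.0)},
        6: {"1-1": make_link("HCCS", 1.0, 8.0),   # rank 6 calls itself local rank 1
            "1-0": make_link("HCCS", 2.0, 16.0)},
    }
}

analysis = CommMatrixAnalysis({})
analysis.merge_same_links(step_dict)
# The "0-0" and "1-1" self links establish the projection {"0": 5, "1": 6}, so
# the merged op now holds "5-5", "5-6", "6-6" and "6-5", each carrying
# Constant.BANDWIDTH_GB_S = transit size / transit time (8.0 for these figures).
print(sorted(step_dict["hcom_allReduce__001@group1"]))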