diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py index bab983de5b3bc63b7fb3ddc13d6340ea1a6fa593..0ce7746654b1cff468911ac91234c06910043401 100644 --- a/profiler/cluster_analyse/communication_group/communication_group_generator.py +++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py @@ -122,7 +122,7 @@ class CommunicationGroupGenerator: stage_group[min(rank_set)] = rank_set first_rank_sort_list = sorted([first_rank for first_rank in stage_group]) self.communication_group[Constant.P2P] = \ - [list(stage_group.get(first_rank, {})) for first_rank in first_rank_sort_list] + [list(stage_group.get(first_rank, set())) for first_rank in first_rank_sort_list] def set_p2p_groups(self): self.p2p_link = sorted(self.p2p_link, key=lambda x: min(x)) @@ -203,7 +203,7 @@ class UnionFind(object): """Disjoint Set Union""" @classmethod def union(cls, p: set, q: set, o: set): - """make p and q the same set""" + """Return the union of p, q and o as a single set.""" return p | q | o @classmethod diff --git a/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py b/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py new file mode 100644 index 0000000000000000000000000000000000000000..c6ae8a235710532d022c5486643d0a5ed7b62749 --- /dev/null +++ b/profiler/cluster_analyse/test/ut/testcase/cluster_data_preprocess/test_pytorch_data_preprocessor.py @@ -0,0 +1,65 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import shutil
+import unittest
+from unittest import mock
+
+from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor
+
+
+class TestPytorchDataPreprocessor(unittest.TestCase):
+    DIR_PATH = os.path.join(os.path.dirname(__file__), 'DT_CLUSTER_PREPROCESS')
+
+    def setUp(self) -> None:
+        """Create a clean fixture tree: one empty profiler_info file per profiling dir."""
+        if os.path.exists(self.DIR_PATH):
+            shutil.rmtree(self.DIR_PATH)
+        for dir_name, info_name in (
+                ('worker1_11111111_ascend_pt', 'profiler_info_1.json'),
+                ('worker2_11111112_ascend_pt', 'profiler_info_2.json'),
+                ('single_worker_11111111_ascend_pt', 'profiler_info.json'),
+                ('worker1_11111112_ascend_pt', 'profiler_info_1.json'),
+                ('worker2_11111113_ascend_pt', 'profiler_info_2.json')):
+            os.makedirs(os.path.join(self.DIR_PATH, dir_name))
+            # Close the handle right away so the fixture leaks no open files.
+            open(os.path.join(self.DIR_PATH, dir_name, info_name), 'w').close()
+
+    def tearDown(self) -> None:
+        shutil.rmtree(self.DIR_PATH)
+
+    def test_create_preprocessor(self):
+        PytorchDataPreprocessor(self.DIR_PATH)  # construction alone must not raise
+
+    def test_get_rank_id_cluster(self):
+        check = PytorchDataPreprocessor(self.DIR_PATH)
+        ret = check.get_rank_id('worker1_11111111_ascend_pt')
+        self.assertEqual(ret, 1)
+
+    def test_get_rank_id_single(self):
+        check = PytorchDataPreprocessor(self.DIR_PATH)
+        ret = check.get_rank_id('single_worker_11111111_ascend_pt')
+        self.assertEqual(ret, -1)
+
+    def test_get_data_map(self):
+        check = PytorchDataPreprocessor(self.DIR_PATH)
+        with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True):
+            ret = check.get_data_map()
+        self.assertIn(1, ret.keys())
+        self.assertIn(2, ret.keys())
+        self.assertIn(os.path.join(self.DIR_PATH, 'worker1_11111111_ascend_pt'), ret.values())
+        self.assertIn(os.path.join(self.DIR_PATH, 'worker2_11111112_ascend_pt'), ret.values())
diff --git a/profiler/cluster_analyse/test/ut/testcase/communication_group/analysis/test_step_trace_time_analysis.py b/profiler/cluster_analyse/test/ut/testcase/communication_group/analysis/test_step_trace_time_analysis.py
new file mode 100644
index 0000000000000000000000000000000000000000..f5dda57d0e8c7355b50e343b5565731e44c60619
--- /dev/null
+++ b/profiler/cluster_analyse/test/ut/testcase/communication_group/analysis/test_step_trace_time_analysis.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import unittest
+from unittest import mock
+
+from analysis.step_trace_time_analysis import StepTraceTimeAnalysis
+from prof_bean.step_trace_time_bean import StepTraceTimeBean
+from common_func.constant import Constant
+
+
+class TestStepTraceTimeAnalysis(unittest.TestCase):
+    DIR_PATH = ''
+
+    def test_get_max_data_row(self):
+        """Expect the largest value of every column across all input rows."""
+        analysis = StepTraceTimeAnalysis({})
+        rows = [
+            [1, 3, 5, 7, 10],
+            [2, 4, 6, 8, 11],
+            [1000, -1, -1, -1, -1]
+        ]
+        self.assertEqual([1000, 4, 6, 8, 11], analysis.get_max_data_row(rows))
+
+    def test_get_max_data_row_single_ls(self):
+        """Expect a single input row to come back with the same values."""
+        analysis = StepTraceTimeAnalysis({})
+        rows = [
+            [1, 3, 5, 7, 10]
+        ]
+        self.assertEqual([1, 3, 5, 7, 10], analysis.get_max_data_row(rows))
+
+    def test_analyze_step_time_normal(self):
+        """Ranks 0 and 1 form one P2P stage; a 'stage' row is expected for step 0."""
+        analysis = StepTraceTimeAnalysis({})
+        # Two steps per rank; rank 1 carries the larger timings.
+        analysis.step_time_dict = {
+            0: [StepTraceTimeBean({"Step": 0, "time1": 1, "time2": 2}),
+                StepTraceTimeBean({"Step": 1, "time1": 1, "time2": 2})],
+            1: [StepTraceTimeBean({"Step": 0, "time1": 10, "time2": 20}),
+                StepTraceTimeBean({"Step": 1, "time1": 10, "time2": 20})],
+        }
+        analysis.communication_group = {Constant.P2P: [[0, 1]]}
+        analysis.analyze_step_time()
+        # The step-0 stage row is expected to carry rank 1's values.
+        expected_row = [0, 'stage', (0, 1), 10.0, 20.0]
+        self.assertIn(expected_row, analysis.step_data_list)
+
+    def test_analyze_step_time_normal_none_step(self):
+        """Beans with Step=None: expect both a 'stage' row and a 'rank' row."""
+        analysis = StepTraceTimeAnalysis({})
+        # One bean per rank, every bean with Step=None.
+        # Ranks 0-1 and ranks 2-3 are configured as the two P2P stages below.
+        analysis.step_time_dict = {
+            0: [StepTraceTimeBean({"Step": None, "time1": 1, "time2": 2})],
+            1: [StepTraceTimeBean({"Step": None, "time1": 10, "time2": 20})],
+            2: [StepTraceTimeBean({"Step": None, "time1": 2, "time2": 3})],
+            3: [StepTraceTimeBean({"Step": None, "time1": 1, "time2": 1})],
+        }
+        analysis.communication_group = {Constant.P2P: [[0, 1], [2, 3]]}
+        analysis.analyze_step_time()
+        expected_rows = (
+            [None, 'stage', (2, 3), 2.0, 3.0],
+            [None, 'rank', 0, 1.0, 2.0],
+        )
+        for row in expected_rows:
+            self.assertIn(row, analysis.step_data_list)
diff --git 
a/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py b/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py
new file mode 100644
index 0000000000000000000000000000000000000000..f64daa053988bd61992c13e9edf6cebb80e19a0b
--- /dev/null
+++ b/profiler/cluster_analyse/test/ut/testcase/communication_group/test_communication_group_generator.py
@@ -0,0 +1,111 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import unittest
+from unittest import mock
+
+from communication_group.communication_group_generator import CommunicationGroupGenerator
+from common_func.constant import Constant
+
+
+class TestCommunicationGroupGenerator(unittest.TestCase):
+    DIR_PATH = ''
+
+    def test_generate_p2p_communication_group_1p(self):
+        """A lone single-rank collective group is expected as the first P2P group."""
+        check = CommunicationGroupGenerator('', {})
+        check.collective_group_dict = {
+            'group1': {0}
+        }
+        with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True):
+            check.generate_p2p_communication_group()
+        ret = {0}
+        self.assertEqual(ret, set(check.communication_group[Constant.P2P][0]))
+
+    def test_generate_p2p_communication_group_8p(self):
+        """Two disjoint collective groups are expected as two P2P groups, in rank order."""
+        check = CommunicationGroupGenerator('', {})
+        check.collective_group_dict = {
+            'group1': {1, 2, 3, 4},
+            'group2': {5, 6, 7, 8},
+        }
+        with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True):
+            check.generate_p2p_communication_group()
+        ret_a = {1, 2, 3, 4}
+        ret_b = {5, 6, 7, 8}
+        self.assertEqual(ret_a, set(check.communication_group[Constant.P2P][0]))
+        self.assertEqual(ret_b, set(check.communication_group[Constant.P2P][1]))
+
+    def test_generate_p2p_communication_group_16p(self):
+        """Overlapping rank pairs are expected to merge into four 4-rank P2P groups."""
+        check = CommunicationGroupGenerator('', {})
+        check.collective_group_dict = {
+            'group1': {0, 1},
+            'group2': {0, 2},
+            'group3': {2, 3},
+            'group4': {3, 1},
+            'group5': {4, 5},
+            'group6': {4, 6},
+            'group7': {5, 7},
+            'group8': {6, 7},
+            'group9': {8, 9},
+            'group10': {8, 10},
+            'group11': {11, 10},
+            'group12': {11, 9},
+            'group13': {12, 13},
+            'group14': {12, 14},
+            'group15': {15, 13},
+            'group16': {15, 14}
+        }
+        with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True):
+            check.generate_p2p_communication_group()
+        ret_a = {0, 1, 2, 3}
+        ret_b = {4, 5, 6, 7}
+        ret_c = {8, 9, 10, 11}
+        ret_d = {12, 13, 14, 15}
+        self.assertEqual(ret_a, set(check.communication_group[Constant.P2P][0]))
+        self.assertEqual(ret_b, set(check.communication_group[Constant.P2P][1]))
+        self.assertEqual(ret_c, set(check.communication_group[Constant.P2P][2]))
+        self.assertEqual(ret_d, set(check.communication_group[Constant.P2P][3]))
+
+    def test_generate_p2p_communication_group_repeat_group(self):
+        """Repeated and overlapping groups are expected to merge into two 8-rank P2P groups."""
+        check = CommunicationGroupGenerator('', {})
+        check.collective_group_dict = {
+            'group1': {0, 1, 2, 3},
+            'group2': {0, 1, 2, 3},
+            'group3': {0, 1, 2, 3},
+            'group4': {0, 1, 2, 3},
+            'group5': {3, 2, 4, 5},
+            'group6': {4, 5, 6, 7},
+            'group7': {4, 5, 6, 7},
+            'group8': {4, 5, 6, 7},
+            'group9': {8, 9, 11, 10},
+            'group10': {8, 9, 11, 10},
+            'group11': {11, 10, 12, 13},
+            'group12': {11, 10, 12, 13},
+            'group13': {11, 10, 12, 13},
+            'group14': {12, 13, 14, 15},
+            'group15': {12, 13, 14, 15},
+            'group16': {12, 13, 14, 15}
+        }
+        with mock.patch("common_func.file_manager.FileManager.check_file_or_directory_path", return_value=True):
+            check.generate_p2p_communication_group()
+        ret_a = {0, 1, 2, 3, 4, 5, 6, 7}
+        ret_b = {8, 9, 10, 11, 12, 13, 14, 15}
+        self.assertEqual(ret_a, set(check.communication_group[Constant.P2P][0]))
+        self.assertEqual(ret_b, set(check.communication_group[Constant.P2P][1]))
diff --git a/profiler/cluster_analyse/test/ut/testcase/test_cluster_analysis.py b/profiler/cluster_analyse/test/ut/testcase/test_cluster_analysis.py
new file mode 100644
index 0000000000000000000000000000000000000000..3bd10640820fa6e6e673318e9ee42906647cff17
--- /dev/null
+++ b/profiler/cluster_analyse/test/ut/testcase/test_cluster_analysis.py
@@ -0,0 +1,62 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import unittest
+from unittest import mock
+
+from cluster_analysis import Interface
+from common_func.constant import Constant
+
+
+class TestInterface(unittest.TestCase):
+
+    def test_empty_allocate_prof_data(self):
+        """With no profiling directories reported by os.walk, an empty result is expected."""
+        with mock.patch("os.walk", return_value=[(None, [], None)]), \
+                mock.patch("common_func.path_manager.PathManager.get_realpath", return_value=''):
+            check = Interface({})
+            ret = {}
+            self.assertEqual(ret, check.allocate_prof_data())
+
+    def test_pytorch_allocate_prof_data(self):
+        """A lone *_ascend_pt directory is expected to yield the one mocked entry."""
+        with mock.patch("os.walk", return_value=[('a', ['xx_ascend_pt'], None)]), \
+                mock.patch("common_func.path_manager.PathManager.get_realpath", return_value=''), \
+                mock.patch("cluster_data_preprocess.pytorch_data_preprocessor.PytorchDataPreprocessor.get_data_map",
+                           return_value={1: 1}):
+            check = Interface({})
+            self.assertEqual(1, len(check.allocate_prof_data()))
+
+    def test_mindspore_allocate_prof_data(self):
+        """A lone *_ascend_ms directory is expected to yield the one mocked entry."""
+        with mock.patch("os.walk", return_value=[('a', ['xx_ascend_ms'], None)]), \
+                mock.patch("common_func.path_manager.PathManager.get_realpath", return_value=''), \
+                mock.patch("cluster_data_preprocess.mindspore_data_preprocessor.MindsporeDataPreprocessor.get_data_map",
+                           return_value={1: 1}):
+            check = Interface({})
+            self.assertEqual(1, len(check.allocate_prof_data()))
+
+    def test_both_allocate_prof_data(self):
+        """With both frameworks returning data, an empty result is expected."""
+        with mock.patch("os.walk", return_value=[('a', ['xx_ascend_ms', 'xx_ascend_pt'], None)]), \
+                mock.patch("common_func.path_manager.PathManager.get_realpath", return_value=''), \
+                mock.patch("cluster_data_preprocess.pytorch_data_preprocessor.PytorchDataPreprocessor.get_data_map",
+                           return_value={1: 1}), \
+                mock.patch("cluster_data_preprocess.mindspore_data_preprocessor.MindsporeDataPreprocessor.get_data_map",
+                           return_value={1: 1}):
+            check = Interface({})
+            self.assertEqual(0, len(check.allocate_prof_data()))