diff --git a/profiler/cluster_analyse/analysis/base_analysis.py b/profiler/cluster_analyse/analysis/base_analysis.py index 908331f3e321d2cc2abbcf36577baacaffb61bca..8f48fd10e34fd9b8fd610b65f06f00b5b6cba6eb 100644 --- a/profiler/cluster_analyse/analysis/base_analysis.py +++ b/profiler/cluster_analyse/analysis/base_analysis.py @@ -13,11 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import sys -import traceback -import shutil -import pandas as pd + from abc import abstractmethod from common_func.constant import Constant diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py index 81215b0ce3f500f44d2d6191a1fc2f87682e643f..57c37e66d6fd72c30d3a6194342a942f4a3c6ba2 100644 --- a/profiler/cluster_analyse/analysis/communication_analysis.py +++ b/profiler/cluster_analyse/analysis/communication_analysis.py @@ -120,8 +120,6 @@ class CommunicationAnalysisOptimized(BaseAnalysis): self._communication_group = param.get(Constant.COMM_DATA_DICT, {}).get(Constant.COMMUNICATION_GROUP) self._aggregate_time = {} self._aggregate_bandwidth = {} - self._total_transit_size = {} - self._total_transit_time = {} self._output_time = [] self._output_bandwidth = [] @@ -135,9 +133,10 @@ class CommunicationAnalysisOptimized(BaseAnalysis): def _format_time_data(communication_data): data_dict = {} for single_op in communication_data: - formated_data = CommunicationTimeBean(single_op) - data_dict.setdefault(formated_data.step_id, {}).\ - setdefault(formated_data.rank_id, []).extend([formated_data]) + formatted_data = CommunicationTimeBean(single_op) + data_dict.setdefault(formatted_data.step_id, {}).\ + setdefault(formatted_data.rank_id, {}).\ + setdefault(formatted_data.group_name, []).extend([formatted_data]) return data_dict def run(self): @@ -147,27 +146,15 @@ class CommunicationAnalysisOptimized(BaseAnalysis): self._aggregate_bandwidth = self._format_bandwidth_data(self._communication_ops[1]) self._compute_total_info() self._dump_data() - - def _update_total_bandwidth_data(self, formated_data: any): - step_id = formated_data.step_id - if self._total_transit_size.get(step_id) is None: - self._total_transit_size.setdefault(step_id, 0.0) - else: - self._total_transit_size[step_id] += formated_data.transit_size - if self._total_transit_time.get(step_id) is None: - self._total_transit_time.setdefault(step_id, 0.0) - else: - self._total_transit_time[step_id] += formated_data.transit_time def _format_bandwidth_data(self, communication_data: dict): data_dict = {} for single_op in communication_data: - formated_data = CommunicationBandwidthBean(single_op) - data_dict.setdefault(formated_data.step_id, {}).\ - setdefault(formated_data.rank_id, {}).\ - setdefault(formated_data.transport_type, {}).\ - setdefault(formated_data.package_size, []).extend([formated_data]) - self._update_total_bandwidth_data(formated_data) + formatted_data = CommunicationBandwidthBean(single_op) + data_dict.setdefault(formatted_data.step_id, {}).\ + setdefault(formatted_data.rank_id, {}).\ + setdefault(formatted_data.transport_type, {}).\ + setdefault(formatted_data.package_size, []).extend([formatted_data]) return data_dict def _dump_data(self): @@ -186,25 +173,36 @@ class CommunicationAnalysisOptimized(BaseAnalysis): return for step_id, rank_dict in self._aggregate_time.items(): for rank_id, communication_op_info in rank_dict.items(): - total_dict = { - TableConstant.RANK_ID: rank_id, - TableConstant.STEP: step_id, - TableConstant.GROUP_NAME: '', - TableConstant.HCCL_OP_NAME: Constant.TOTAL_OP_INFO - } - total_time_info = CommunicationTimeBean(total_dict) - for com_info_dict in communication_op_info: - total_time_info += com_info_dict - self._output_time.append(com_info_dict.convert_output()) - total_time_info.compute_ratio() - communication_op_info.append(total_time_info) - self._output_time.append(total_time_info.convert_output()) + rank_set_dict = {} + for group_name, single_group_op_info in communication_op_info.items(): + total_dict = { + TableConstant.RANK_ID: rank_id, + TableConstant.STEP: step_id, + TableConstant.GROUP_NAME: group_name, + TableConstant.HCCL_OP_NAME: Constant.TOTAL_OP_INFO + } + total_time_info = CommunicationTimeBean(total_dict) + for com_info_dict in single_group_op_info: + total_time_info += com_info_dict + self._output_time.append(com_info_dict.convert_output()) + rank_set = str(self.collective_group_dict.get(group_name)) + if not rank_set: + logger.warning("failed to find rank set with group name: %s.", group_name) + continue + if rank_set_dict.get(rank_set): + rank_set_dict[rank_set] += total_time_info + else: + rank_set_dict[rank_set] = total_time_info + for _, total_time_info in rank_set_dict.items(): + total_time_info.compute_ratio() + self._output_time.append(total_time_info.convert_output()) for step_id, rank_dict in self._aggregate_bandwidth.items(): - total_transit_size = self._total_transit_size.get(step_id) - total_transit_time = self._total_transit_time.get(step_id) - total_bandwidth = total_transit_size / total_transit_time if total_transit_time else 0.0 for rank_id, communication_op_info in rank_dict.items(): for transport_type, bandwidth_info in communication_op_info.items(): + total_transit_size = 0.0 + total_transit_time = 0.0 + total_info = [] + op_group_set = set() for package_size, package_info in bandwidth_info.items(): total_dict = { TableConstant.RANK_ID: rank_id, @@ -212,14 +210,24 @@ class CommunicationAnalysisOptimized(BaseAnalysis): TableConstant.GROUP_NAME: '', TableConstant.HCCL_OP_NAME: Constant.TOTAL_OP_INFO, TableConstant.TRANSPORT_TYPE: transport_type, - TableConstant.TRANSIT_SIZE: total_transit_size, - TableConstant.TRANSIT_TIME: total_transit_time, - TableConstant.BANDWIDTH: total_bandwidth, + TableConstant.TRANSIT_SIZE: 0.0, + TableConstant.TRANSIT_TIME: 0.0, + TableConstant.BANDWIDTH: 0.0, TableConstant.PACKAGE_SIZE: package_size } total_bandwidth_info = CommunicationBandwidthBean(total_dict) for bandwidth_package_info in package_info: total_bandwidth_info += bandwidth_package_info self._output_bandwidth.append(bandwidth_package_info.convert_output()) - package_info.append(total_bandwidth_info.convert_output()) - self._output_bandwidth.append(total_bandwidth_info.convert_output()) + op_group = bandwidth_package_info.hccl_op_name + "@" + bandwidth_package_info.group_name + if op_group not in op_group_set: + op_group_set.add(op_group) + total_transit_size += bandwidth_package_info.transit_size + total_transit_time += bandwidth_package_info.transit_time + total_info.append(total_bandwidth_info) + total_bandwidth = total_transit_size / total_transit_time if total_transit_time else 0.0 + for single_total_info in total_info: + single_total_info.set_transit_size(total_transit_size) + single_total_info.set_transit_time(total_transit_time) + single_total_info.set_bandwidth(total_bandwidth) + self._output_bandwidth.append(single_total_info.convert_output()) diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index 55c3d03958b97c427fe8fde0625e72ea4dee8997..1606cda79f0bf2cddc4d71cc759f8ca487a4dbab 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -12,7 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import glob + + from collections import defaultdict import os diff --git a/profiler/cluster_analyse/cluster_kernels_analysis/cluster_prof_Info_analysis.py b/profiler/cluster_analyse/cluster_kernels_analysis/cluster_prof_Info_analysis.py index 94be611c36bcff88233ea966b8e795f5e56bdca6..2b004b54c315b456b8bf2375c33c3f3d84ce1725 100644 --- a/profiler/cluster_analyse/cluster_kernels_analysis/cluster_prof_Info_analysis.py +++ b/profiler/cluster_analyse/cluster_kernels_analysis/cluster_prof_Info_analysis.py @@ -18,7 +18,6 @@ import argparse import re import os import stat -import shutil import warnings from pathlib import Path diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py index 9f8245b740fa10d0f39aff269386636bcf3beb84..f78f798b35e8cbc1029387bd9a3564f2bf516789 100644 --- a/profiler/cluster_analyse/common_func/constant.py +++ b/profiler/cluster_analyse/common_func/constant.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os class Constant(object): # dir name diff --git a/profiler/cluster_analyse/communication_group/communication_json_group.py b/profiler/cluster_analyse/communication_group/communication_json_group.py index b609b5de0e5051f26a8c9e5d30b71c3abd30b96f..c0e027033c47d95d44c85bba84ac56499a16a7da 100644 --- a/profiler/cluster_analyse/communication_group/communication_json_group.py +++ b/profiler/cluster_analyse/communication_group/communication_json_group.py @@ -14,8 +14,7 @@ # limitations under the License. import os - -from common_func.constant import Constant + from common_func.file_manager import FileManager from communication_group.base_communication_group import BaseCommunicationGroup diff --git a/profiler/cluster_analyse/prof_bean/communication_bandwidth_bean.py b/profiler/cluster_analyse/prof_bean/communication_bandwidth_bean.py index 80438552eec0e634662beaf3119742ddd3f51e78..e26a5000844fc6144b77eaf4f04def416e849670 100644 --- a/profiler/cluster_analyse/prof_bean/communication_bandwidth_bean.py +++ b/profiler/cluster_analyse/prof_bean/communication_bandwidth_bean.py @@ -40,9 +40,9 @@ class CommunicationBandwidthBean: TableConstant.GROUP_NAME: self._group_name, TableConstant.HCCL_OP_NAME: self._hccl_op_name, TableConstant.TRANSPORT_TYPE: self._transport_type, - TableConstant.TRANSIT_SIZE: self._transit_size + other._transit_size, - TableConstant.TRANSIT_TIME: self._transit_time + other._transit_time, - TableConstant.BANDWIDTH: self._bandwidth + other._bandwidth, + TableConstant.TRANSIT_SIZE: self._transit_size, + TableConstant.TRANSIT_TIME: self._transit_time, + TableConstant.BANDWIDTH: self._bandwidth, TableConstant.LARGE_PACKET_RATIO: 0, TableConstant.PACKAGE_SIZE: self._package_size, TableConstant.COUNT: self._count + other._count, @@ -74,9 +74,26 @@ class CommunicationBandwidthBean: def transit_size(self): return self._transit_size + @property + def group_name(self): + return self._group_name + + @property + def hccl_op_name(self): + return self._hccl_op_name + + def set_transit_size(self, value: float): + self._transit_size = value + + def set_transit_time(self, value: float): + self._transit_time = value + + def set_bandwidth(self, value: float): + self._bandwidth = value + def convert_output(self): return [ self._step_id, self._rank_id, self._hccl_op_name, self._group_name, - self._transport_type, self._transit_size, self._transit_time, self._bandwidth, + self._transport_type, self._transit_size, self._transit_time, round(self._bandwidth, 4), self._large_packet_ratio, self._package_size, self._count, self._total_duration ] diff --git a/profiler/cluster_analyse/prof_bean/communication_time_bean.py b/profiler/cluster_analyse/prof_bean/communication_time_bean.py index a3ef35669c464d2025d19b68ddf742c5a30f7d56..98aa86c941131086147c71cacc6190cb72cb82f3 100644 --- a/profiler/cluster_analyse/prof_bean/communication_time_bean.py +++ b/profiler/cluster_analyse/prof_bean/communication_time_bean.py @@ -56,15 +56,19 @@ class CommunicationTimeBean: def step_id(self): return self._step_id + @property + def group_name(self): + return self._group_name + def compute_ratio(self): - total_duration = self._elapsed_time + self._transit_time + self._wait_time +\ - self._synchronization_time +self._idle_time + total_duration = self._transit_time + self._synchronization_time self._sync_ratio = self._synchronization_time / total_duration if total_duration != 0 else 0 + total_duration = self._transit_time + self._wait_time self._wait_ratio = self._wait_time / total_duration if total_duration != 0 else 0 def convert_output(self): return [ self._step_id, self._rank_id, self._hccl_op_name, self._group_name, - self._start_time, self._transit_time, self._wait_time, self._synchronization_time, - self._idle_time, self._elapsed_time, self._sync_ratio, self._wait_ratio + self._start_time, self._elapsed_time, self._transit_time, self._wait_time, self._synchronization_time, + self._idle_time, self._sync_ratio, self._wait_ratio ]