from utils.constant import Constant


class KernelEvent:
    """Read-only accessor over one raw kernel trace event for comparison output."""

    def __init__(self, event: dict, device_type: int):
        self._event = event
        self._device_type = device_type

    @property
    def kernel_name(self) -> str:
        return self._event.get("name", "")

    @property
    def device_dur(self) -> float:
        return self._event.get("dur", 0)

    @property
    def task_id(self) -> int:
        return self._event.get("args", {}).get("Task Id")

    @property
    def task_type(self) -> str:
        return self._event.get("args", {}).get("Task Type")

    @property
    def kernel_details(self) -> str:
        """One-line human-readable kernel summary.

        GPU events carry no task id/type; NPU lines end with '\\n' so several
        kernels can be concatenated into a single report cell.
        """
        if self._device_type == Constant.GPU:
            return f"{self.kernel_name} [duration: {self.device_dur}]"
        return f"{self.kernel_name}, {self.task_id}, {self.task_type} [duration: {self.device_dur}]\n"


class MemoryEvent:
    """Read-only accessor over one allocation/release record."""

    def __init__(self, event: dict, name: str):
        self._event = event
        # Fallback operator name used when the record itself has no name.
        self._name = name

    @property
    def size(self) -> float:
        return self._event.get(Constant.SIZE, 0)

    @property
    def duration(self) -> float:
        """Lifetime of the allocation; 0 when either timestamp is missing.

        Bug fix: the original converted only the release time with float(),
        so a string-typed allocation time raised TypeError; convert both,
        matching memory_details below.
        """
        release_time = self._event.get(Constant.RELEASE_TIME)
        allocation_time = self._event.get(Constant.ALLOCATION_TIME)
        if not release_time or not allocation_time:
            return 0
        return float(release_time) - float(allocation_time)

    @property
    def memory_details(self) -> str:
        """One-line summary: name, (alloc_ts, release_ts), duration and size."""
        name = self._event.get(Constant.NAME, "") if self._event.get(Constant.NAME, "") else self._name
        release_time = self._event.get(Constant.RELEASE_TIME)
        allocation_time = self._event.get(Constant.ALLOCATION_TIME)
        duration = float(release_time) - float(allocation_time) if release_time and allocation_time else None
        return f"{name}, ({allocation_time}, {release_time}), [duration: {duration}], [size: {self.size}]\n"


class TraceEventBean:
    """Typed accessor wrapper around a single chrome-trace event dict.

    Caches the common fields (pid/tid/ts/dur/ph/cat/name/args) once in init()
    so the picking callbacks can query them cheaply.
    """

    def __init__(self, event: dict):
        self._event = event
        self._pid = 0
        self._tid = 0
        self._ts = 0.0
        self._dur = 0.0
        self._ph = ""
        self._cat = ""
        self._name = ""
        self._args = {}
        self.init()

    @property
    def pid(self) -> int:
        return self._pid

    @property
    def tid(self) -> int:
        return self._tid

    @property
    def dur(self) -> float:
        return self._dur

    @property
    def start_time(self) -> float:
        return self._ts

    @property
    def end_time(self) -> float:
        return self._ts + self._dur

    @property
    def name(self) -> str:
        return self._name

    @property
    def lower_name(self) -> str:
        return self._name.lower()

    @property
    def lower_cat(self) -> str:
        return self._cat.lower()

    @property
    def args(self) -> dict:
        # Annotation fix: args is the raw dict, not a str.
        return self._args

    @property
    def id(self) -> str:
        return self._event.get("id")

    @property
    def stream_id(self) -> int:
        return self._args.get('Stream Id')

    @property
    def stream(self) -> int:
        return self._args.get("stream")

    @property
    def task_type(self) -> str:
        # Annotation fix: 'Task Type' is a string label, not an int.
        return self._args.get('Task Type')

    @property
    def device_id(self) -> int:
        return self._args.get('Device Id', -1)

    @property
    def total_reserved(self):
        return self._args.get('Total Reserved', 0)

    @property
    def corr_id(self) -> int:
        return self._args.get('correlation_id')

    @property
    def process_name(self) -> str:
        # Annotation fix: the metadata "name" arg is a string.
        return self._args.get("name", "")

    @property
    def event(self) -> dict:
        return self._event

    def is_m_mode(self) -> bool:
        return self._ph == "M"

    def is_x_mode(self) -> bool:
        return self._ph == "X"

    def is_flow_start(self) -> bool:
        return self._ph == "s"

    def is_flow_end(self) -> bool:
        return self._ph == "f"

    def is_process_meta(self) -> bool:
        return self.is_m_mode() and self._name == "process_name"

    def is_thread_meta(self) -> bool:
        return self.is_m_mode() and self._name == "thread_name"

    def is_communication_op_thread(self) -> bool:
        return self._args.get("name", "").find("Communication") != -1

    def is_hccl_process_name(self) -> bool:
        return self.process_name == "HCCL"

    def is_npu_process_name(self) -> bool:
        return self.process_name == "Ascend Hardware"

    def is_computing_event(self):
        return self._name == "Computing"

    def is_comm_not_overlap(self):
        return self._name == 'Communication(Not Overlapped)'

    def is_valid_event(self):
        # NOTE(review): falsy dur/ts (0) also fail this check — presumably
        # intentional, since zero-duration events carry no timing info.
        return self._name and self._cat and self._dur and self._ts

    def is_dict(self):
        return isinstance(self._event, dict)

    def init(self):
        """Populate the cached fields from the raw event dict."""
        self._pid = self._event.get("pid", 0)
        self._tid = self._event.get("tid", 0)
        self._ts = float(self._event.get("ts", 0.0))
        self._dur = float(self._event.get("dur", 0.0))
        self._ph = self._event.get("ph", "")
        self._cat = self._event.get("cat", "")
        self._name = self._event.get("name", "")
        self._args = self._event.get("args", {})
from abc import abstractmethod, ABC

from compare_bean.origin_data_bean.compare_event import KernelEvent
from compare_bean.origin_data_bean.trace_event_bean import TraceEventBean
from utils.args_manager import ArgsManager
from utils.constant import Constant
from utils.file_reader import FileReader


class ProfilingInfo:
    """Aggregated overall performance metrics for one profiling run."""

    TABLE_NAME = Constant.PERFORMANCE_TABLE
    HEADERS = []
    OVERHEAD = []

    def __init__(self, profiling_type: str):
        self.profiling_type = profiling_type
        # Per-category kernel time/count totals, filled in by the parsers.
        self.cube_time = 0.0
        self.other_time = 0.0
        self.vec_time = 0.0
        self.cube_num = 0
        self.vec_num = 0
        self.sdma_num = 0
        self.fa_num_fwd = 0
        self.fa_num_bwd = 0
        self.compute_time = 0.0
        self.communication_not_overlapped = 0.0
        self.memory_used = 0.0
        self.e2e_time = 0.0
        self.sdma_time = 0.0
        self.scheduling_time = 0.0
        self.flash_attention_time_bwd = 0.0
        self.flash_attention_time_fwd = 0.0
        # Display switches for the comparison report.
        self.minimal_profiling = False
        self.hide_op_details = False


class BaseProfilingParser(ABC):
    """Template-method base class for trace parsers (GPU/NPU).

    Subclasses supply the event-picking callbacks (via _get_dispatch_func)
    and the metric-update hooks; load_data() drives the shared pipeline.
    """

    def __init__(self, args: any, path_dict: dict):
        self._args = args
        self._profiling_type = path_dict.get(Constant.PROFILING_TYPE)
        self._profiling_path = path_dict.get(Constant.PROFILING_PATH)
        self._json_path = path_dict.get(Constant.TRACE_PATH)
        self._trace_events = FileReader.read_trace_file(self._json_path)
        self._torch_op_data = []
        self._kernel_dict = {}
        self._memory_list = []
        self._communication_dict = {}
        self._overall_metrics = ProfilingInfo(path_dict.get(Constant.PROFILING_TYPE))
        # Cache the comparison switches once; used by the dispatch setup below.
        self._enable_profiling_compare = ArgsManager().enable_profiling_compare
        self._enable_operator_compare = ArgsManager().enable_operator_compare
        self._enable_memory_compare = ArgsManager().enable_memory_compare
        self._enable_communication_compare = ArgsManager().enable_communication_compare
        self._dispatch_func = self._get_dispatch_func()
        self._memory_events = []
        self._flow_dict = {}
        self._all_kernels = {}

    @abstractmethod
    def _update_memory_list(self):
        raise NotImplementedError("Function _update_memory_list need to be implemented.")

    @abstractmethod
    def _calculate_performance_time(self):
        # Bug fix: the original message was copy-pasted from _update_memory_list.
        raise NotImplementedError("Function _calculate_performance_time need to be implemented.")

    @abstractmethod
    def _update_overall_metrics(self):
        # Bug fix: the original message was copy-pasted from _update_memory_list.
        raise NotImplementedError("Function _update_overall_metrics need to be implemented.")

    @abstractmethod
    def _picking_communication_event(self, **kwargs):
        raise NotImplementedError("Function _picking_communication_event need to be implemented.")

    @abstractmethod
    def _picking_torch_op_event(self, **kwargs):
        raise NotImplementedError("Function _picking_torch_op_event need to be implemented.")

    @abstractmethod
    def _picking_kernel_event(self, **kwargs):
        raise NotImplementedError("Function _picking_kernel_event need to be implemented.")

    @abstractmethod
    def _picking_flow_event(self, **kwargs):
        raise NotImplementedError("Function _picking_flow_event need to be implemented.")

    @abstractmethod
    def _get_dispatch_func(self):
        raise NotImplementedError("Function _get_dispatch_func need to be implemented.")

    def load_data(self) -> dict:
        """Run the full parse pipeline and return the collected result dict."""
        self._dispatch_events()
        self._update_kernel_dict()
        self._update_memory_list()
        self._update_communication_dict()
        if self._enable_profiling_compare:
            self._calculate_performance_time()
            self._update_overall_metrics()
        self._check_result_data()
        return {Constant.TORCH_OP: self._torch_op_data, Constant.KERNEL_DICT: self._kernel_dict,
                Constant.MEMORY_LIST: self._memory_list, Constant.COMMUNICATION_DICT: self._communication_dict,
                Constant.OVERALL_METRICS: self._overall_metrics}

    def _update_communication_dict(self):
        # Default no-op; subclasses override when communication data exists.
        pass

    def _dispatch_events(self):
        """Feed every valid (dict, non-metadata) event to the picking chain."""
        for event in self._trace_events:
            if not event.is_dict():
                continue
            if event.is_m_mode():
                continue
            self.__picking_event(event)

    def __picking_event(self, event: TraceEventBean):
        # First picker that claims the event (returns truthy) wins.
        for func in self._dispatch_func:
            res = func(event)
            if res:
                break

    def _update_kernel_dict(self):
        """Join flow start/end pairs to kernels, keyed by the op start time."""
        for flow_event in self._flow_dict.values():
            start_event = flow_event.get("start")
            end_event = flow_event.get("end")
            if not start_event or not end_event:
                continue
            kernel_event = self._all_kernels.get(f"{end_event.pid}-{end_event.tid}-{end_event.start_time}")
            if not kernel_event:
                continue
            self._kernel_dict.setdefault(start_event.start_time, []).append(
                KernelEvent(kernel_event.event, self._profiling_type))

    def _check_result_data(self):
        """Warn about any enabled comparison whose input data came up empty."""
        # Consistency fix: use the switches cached in __init__ instead of
        # re-querying ArgsManager here.
        if self._enable_operator_compare or self._enable_memory_compare:
            if not self._torch_op_data:
                print(f"[WARNING] Can't find any torch op in the file: {self._profiling_path}")
        if self._enable_operator_compare and not self._kernel_dict:
            print(f"[WARNING] Can't find any flow event in the file: {self._profiling_path}")
        if self._enable_memory_compare and not self._memory_list:
            print(f"[WARNING] Can't find any memory event in the file: {self._profiling_path}")
        if self._enable_communication_compare and not self._communication_dict:
            print(f"[WARNING] Can't find any communication op in the file: {self._profiling_path}")
from collections import defaultdict, Counter

from compare_bean.origin_data_bean.trace_event_bean import TraceEventBean
from profiling_parser.base_profiling_parser import BaseProfilingParser
from utils.args_manager import ArgsManager
from utils.constant import Constant


class OpTimeWarper:
    """Mutable accumulator for per-category GPU kernel time/count totals.

    NOTE(review): the name is presumably a typo for "Wrapper"; kept as-is
    because it is part of the module's public surface.
    """

    def __init__(
            self,
            cube_time: float = 0.0,
            sdma_time: float = 0.0,
            vec_time: float = 0.0,
            fa_time_fwd: float = 0.0,
            fa_time_bwd: float = 0.0,
            all_op_time: float = 0.0,
            compute_stream_dur: float = 0.0,
            cube_num: int = 0,
            vec_num: int = 0,
            sdma_num: int = 0,
            fa_num_bwd: int = 0,
            fa_num_fwd: int = 0
    ):
        self.cube_time = cube_time
        self.sdma_time = sdma_time
        self.vec_time = vec_time
        self.fa_time_fwd = fa_time_fwd
        self.fa_time_bwd = fa_time_bwd
        self.all_op_time = all_op_time
        self.compute_stream_dur = compute_stream_dur
        self.cube_num = cube_num
        self.vec_num = vec_num
        self.sdma_num = sdma_num
        self.fa_num_bwd = fa_num_bwd
        self.fa_num_fwd = fa_num_fwd


class GPUProfilingParser(BaseProfilingParser):
    """Parses a GPU chrome-trace file into operator/memory/communication data."""

    NCCL_MARK = 'nccl'
    CUBE_MARK = 'gemm'
    FA_MARK_LIST = [['fmha', 'kernel'], ['flash', 'kernel']]
    SDMA_MARK_LIST = ['htod', 'dtod', 'dtoh', 'memset (device)']

    def __init__(self, args: any, path_dict: dict):
        super().__init__(args, path_dict)
        self._trace_events = [TraceEventBean(event) for event in self._trace_events.get("traceEvents", [])]
        # Flow category can be forced on the command line; otherwise accept
        # the known torch-profiler async flow categories.
        self._flow_cat = (ArgsManager().args.gpu_flow_cat,) if ArgsManager().args.gpu_flow_cat else (
            "async_gpu", "async_cpu_to_gpu", "ac2g", "async")
        self._compute_stream_id = self._infer_compute_stream_id()
        self._time_wrapper = OpTimeWarper()
        self._marks = defaultdict(int)

    @classmethod
    def __is_flash_attention(cls, name: str):
        """True when every marker of any FA pattern appears in the name."""
        # Idiom fix: replaces the original `not [1 for mark in fa_mark if
        # mark not in name.lower()]` with the equivalent, readable all().
        lower_name = name.lower()
        return any(all(mark in lower_name for mark in fa_mark) for fa_mark in cls.FA_MARK_LIST)

    @classmethod
    def __is_sdma_time(cls, name: str):
        """True when the kernel name matches any memory-copy/SDMA marker."""
        lower_name = name.lower()
        return any(mark in lower_name for mark in cls.SDMA_MARK_LIST)

    def _update_memory_list(self):
        """Pair [memory] events by address into allocation records (KB)."""
        if not self._enable_memory_compare:
            return
        self._memory_events.sort(key=lambda x: x.start_time)
        addr_dict = {}
        for memory_event in self._memory_events:
            allocate_bytes = memory_event.args.get("Bytes", 0) / Constant.BYTE_TO_KB
            record = addr_dict.get(memory_event.args.get("Addr"))
            if allocate_bytes > 0:
                # A second allocation at the same address flushes the previous
                # (never-released) record before starting a new one.
                if record:
                    self._memory_list.append(record)
                addr_dict[memory_event.args.get("Addr")] = {Constant.SIZE: allocate_bytes,
                                                            Constant.TS: memory_event.start_time,
                                                            Constant.ALLOCATION_TIME: memory_event.start_time}
            if allocate_bytes < 0 and record:
                # Negative bytes mean a release; only a size-matched release
                # completes the record.
                if abs(allocate_bytes) == record.get(Constant.SIZE):
                    record[Constant.RELEASE_TIME] = memory_event.start_time
                    self._memory_list.append(record)
                del addr_dict[memory_event.args.get("Addr")]

    def _calculate_performance_time(self):
        """Accumulate per-category kernel time and mark busy microseconds.

        Each microsecond is marked +1 per active nccl (communication) kernel
        and -100 per active compute kernel, so the sign of a mark later tells
        whether compute (negative) or pure communication (positive) owned it.
        """
        for event in self._trace_events:
            if event.args and event.args.get('stream') == self._compute_stream_id:
                self._time_wrapper.compute_stream_dur += event.dur
            if not event.is_valid_event():
                continue
            if event.args and event.args.get('stream') == self._compute_stream_id:
                if self.__is_sdma_time(event.name):
                    self._time_wrapper.sdma_time += float(event.dur)
                    self._time_wrapper.sdma_num += 1
                    continue
            if event.lower_cat != 'kernel':
                continue
            if self.NCCL_MARK in event.lower_name:
                for timestep in range(int(event.start_time + 1), int(event.end_time + 1)):
                    self._marks[str(timestep)] += 1  # mark this timestep in communication stream
                continue
            else:
                for timestep in range(int(event.start_time + 1), int(event.end_time + 1)):
                    self._marks[str(timestep)] += -100  # mark this timestep in compute stream
                if self.__is_flash_attention(event.name):
                    if 'bwd' in event.lower_name:
                        self._time_wrapper.fa_time_bwd += event.dur
                        self._time_wrapper.fa_num_bwd += 1
                    else:
                        self._time_wrapper.fa_time_fwd += event.dur
                        self._time_wrapper.fa_num_fwd += 1
                elif self.CUBE_MARK in event.lower_name:
                    self._time_wrapper.cube_num += 1
                    self._time_wrapper.cube_time += event.dur
                else:
                    self._time_wrapper.vec_num += 1
                    self._time_wrapper.vec_time += event.dur
                self._time_wrapper.all_op_time += event.dur

    def _update_overall_metrics(self):
        """Convert the accumulated counters (us) into overall metrics (s)."""
        # Negative marks mean at least one compute kernel was active in that
        # microsecond; positive means communication only (not overlapped).
        self._overall_metrics.compute_time = len([_ for _, value in self._marks.items() if value < 0]) / 10 ** 6
        self._overall_metrics.communication_not_overlapped = len(
            [_ for _, value in self._marks.items() if value > 0]) / 10 ** 6
        self._overall_metrics.flash_attention_time_bwd = self._time_wrapper.fa_time_bwd / 10 ** 6
        self._overall_metrics.flash_attention_time_fwd = self._time_wrapper.fa_time_fwd / 10 ** 6
        self._overall_metrics.cube_time = self._time_wrapper.cube_time / 10 ** 6
        # Vector time is the residual of compute time after cube and FA.
        self._overall_metrics.vec_time = self._overall_metrics.compute_time - (
                self._time_wrapper.cube_time + self._time_wrapper.fa_time_fwd + self._time_wrapper.fa_time_bwd) / 10 ** 6
        self._overall_metrics.cube_num = self._time_wrapper.cube_num
        self._overall_metrics.vec_num = self._time_wrapper.vec_num
        self._overall_metrics.sdma_num = self._time_wrapper.sdma_num
        self._overall_metrics.fa_num_bwd = self._time_wrapper.fa_num_bwd
        self._overall_metrics.fa_num_fwd = self._time_wrapper.fa_num_fwd
        self._overall_metrics.sdma_time = self._time_wrapper.sdma_time / 10 ** 6
        self.__parse_e2e_time()
        self._overall_metrics.scheduling_time = self._overall_metrics.e2e_time - self._overall_metrics.compute_time - \
                                                self._overall_metrics.communication_not_overlapped
        self.__parse_memory_reserved()

    def _picking_communication_event(self, event: TraceEventBean):
        """Collect nccl kernel durations grouped by the collective op name."""
        if event.lower_cat == "kernel" and event.lower_name.split("_")[0] == "ncclkernel":
            name_list = event.lower_name.split("_")
            if len(name_list) > 2:
                self._communication_dict.setdefault(name_list[1], {}).setdefault("comm_list", []).append(event.dur)
            return True
        return False

    def _picking_memory_event(self, event: TraceEventBean):
        """Collect [memory] events that belong to a real device."""
        if event.lower_name == '[memory]' and event.device_id >= 0:
            self._memory_events.append(event)
            return True
        return False

    def _picking_torch_op_event(self, event: TraceEventBean):
        """Collect CPU-side torch operator events (raw dicts)."""
        if event.lower_cat in ("cpu_op", "user_annotation", "cuda_runtime", "operator"):
            self._torch_op_data.append(event.event)
            return True
        return False

    def _picking_kernel_event(self, event: TraceEventBean):
        """Index non-nccl kernels by pid-tid-start for the flow join."""
        if event.lower_cat == "kernel" and event.lower_name.split("_")[0] != "ncclkernel":
            self._all_kernels[f"{event.pid}-{event.tid}-{event.start_time}"] = event
            return True
        return False

    def _picking_flow_event(self, event: TraceEventBean):
        """Pair async flow start ('s') / end ('f') events by flow id."""
        if event.lower_cat in self._flow_cat:
            if event.is_flow_start():
                self._flow_dict.setdefault(event.id, {})["start"] = event
            elif event.is_flow_end():
                self._flow_dict.setdefault(event.id, {})["end"] = event
            return True
        return False

    def __parse_e2e_time(self):
        """E2E wall time: first-start to last-end over all stream events (s)."""
        compute_events_timeline = [event for event in self._trace_events if event.args and event.args.get('stream')]
        # Robustness fix: the original indexed [0]/[-1] unconditionally and
        # raised IndexError on traces without stream events.
        if not compute_events_timeline:
            print("[INFO] Gpu profiling data doesn't contain any stream event, e2e time is 0.")
            return
        compute_events_timeline = sorted(compute_events_timeline, key=lambda event: event.start_time)
        self._overall_metrics.e2e_time = (compute_events_timeline[-1].end_time - compute_events_timeline[
            0].start_time) / 10 ** 6

    def __parse_memory_reserved(self):
        """Peak 'Total Reserved' over all memory events, converted to GB."""
        memories = [event.total_reserved for event in self._memory_events]
        if not memories:
            print("[INFO] Gpu profiling data doesn't contain memory info.")
            return
        self._overall_metrics.memory_used = max(memories) / 1024 ** 3

    def _get_dispatch_func(self):
        """Build the ordered picking chain from the enabled comparisons."""
        func_list = []
        if self._enable_memory_compare or self._enable_operator_compare:
            func_list.append(self._picking_torch_op_event)
        if self._enable_operator_compare:
            func_list.append(self._picking_kernel_event)
            func_list.append(self._picking_flow_event)
        if self._enable_memory_compare or self._enable_profiling_compare:
            func_list.append(self._picking_memory_event)
        if self._enable_communication_compare:
            func_list.append(self._picking_communication_event)
        return func_list

    def _infer_compute_stream_id(self):
        """Pick the stream carrying the most non-nccl kernels as compute stream."""
        if not self._enable_profiling_compare:
            return -1
        kernel_stream_ids = []
        for event in self._trace_events:
            is_kernel_exec_event = event.lower_cat == 'kernel' and self.NCCL_MARK not in event.lower_name
            if is_kernel_exec_event and event.stream:
                kernel_stream_ids.append(event.stream)
        if not kernel_stream_ids:
            raise RuntimeError('[ERROR] The profiling data does not contain kernel running data.')
        counter = Counter(kernel_stream_ids)
        return counter.most_common(1)[0][0]