diff --git a/profiler/compare_tools/performance_compare.py b/profiler/compare_tools/performance_compare.py
index dec22a4ec28b5dfa32594e0ad9c04050b5ddea0b..cf42641be10c99f42c6f4b5bc00066bda64d8ad5 100644
--- a/profiler/compare_tools/performance_compare.py
+++ b/profiler/compare_tools/performance_compare.py
@@ -8,23 +8,12 @@ import time
 from generation.comparison_generator import ComparisonGenerator
 from utils.args_manager import ArgsManager
 from profiling_analysis.profiling_parse import prof_main
-from utils.constant import Constant
 
 
 def performance_compare(args):
     if not args.enable_profiling_compare:
         return
-    npu_path = ''
-    gpu_path = ''
-    if ArgsManager().base_profiling_type == Constant.NPU:
-        npu_path = ArgsManager().base_profiling.file_path
-    elif ArgsManager().base_profiling_type == Constant.GPU:
-        gpu_path = ArgsManager().base_profiling.file_path
-    if ArgsManager().comparison_profiling_type == Constant.NPU:
-        npu_path = ArgsManager().comparison_profiling.file_path
-    elif ArgsManager().comparison_profiling_type == Constant.GPU:
-        gpu_path = ArgsManager().comparison_profiling.file_path
-    prof_main(npu_path, gpu_path)
+    prof_main()
 
 
 def main():
@@ -45,6 +34,7 @@ def main():
 
     args = parser.parse_args()
     ArgsManager().init(args)
+
     try:
         performance_compare(args)
     except Exception:
diff --git a/profiler/compare_tools/profiling_analysis/gpu_parser.py b/profiler/compare_tools/profiling_analysis/gpu_parser.py
index 4cefc94ccfefc5a5a7fe9fcd6ac213c806926d3f..449bc7514fb7d15571b44a03ff1f49b535dd3aef 100644
--- a/profiler/compare_tools/profiling_analysis/gpu_parser.py
+++ b/profiler/compare_tools/profiling_analysis/gpu_parser.py
@@ -17,27 +17,34 @@ from collections import Counter, defaultdict
 import pandas as pd
 
 import profiling_analysis.parser_helper as parser_helper
+from utils.file_reader import FileReader
 
 
 class GpuProfilingParser:
+    NCCL_MARK = 'nccl'
+    CUBE_MARK = 'gemm'
+    FA_MARK_LIST = [['fmha', 'kernel'], ['flash', 'kernel']]
+
     def __init__(self, gpu_path):
-        self.trace_events = self.read_profiling_json_file(gpu_path)
+        self.trace_events = FileReader.read_trace_file(gpu_path).get('traceEvents')
         self.compute_stream_id = self.infer_compute_stream_id()
         self.one_step_time = 0
-        self.profiling_info = parser_helper.ProfilingInfo()
+        self.profiling_info = parser_helper.ProfilingInfo('GPU')
 
-    @staticmethod
-    def read_profiling_json_file(json_path):
-        data = parser_helper.read_json_file(json_path)
-        if 'traceEvents' not in data:
-            raise RuntimeError("The gpu profiling json doesn't contain traceEvents data.")
-        return data.get('traceEvents')
+    def is_flash_attention(self, name: str):
+        for fa_mark in self.FA_MARK_LIST:
+            if not len([1 for mark in fa_mark if mark not in name.lower()]):
+                return True
+        return False
 
     def parse_events(self):
         cube_time = 0.0
         all_op_time = 0.0
+        fa_time = 0.0
+        cube_num = 0
+        vec_num = 0
         op_list = []
-        compute_stream_dur = 0.0  # 计算流耗时
+        compute_stream_dur = 0.0
         marks = defaultdict(int)  # mark for compute communication_not_overlapped time
 
         for event in self.trace_events:
@@ -50,39 +57,44 @@ class GpuProfilingParser:
             name = event.get('name')
             dur = event.get('dur')
             ts = event.get('ts')
-            cat = event.get('cat')
+            cat = event.get('cat', '')
            if cat.lower() != 'kernel':
                 continue
-            if 'nccl' in name.lower():
+            if self.NCCL_MARK in name.lower():
                 for timestep in range(ts + 1, ts + dur + 1):
                     marks[str(timestep)] += 1  # mark this timestep in communication stream
                 continue
             else:
                 for timestep in range(ts + 1, ts + dur + 1):
                     marks[str(timestep)] += -100  # mark this timestep in compute stream
-                if 'gemm' in name.lower():
+                if self.is_flash_attention(name):
+                    fa_time += float(dur)
+                elif self.CUBE_MARK in name.lower():
+                    cube_num += 1
                     cube_time += float(dur)
+                else:
+                    vec_num += 1
                 all_op_time += float(dur)
                 op_list.append([ts, name, cat, dur])
         op_dataframe = pd.DataFrame(op_list, columns=['time start', 'name', 'cat', 'dur'])
         op_dataframe.to_csv('gpu_perf.csv', index=False)
-        self.profiling_info.compute_time = compute_stream_dur / 10 ** 6
+        self.profiling_info.compute_time = len([_ for _, value in marks.items() if value < 0]) / 10 ** 6
         self.profiling_info.communication_not_overlapped = len([_ for _, value in marks.items() if value > 0]) / 10 ** 6
+        self.profiling_info.flash_attention_time = fa_time / 10 ** 6
         self.profiling_info.cube_time = cube_time / 10 ** 6
-        self.profiling_info.vector_time = (all_op_time - cube_time) / 10 ** 6
+        self.profiling_info.vec_time = (all_op_time - cube_time - fa_time) / 10 ** 6
+        self.profiling_info.cube_num = cube_num
+        self.profiling_info.vec_num = vec_num
         self.parse_e2e_time()
-        if self.one_step_time:
-            self.profiling_info.scheduling_time = self.one_step_time - all_op_time / 10 ** 6 - \
-                                                  self.profiling_info.communication_not_overlapped
-        else:
-            self.profiling_info.scheduling_time = self.profiling_info.e2e_time - all_op_time / 10 ** 6 - \
-                                                  self.profiling_info.communication_not_overlapped
+
+        self.profiling_info.scheduling_time = self.profiling_info.e2e_time - all_op_time / 10 ** 6 - \
+                                              self.profiling_info.communication_not_overlapped
         self.profiling_info.scheduling_ratio = self.profiling_info.scheduling_time / self.profiling_info.e2e_time
         self.parse_memory_reserved()
 
     def parse_e2e_time(self):
         compute_events_timeline = [event for event in self.trace_events if
-                                   event.get('args') and event.get('args').get('stream') == self.compute_stream_id]
+                                   event.get('args') and event.get('args').get('stream')]
         compute_events_timeline = sorted(compute_events_timeline, key=lambda event: event.get('ts'))
         self.profiling_info.e2e_time = (compute_events_timeline[-1].get('ts') + compute_events_timeline[-1].get('dur') -
                                         compute_events_timeline[0].get('ts')) / 10 ** 6
@@ -93,18 +105,18 @@ class GpuProfilingParser:
             if event.get('name', '').lower() == '[memory]' and event.get('args').get('Device Id') >= 0
         ]
         if not memories:
-            print("Gpu profiling data doesn't contain memory info")
+            print("[INFO] Gpu profiling data doesn't contain memory info")
             return
         self.profiling_info.memory_used = max(memories) / 1024 ** 3
 
     def infer_compute_stream_id(self):
         kernel_stream_ids = []
         for event in self.trace_events:
-            is_kernel_exec_event = event.get('cat', '').lower() == 'kernel' and 'nccl' not in event.get('name', '').lower()
+            is_kernel_exec_event = event.get('cat', '').lower() == 'kernel' and self.NCCL_MARK not in event.get('name', '').lower()
             has_stream_id_event = event.get('args') and event.get('args').get('stream')
             if is_kernel_exec_event and has_stream_id_event:
                 kernel_stream_ids.append(event.get('args').get('stream'))
         if not kernel_stream_ids:
-            raise RuntimeError('The profiling data does not contain kernel running data.')
+            raise RuntimeError('[ERROR] The profiling data does not contain kernel running data.')
         counter = Counter(kernel_stream_ids)
         return counter.most_common(1)[0][0]
diff --git a/profiler/compare_tools/profiling_analysis/npu_parser.py b/profiler/compare_tools/profiling_analysis/npu_parser.py
index d0547c04c9a3185631bc81c82cc6b8d4830355b3..5fca9d3ea32db1e21e5fac04fffb7b5bcffe828c 100644
--- a/profiler/compare_tools/profiling_analysis/npu_parser.py
+++ b/profiler/compare_tools/profiling_analysis/npu_parser.py
@@ -17,32 +17,34 @@ import sys
 import pandas as pd
 from collections import defaultdict
 import profiling_analysis.parser_helper as parser_helper
+from utils.file_reader import FileReader
 
 
 class NpuProfilingParser:
-    def __init__(self, npu_step_time, add_cube_name, npu_file_path):
+    FLASH_ATTENTION = "flashattention"
+
+    def __init__(self, npu_step_time, npu_file_path):
         self.npu_json_file = npu_file_path.get('trace_view')
-        self.npu_summary_file = npu_file_path.get('op_summary')
+        self.npu_summary_file = npu_file_path.get('kernel_details')
         self.npu_mem_file = npu_file_path.get('memory_record')
-        self.profiling_info = parser_helper.ProfilingInfo()
+        self.info_json = npu_file_path.get('info')
+        self.profiling_info = parser_helper.ProfilingInfo('NPU')
         self.npu_step_time = npu_step_time
         self.parallel_time = 0
         self.aicore_time = 0
-        self.cube_op_type = ['MatMul', 'BatchMatMul']
-        self.cube_op_type = list(set(self.cube_op_type + add_cube_name))
-        self.min_aicore_ts = sys.float_info.max
-        self.max_aicore_ts = sys.float_info.min
+        self.min_stream_ts = sys.float_info.max
+        self.max_stream_ts = sys.float_info.min
 
     def parse_npu_json_events(self):
         if not self.npu_json_file:
-            print('Npu trace json file is not available.')
+            print('[WARNING] Npu trace json file is not available.')
             return
         compute_time = 0
         communication_time = 0
         min_ts = sys.float_info.max
         max_ts = sys.float_info.min
         ts_flag = False  # 表明没有获取到compute time的耗时
-        data = parser_helper.read_json_file(self.npu_json_file)
+        data = FileReader.read_trace_file(self.npu_json_file)
         event_wait_sqe = defaultdict(list)
         ai_core_dict = defaultdict(list)
         event_wait_sqe_res = defaultdict(float)
@@ -74,7 +76,7 @@ class NpuProfilingParser:
             compute_stream = list(event_wait_sqe.keys() & ai_core_dict.keys())
             parallel_stream = list(ai_core_dict.keys() - set(compute_stream))
         else:
-            print('Npu trace json file lack of Stream info')
+            print('[WARNING] Npu trace json file lack of Stream info')
             return
         cs_event_wait_sqe_list = event_wait_sqe[compute_stream[0]]
         if parallel_stream:
@@ -83,7 +85,7 @@ class NpuProfilingParser:
         sorted(cs_ai_core_list, key=lambda x: (x[0]))
         self.parallel_time = self.interval_intersection(cs_event_wait_sqe_list, cs_ai_core_list)
         self.profiling_info.compute_time = compute_time / 10 ** 6 if ts_flag else ai_core_res[compute_stream[0]] / 10 ** 6
-        self.profiling_info.e2e_time = (max_ts - min_ts) / 10 ** 6 if ts_flag else (self.max_aicore_ts - self.min_aicore_ts) / 10 ** 6
+        self.profiling_info.e2e_time = (max_ts - min_ts) / 10 ** 6 if ts_flag else (self.max_stream_ts - self.min_stream_ts) / 10 ** 6
         self.profiling_info.communication_not_overlapped = communication_time / 10 ** 6 \
             if ts_flag else (event_wait_sqe_res[compute_stream[0]] - self.parallel_time) / 10 ** 6
         time_required = self.profiling_info.compute_time + self.profiling_info.communication_not_overlapped
@@ -94,48 +96,60 @@ class NpuProfilingParser:
         self.profiling_info.scheduling_ratio = self.profiling_info.scheduling_time / self.profiling_info.e2e_time \
             if self.profiling_info.e2e_time != 0 else 0
 
+    def parse_info_json(self):
+        if not self.info_json:
+            return
+        json_data = FileReader.read_trace_file(self.info_json)
+        if not json_data:
+            return
+        if "ProfilerActivity.CPU" not in json_data.get('config', {}).get('common_config', {}).get('activities', []):
+            return
+        if 'Level0' != json_data.get('experimental_config', {}).get('_profiler_level', ''):
+            return
+        self.profiling_info.minimal_profiling = True
+
     def parse_npu_csv_events(self):
+        self.parse_mem_csv()
         if not self.npu_summary_file:
-            print('Npu op summary csv file is not available.')
+            print('[WARNING] Npu kernel details csv file is not available.')
             return
         info = pd.read_csv(self.npu_summary_file, index_col=None)
         cube_time = 0.0
         vec_time = 0.0
-        ai_core_time = 0.0
-        vec_mac_flag = True  # True标记当前summary文件中存在pmu信息
+        fa_time = 0.0
+        cube_num = 0
+        vec_num = 0
         if info.get('aic_mac_time(us)') is None or info.get('aiv_vec_time(us)') is None:
-            print('当前的profiling结果可能是极简模式,通过cube算子白名单进行区分,白名单如下:')
-            print(self.cube_op_type)
-            vec_mac_flag = False
+            self.profiling_info.hide_op_details = True
+            return
         for i in range(len(info['Model ID'])):
-            task_type = info.loc[i, 'Task Type']
-            if task_type not in ['AI_CORE']:
-                continue
-            task_durations = info.loc[i, 'Task Duration(us)']
-            ai_core_time += task_durations
-            op_type = info.loc[i, 'OP Type']
-            if not vec_mac_flag:  # 如果是极简模式根据OP_Type计算完cube time后提前返回
-                cube_time += task_durations if op_type in self.cube_op_type else 0.0
-                continue
+            op_type = info.loc[i, 'Type']
             aiv_vec_time = info.loc[i, 'aiv_vec_time(us)']
-            if aiv_vec_time > 0:
+            if pd.isna(aiv_vec_time) or pd.isna(op_type):
+                continue
+            task_durations = info.loc[i, 'Duration(us)']
+            if self.FLASH_ATTENTION in op_type.lower():
+                fa_time += task_durations
+            elif aiv_vec_time > 0:
                 vec_time += task_durations
-
-        if vec_mac_flag:
-            cube_time = (ai_core_time - vec_time) / 10 ** 6
-            vec_time /= 10 ** 6
-        else:
-            vec_time = (ai_core_time - cube_time) / 10 ** 6
-            cube_time /= 10 ** 6
-        self.profiling_info.cube_time = cube_time
-        self.profiling_info.vector_time = vec_time
+                vec_num += 1
+            else:
+                cube_time += task_durations
+                cube_num += 1
+        self.profiling_info.cube_time = cube_time / 10 ** 6
+        self.profiling_info.vec_time = vec_time / 10 ** 6
+        self.profiling_info.flash_attention_time = fa_time / 10 ** 6
+        self.profiling_info.cube_num = cube_num
+        self.profiling_info.vec_num = vec_num
+
+    def parse_mem_csv(self):
         if not self.npu_mem_file:
-            print('Npu op memory csv file is not available.')
+            print('[INFO] Npu op memory csv file is not available.')
             return
         try:
             info = pd.read_csv(self.npu_mem_file, usecols=['Total Reserved(MB)'], index_col=None)
         except ValueError:
-            print('Npu profiling data does not contain memory info.')
+            print('[ERROR] Load memory info failed.')
         else:
             self.profiling_info.memory_used = max(info.get('Total Reserved(MB)')) / 1024
@@ -167,7 +181,7 @@ class NpuProfilingParser:
                     enent_wait_res[stream_id] += dur
                     event_wait_sqe[stream_id].append([ts, ts + dur])
                 elif args.get('Task Type') == 'AI_CORE':
-                    self.min_aicore_ts = ts if ts < self.min_aicore_ts else self.min_aicore_ts
-                    self.max_aicore_ts = (ts + dur) if (ts + dur) > self.max_aicore_ts else self.max_aicore_ts
                     ai_core_res[stream_id] += dur
                     ai_core_dict[stream_id].append([ts, ts + dur])
+                    self.min_stream_ts = ts if ts < self.min_stream_ts else self.min_stream_ts
+                    self.max_stream_ts = (ts + dur) if (ts + dur) > self.max_stream_ts else self.max_stream_ts
diff --git a/profiler/compare_tools/profiling_analysis/parser_helper.py b/profiler/compare_tools/profiling_analysis/parser_helper.py
index 958a3146bb58898cdb76003f5f59476a45c1593f..5fe5f1770c7a97ce27d5c488e6b3b84cab93b289 100644
--- a/profiler/compare_tools/profiling_analysis/parser_helper.py
+++ b/profiler/compare_tools/profiling_analysis/parser_helper.py
@@ -18,20 +18,18 @@ import os
 
 
 class ProfilingInfo:
-    def __init__(self):
+    def __init__(self, profiling_type: str):
+        self.profiling_type = profiling_type
         self.cube_time = 0.0
-        self.vector_time = 0.0
+        self.vec_time = 0.0
+        self.cube_num = 0
+        self.vec_num = 0
         self.compute_time = 0.0
         self.communication_not_overlapped = 0.0
         self.scheduling_ratio = 0.0
         self.memory_used = 0.0
         self.e2e_time = 0.0
         self.scheduling_time = 0.0
-
-
-def read_json_file(path):
-    if not os.path.isfile(path):
-        raise ValueError(f'The path "{path}" is not a valid json file.')
-    with open(path, 'r', encoding='utf-8') as json_handler:
-        data = json.load(json_handler)
-        return data
+        self.flash_attention_time = 0.0
+        self.minimal_profiling = False
+        self.hide_op_details = False
diff --git a/profiler/compare_tools/profiling_analysis/profiling_parse.py b/profiler/compare_tools/profiling_analysis/profiling_parse.py
index 10e3a6d30567c08e8764face3cb95fa1dbb5a46f..17b1cb30681a1c7e9b422169b127ce88ac8cd693 100644
--- a/profiler/compare_tools/profiling_analysis/profiling_parse.py
+++ b/profiler/compare_tools/profiling_analysis/profiling_parse.py
@@ -21,66 +21,94 @@ from prettytable import PrettyTable
 from profiling_analysis.gpu_parser import GpuProfilingParser
 from profiling_analysis.npu_parser import NpuProfilingParser
 from profiling_analysis.parser_helper import ProfilingInfo
-
-
-def parse_command():
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-g', '--gpu', required=False, default='', metavar='(FILE)', help='Gpu profiling json file.')
-    parser.add_argument('-glt', '--gpu_log_time', required=False, default=0.0, type=float, help='Gpu one step time(s)')
-    parser.add_argument('-n', '--npu', required=False, default='', metavar='(FILE)',
-                        help='Npu single core profiling root path.')
-    parser.add_argument('-nlt', '--npu_log_time', required=False, default=0.0, metavar='(FILE)', type=float,
-                        help='Npu one step time(s).')
-    parser.add_argument('-aop', '--add_cube_op', required=False, default=[], nargs='*', help='add cube op name')
-    return parser.parse_args()
-
-
-def show_table(gpu_profiling_info, npu_profiling_info):
+from utils.args_manager import ArgsManager
+from utils.constant import Constant
+
+
+def generate_table_info(base_profiling_info, comp_profiling_info, table):
+    headers = ['']
+    base_col = [f'{base_profiling_info.profiling_type}']
+    comp_col = [f'{comp_profiling_info.profiling_type}']
+    if not base_profiling_info.hide_op_details and not comp_profiling_info.hide_op_details:
+        headers.extend(['Cube Time(Num)', 'Vector Time(Num)'])
+        base_col.extend([f'{base_profiling_info.cube_time:.3f}s({base_profiling_info.cube_num})',
+                         f'{base_profiling_info.vec_time:.3f}s({base_profiling_info.vec_num})'])
+        comp_col.extend([f'{comp_profiling_info.cube_time:.3f}s({comp_profiling_info.cube_num})',
+                         f'{comp_profiling_info.vec_time:.3f}s({comp_profiling_info.vec_num})'])
+    if base_profiling_info.flash_attention_time or comp_profiling_info.flash_attention_time:
+        headers.append('Flash Attention Time')
+        base_col.append(f'{base_profiling_info.flash_attention_time:.3f}s')
+        comp_col.append(f'{comp_profiling_info.flash_attention_time:.3f}s')
+    headers.extend(['Computing Time'])
+    base_col.extend([f'{base_profiling_info.compute_time:.3f}s'])
+    comp_col.extend([f'{comp_profiling_info.compute_time:.3f}s'])
+    if base_profiling_info.memory_used or comp_profiling_info.memory_used:
+        headers.append('Mem Usage')
+        base_col.append(f'{base_profiling_info.memory_used:.2f}G')
+        comp_col.append(f'{comp_profiling_info.memory_used:.2f}G')
+    cue = ''
+    if ((base_profiling_info.profiling_type == "NPU" and not base_profiling_info.minimal_profiling) or
+            (comp_profiling_info.profiling_type == "NPU" and not comp_profiling_info.minimal_profiling)):
+        cue = '(Not minimal profiling)'
+
+    headers.extend(['Uncovered Communication Time', 'Free Time', 'E2E Time' + cue])
+    base_col.extend(
+        [f'{base_profiling_info.communication_not_overlapped: .3f}s', f'{base_profiling_info.scheduling_time:.3f}s',
+         f'{base_profiling_info.e2e_time:.3f}s'])
+    comp_col.extend(
+        [f'{comp_profiling_info.communication_not_overlapped: .3f}s', f'{comp_profiling_info.scheduling_time:.3f}s',
+         f'{comp_profiling_info.e2e_time:.3f}s'])
+    table.field_names = headers
+    table.add_row(base_col)
+    table.add_row(comp_col)
+
+
+def show_table(base_profiling_info, comp_profiling_info):
     table = PrettyTable()
-    table.title = '大模型性能拆解'
-    table.field_names = ['', 'cube算子', 'vector算子', '计算流耗时', '通信', '调度耗时', '调度占比', '内存',
-                         'E2E性能值']
-    table.add_row(['GPU基线', f'{gpu_profiling_info.cube_time:.3f}s', f'{gpu_profiling_info.vector_time:.3f}s',
-                   f'{gpu_profiling_info.compute_time:.3f}s', f'{gpu_profiling_info.communication_not_overlapped: .3f}s',
-                   f'{gpu_profiling_info.scheduling_time:.3f}', f'{gpu_profiling_info.scheduling_ratio:.2%}',
-                   f'{gpu_profiling_info.memory_used:.2f}G', f'{gpu_profiling_info.e2e_time:.3f}s'])
-    table.add_row(['当前现状', f'{npu_profiling_info.cube_time:.3f}s', f'{npu_profiling_info.vector_time:.3f}s',
-                   f'{npu_profiling_info.compute_time:.3f}s', f'{npu_profiling_info.communication_not_overlapped: .3f}s',
-                   f'{npu_profiling_info.scheduling_time:.3f}', f'{npu_profiling_info.scheduling_ratio:.2%}',
-                   f'{npu_profiling_info.memory_used:.2f}G', f'{npu_profiling_info.e2e_time:.3f}s'])
+    table.title = 'Model Profiling Time Distribution'
+    generate_table_info(base_profiling_info, comp_profiling_info, table)
     print(table)
 
 
 def parse_gpu(gpu_path):
-    if gpu_path:
-        gpu_parser = GpuProfilingParser(gpu_path)
-        gpu_parser.parse_events()
-        return gpu_parser.profiling_info
-    print('Gpu trace json file is not specified.')
-    return ProfilingInfo()
+    gpu_parser = GpuProfilingParser(gpu_path)
+    gpu_parser.parse_events()
+    return gpu_parser.profiling_info
 
 
 def parse_npu(npu_path):
-    npu_parser = NpuProfilingParser(0, [], npu_path)
-    npu_parser.parse_npu_csv_events()
-    npu_parser.parse_npu_json_events()
-    return npu_parser.profiling_info
-
-
-def prof_main(npu_path, gpu_path):
-    if not npu_path or not gpu_path:
-        return
-
-    npu_dir = {'trace_view': None, 'memory_record': None, 'op_summary': None}
+    npu_dir = {'trace_view': None, 'memory_record': None, 'kernel_details': None}
     for root, _, files in os.walk(npu_path):
         for file in files:
             if file == 'trace_view.json':
                 npu_dir['trace_view'] = os.path.join(root, file)
             if file == 'memory_record.csv':
                 npu_dir['memory_record'] = os.path.join(root, file)
-            if 'op_summary_' in file:
-                npu_dir['op_summary'] = os.path.join(root, file)
-    show_table(parse_gpu(gpu_path), parse_npu(npu_dir))
+            if 'kernel_details' in file:
+                npu_dir['kernel_details'] = os.path.join(root, file)
+            if 'profiler_info' in file:
+                npu_dir['info'] = os.path.join(root, file)
+
+    npu_parser = NpuProfilingParser(0, npu_dir)
+    npu_parser.parse_npu_csv_events()
+    npu_parser.parse_info_json()
+    npu_parser.parse_npu_json_events()
+    return npu_parser.profiling_info
+
+
+def prof_main():
+    base_info = ProfilingInfo('None')
+    comp_info = ProfilingInfo('None')
+    if ArgsManager().base_profiling_type == Constant.NPU:
+        base_info = parse_npu(ArgsManager().base_profiling.file_path)
+    elif ArgsManager().base_profiling_type == Constant.GPU:
+        base_info = parse_gpu(ArgsManager().base_profiling.file_path)
+    if ArgsManager().comparison_profiling_type == Constant.NPU:
+        comp_info = parse_npu(ArgsManager().comparison_profiling.file_path)
+    elif ArgsManager().comparison_profiling_type == Constant.GPU:
+        comp_info = parse_gpu(ArgsManager().comparison_profiling.file_path)
+
+    show_table(base_info, comp_info)
 
 
 if __name__ == '__main__':