From 0dd9ddd96595433356f222afc85a526ffbce46c5 Mon Sep 17 00:00:00 2001
From: cabbage
Date: Thu, 3 Aug 2023 11:41:22 +0800
Subject: [PATCH] distributed view
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../torch_tb_profiler/profiler/data.py       |  22 ++
 .../torch_tb_profiler/profiler/loader.py     |  20 +-
 .../profiler/run_generator.py                | 283 +++++++++++++++---
 .../tb_plugin/torch_tb_profiler/run.py       |   4 +
 4 files changed, 278 insertions(+), 51 deletions(-)

diff --git a/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/data.py b/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/data.py
index f699f43b0f..c46a4873c4 100644
--- a/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/data.py
+++ b/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/data.py
@@ -111,6 +111,14 @@ class RunProfileData(object):
         # npu operator data
         self.operator_path: str = None
 
+        # npu communication data
+        self.distributed_csv_path: str = None
+        self.communication_json_path: str = None
+
+        self.step_to_overlap = None
+        self.step_to_wait = None
+        self.comm_op = None
+
     @staticmethod
     def parse_gpu(worker, span, path, cache_dir):
         trace_path, trace_json, _ = RunProfileData._preprocess_file(path, cache_dir, 'GPU')
@@ -128,6 +136,8 @@ class RunProfileData(object):
         has_kernel = False
         has_memory_record = False
         has_memory_operator = False
+        has_communication_overlap = False
+        has_communication_wait_ops = False
         for file in io.listdir(path):
             if utils.is_npu_trace_path(file):
                 has_trace = True
@@ -153,8 +163,16 @@ class RunProfileData(object):
             if str(file) == 'operator_details.csv':
                 profile.has_operator_view = True
                 profile.operator_path = io.join(path, file)
+            if str(file) == 'step_trace_time.csv':
+                has_communication_overlap = True
+                profile.distributed_csv_path = io.join(path, file)
+            if str(file) == 'communication.json':
+                has_communication_wait_ops = True
+                profile.communication_json_path = io.join(path, file)
+
         profile.has_kernel = has_kernel
         profile.has_memory = has_memory_operator and has_memory_record
+        profile.has_communication = has_communication_wait_ops and has_communication_overlap
         return profile
@@ -408,6 +426,10 @@ class DistributedRunProfileData:
         self.device_props = run_profile_data.device_props
         self.distributed_info = run_profile_data.distributed_info
 
+        self.step_to_overlap = run_profile_data.step_to_overlap
+        self.step_to_wait = run_profile_data.step_to_wait
+        self.comm_op = run_profile_data.comm_op
+
         self.total_comm_stats = None
         self.step_comm_stats = None
diff --git a/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/loader.py b/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/loader.py
index 32a7ce3230..a05c02efdf 100644
--- a/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/loader.py
+++ b/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/loader.py
@@ -110,6 +110,10 @@ class RunLoader(object):
             generator = RunGenerator(worker, span, data, self.device_target)
             profile = generator.generate_run_profile()
+            if self.device_target == 'Ascend':
+                data.step_to_overlap = profile.step_to_overlap
+                data.step_to_wait = profile.step_to_wait
+                data.comm_op = profile.comm_op
             dist_data = DistributedRunProfileData(data)
 
             logger.debug('Sending back profile via mp.Queue')
@@ -141,6 +145,18 @@ class RunLoader(object):
         return span_profiles
 
     def _process_distributed_profiles(self, profiles: List[DistributedRunProfileData], span):
+        if self.device_target != 'Ascend':
+            return self._gpu_distributed(profiles, span)
+        else:
+            for data in profiles:
+                if not data.has_communication:
+                    logger.debug('There is no communication profile in this NPU run.')
+                    return None
+            generator = DistributedRunGenerator(profiles, span, self.device_target)
+            profile = generator.generate_run_profile()
+            return profile
+
+    def _gpu_distributed(self, profiles, span):
         has_communication = True
         comm_node_lists: List[List[CommunicationNode]] = []
         for data in profiles:
@@ -157,7 +173,7 @@ class RunLoader(object):
         logger.debug('Processing profile data finish')
 
         if not has_communication:
-            logger.debug('There is no communication profile in this run.')
+            logger.debug('There is no communication profile in this GPU run.')
             return None
 
         worker_num = len(comm_node_lists)
@@ -184,6 +200,6 @@ class RunLoader(object):
         for data in profiles:
             data.communication_parse()
 
-        generator = DistributedRunGenerator(profiles, span)
+        generator = DistributedRunGenerator(profiles, span, self.device_target)
         profile = generator.generate_run_profile()
         return profile
diff --git a/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/run_generator.py b/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/run_generator.py
index 09e38c03e5..eff3f421de 100644
--- a/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/run_generator.py
+++ b/tb_plugins/profiling/tb_plugin/torch_tb_profiler/profiler/run_generator.py
@@ -1,11 +1,16 @@
 # -------------------------------------------------------------------------
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # --------------------------------------------------------------------------
+import csv
+import json
+import re
+import io as sysio
 from collections import OrderedDict, defaultdict
+from json import JSONDecodeError
 from typing import Dict, Iterable, List
-import csv
+import numpy as np
 
-from .. import consts, utils
+from .. import consts, utils, io
 from ..run import DistributedRunProfile, RunProfile
 from .data import DistributedRunProfileData, RunProfileData
 from .module_op import aggegate_module_view, aggegate_pl_module_view
@@ -84,6 +89,11 @@ class RunGenerator(object):
             profile_run.memory_all_curve = self._get_memory_all_curve()
             profile_run.memory_events = self._get_memory_event(peak_memory_events)
 
+        if self.profile_data.has_communication:
+            profile_run.views.append(consts.DISTRIBUTED_VIEW)
+            profile_run.step_to_overlap = self._npu_get_overlap()
+            profile_run.step_to_wait, profile_run.comm_op = self._npu_get_wait_table()
+
         if self.profile_data.has_trace:
             profile_run.views.append(consts.TRACE_VIEW)
             profile_run.trace_file_path = self.profile_data.trace_file_path
@@ -111,6 +121,99 @@ class RunGenerator(object):
 
         return profile_run
 
+    def _npu_get_overlap(self):
+        path = self.profile_data.distributed_csv_path
+        overlap_by_steps: Dict[str, List[float]] = OrderedDict()
+        data = RunGenerator._get_csv_data(path)
+        if len(data) <= 1:
+            return overlap_by_steps
+        title = [x.lower() for x in data[0]]
+        title_name = RunGenerator._check_overlap_data(title)
+        if title_name is None:
+            logger.error("Incomplete content of CSV file.")
+            return overlap_by_steps
+
+        for step in data[1:]:
+            key = step[0]
+            overlap = [float(step[int(title_name[0])]), float(step[int(title_name[1])]),
+                       float(step[int(title_name[2])]), float(step[int(title_name[3])])]
+            if key in overlap_by_steps:
+                overlap_by_steps[key] = list(np.add(overlap, overlap_by_steps[key]))
+            else:
+                overlap_by_steps[key] = list(overlap)
+        return overlap_by_steps
+
+    @staticmethod
+    def _check_overlap_data(title):
+        # csv: step / compute time / communication_not_overlap / overlap / communication / free time
+        length = len(title)
+        if length < 5:
+            return
+        key = ["compute time", "overlapped time", "communication time not overlapped", "free time"]
+        get_key = list()
+        for j in key:
+            for i in range(length):
+                if j == title[i]:
+                    get_key.append(i)
+        if len(get_key) < 4:
+            return None
+        return get_key
+
+    def _npu_get_wait_table(self):
+        path = self.profile_data.communication_json_path
+        if not io.exists(path):
+            raise FileNotFoundError(path)
+        data = io.read(path)
+        try:
+            communication_json = json.loads(data, strict=False)
+        except JSONDecodeError as e:
+            try:
+                communication_json = json.loads(data, strict=False)
+            except JSONDecodeError:
+                with sysio.StringIO() as fout:
+                    str_data = data.decode('utf-8')
+                    # only replace the N/A without surrounding double quote
+                    fout.write(re.sub(r'(?