From d8b3777822463564c25d78fbe7c1ad2e9fa2f46e Mon Sep 17 00:00:00 2001 From: tangmengcheng <745274877@qq.com> Date: Mon, 18 Aug 2025 17:24:54 +0800 Subject: [PATCH] profiler analyse optim --- .../prof_common_func/test_task_manager.py | 13 +- .../prof_parse/test_fwk_file_parser.py | 178 +++++++++++++++ .../prepare_parser/test_fwk_pre_parser.py | 78 +++++++ test/profiler/test_npu_profiler.py | 36 +++ .../analysis/prof_bean/_memory_use_bean.py | 61 ++++-- .../analysis/prof_bean/_op_mark_bean.py | 49 +++-- .../analysis/prof_bean/_torch_op_bean.py | 82 ++++--- .../analysis/prof_common_func/_constant.py | 59 ++++- .../analysis/prof_common_func/_id_manager.py | 5 +- .../analysis/prof_common_func/_log.py | 2 +- .../prof_common_func/_task_manager.py | 60 +++++ .../prof_common_func/_tree_builder.py | 2 +- .../analysis/prof_config/_parser_config.py | 39 +++- .../prof_config/_parser_deps_config.py | 44 ++-- .../analysis/prof_parse/_event_tree_parser.py | 4 +- .../prof_parse/_fwk_cann_relation_parser.py | 5 +- .../analysis/prof_parse/_fwk_file_parser.py | 159 +++++--------- .../prof_parse/_python_trace_parser.py | 7 +- .../prof_view/_communication_parser.py | 2 + .../analysis/prof_view/_integrate_parser.py | 2 + .../analysis/prof_view/_kernel_view_parser.py | 2 + .../prof_view/_memory_prepare_parser.py | 43 ++-- .../analysis/prof_view/_memory_view_parser.py | 11 +- .../prof_view/_operator_view_parser.py | 6 +- .../analysis/prof_view/_stack_view_parser.py | 10 +- .../prof_view/_trace_step_time_parser.py | 6 +- .../analysis/prof_view/_trace_view_parser.py | 17 +- .../prof_view/cann_parse/_cann_analyze.py | 2 + .../prof_view/cann_parse/_cann_export.py | 7 + .../prepare_parse/_fwk_pre_parser.py | 79 ++++++- .../prepare_parse/_relation_parser.py | 6 +- .../prof_db_parse/_basic_db_parser.py | 2 + .../prof_db_parse/_communication_db_parser.py | 2 + .../prof_view/prof_db_parse/_db_parser.py | 2 + .../prof_db_parse/_fwk_api_db_parser.py | 206 ++++++------------ 
.../prof_db_parse/_gc_record_db_parser.py | 2 + .../prof_db_parse/_memory_db_parser.py | 11 +- .../prof_db_parse/_step_info_db_parser.py | 2 + .../_trace_step_time_db_parser.py | 9 +- 39 files changed, 914 insertions(+), 398 deletions(-) create mode 100644 test/profiler/analysis/prof_parse/test_fwk_file_parser.py create mode 100644 test/profiler/analysis/prof_view/prepare_parser/test_fwk_pre_parser.py diff --git a/test/profiler/analysis/prof_common_func/test_task_manager.py b/test/profiler/analysis/prof_common_func/test_task_manager.py index 20656e720f0..536b8f2d833 100644 --- a/test/profiler/analysis/prof_common_func/test_task_manager.py +++ b/test/profiler/analysis/prof_common_func/test_task_manager.py @@ -3,6 +3,7 @@ import pickle import select import time import multiprocessing +from unittest.mock import patch from torch_npu.profiler.analysis.prof_common_func._constant import Constant from torch_npu.profiler.analysis.prof_common_func._task_manager import ( @@ -90,7 +91,8 @@ class TestTaskManager(TestCase): self.output = None self.text = None - def test_run_in_main_process(self): + @patch.object(ConcurrentTasksManager, 'log_task_execution_summary') + def test_run_in_main_process(self, mock_log_summary): manager = ConcurrentTasksManager() task_success = TaskSuccess([], ConcurrentMode.MAIN_PROCESS) task_fail = TaskFailed([], ConcurrentMode.MAIN_PROCESS) @@ -104,7 +106,8 @@ class TestTaskManager(TestCase): self.assertEqual(TaskStatus.Failed, task_infos.get("task_fail").status) self.assertEqual(TaskStatus.Running, task_infos.get("task_exception").status) - def test_run_in_sub_process(self): + @patch.object(ConcurrentTasksManager, 'log_task_execution_summary') + def test_run_in_sub_process(self, mock_log_summary): manager = ConcurrentTasksManager() task_success = TaskSuccess([], ConcurrentMode.SUB_PROCESS) task_fail = TaskFailed([], ConcurrentMode.SUB_PROCESS) @@ -118,7 +121,8 @@ class TestTaskManager(TestCase): self.assertEqual(TaskStatus.Failed, 
task_infos.get("task_fail").status) self.assertEqual(TaskStatus.Failed, task_infos.get("task_exception").status) - def test_run_in_sub_thread(self): + @patch.object(ConcurrentTasksManager, 'log_task_execution_summary') + def test_run_in_sub_thread(self, mock_log_summary): manager = ConcurrentTasksManager() task_success = TaskSuccess([], ConcurrentMode.PTHREAD) task_fail = TaskFailed([], ConcurrentMode.PTHREAD) @@ -129,7 +133,8 @@ class TestTaskManager(TestCase): self.assertEqual(TaskStatus.Succeed, task_infos.get("task_success").status) self.assertEqual(TaskStatus.Failed, task_infos.get("task_fail").status) - def test_run_sub_process_deps(self): + @patch.object(ConcurrentTasksManager, 'log_task_execution_summary') + def test_run_sub_process_deps(self, mock_log_summary): manager = ConcurrentTasksManager() task_serial1 = TaskSerial1([], ConcurrentMode.SUB_PROCESS) task_serial2 = TaskSerial2(["task_serial1"], ConcurrentMode.SUB_PROCESS) diff --git a/test/profiler/analysis/prof_parse/test_fwk_file_parser.py b/test/profiler/analysis/prof_parse/test_fwk_file_parser.py new file mode 100644 index 00000000000..0a54592960e --- /dev/null +++ b/test/profiler/analysis/prof_parse/test_fwk_file_parser.py @@ -0,0 +1,178 @@ +import os +import tempfile +from unittest.mock import patch, MagicMock + +from torch_npu.testing.testcase import TestCase, run_tests +from torch_npu.profiler.analysis.prof_parse._fwk_file_parser import FwkFileParser +from torch_npu.profiler.analysis.prof_common_func._constant import Constant, ApiType + + +class MockTorchOpBean: + """Mock TorchOpBean for testing""" + def __init__(self, pid=12345, tid=67890, name="test_op", ts=1000000, end_ns=2000000, args=None): + self.pid = pid + self.tid = tid + self.name = name + self.ts = ts + self.end_ns = end_ns + self.dur = end_ns - ts + self.args = args or {} + + +class MockOpMarkBean: + """Mock OpMarkBean for testing""" + def __init__(self, pid=12345, tid=67890, name="test_mark", ts=1000000, dur=500000, + corr_id=1, 
origin_name="origin_test", category=1): + self.pid = pid + self.tid = tid + self.name = name + self.ts = ts + self.time_ns = ts + self.dur = dur + self.corr_id = corr_id + self.origin_name = origin_name + self.args = {"correlation_id": corr_id} + self._category = category + + +class TestFwkFileParser(TestCase): + + def setUp(self): + self.test_dir = "temp" + self.fwk_dir = os.path.join(self.test_dir, "FRAMEWORK") + + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ProfilerLogger') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ProfilerPathManager') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.os.listdir') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.os.path.isfile') + def test_get_fwk_trace_data_should_return_valid_list_when_given_torch_ops_and_taskqueue(self, mock_isfile, mock_listdir, mock_path_manager, mock_logger): + """Test basic trace data generation""" + mock_path_manager.get_fwk_path.return_value = self.fwk_dir + mock_listdir.return_value = [] + mock_isfile.return_value = True + mock_logger.init.return_value = None + mock_logger.get_instance.return_value = MagicMock() + + torch_op_data = [ + MockTorchOpBean(pid=12345, tid=1001, name="conv2d", ts=1000000, end_ns=2000000), + MockTorchOpBean(pid=12345, tid=1001, name="matmul", ts=2000000, end_ns=3000000) + ] + enqueue_data = [ + MockOpMarkBean(pid=12345, tid=1001, name="Enqueue@conv2d", ts=1000000, dur=1000000, corr_id=10), + MockOpMarkBean(pid=12345, tid=1001, name="Enqueue@matmul", ts=2000000, dur=1000000, corr_id=20) + ] + dequeue_data = [ + MockOpMarkBean(pid=12345, tid=1002, name="Dequeue@conv2d", ts=2000000, dur=1000000, corr_id=10), + MockOpMarkBean(pid=12345, tid=1002, name="Dequeue@matmul", ts=3000000, dur=1000000, corr_id=20) + ] + + parser = FwkFileParser(self.test_dir) + + with patch.object(parser, 'get_python_trace_data', return_value=[]), \ + patch.object(parser, 'get_gc_record_trace_data', return_value=[]): + + result = 
parser.get_fwk_trace_data(torch_op_data, enqueue_data, dequeue_data) + + # Verify results + self.assertIsInstance(result, list) + self.assertEqual(len(result), 17) # 6 x event + 2 e event + 2 f event + 7 m event + + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ProfilerLogger') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ProfilerPathManager') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.os.listdir') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.os.path.isfile') + def test_get_fwk_api_should_return_valid_dict_when_given_torch_ops(self, mock_isfile, mock_listdir, mock_path_manager, mock_logger): + """Test basic API data generation""" + # Setup mocks + mock_path_manager.get_fwk_path.return_value = self.fwk_dir + mock_listdir.return_value = [] + mock_isfile.return_value = True + mock_logger.init.return_value = None + mock_logger.get_instance.return_value = MagicMock() + + torch_op_data = [ + MockTorchOpBean(pid=12345, tid=1001, name="conv2d", ts=1000000, end_ns=2000000, + args={Constant.SEQUENCE_NUMBER: 100, Constant.FORWARD_THREAD_ID: 0}) + ] + + parser = FwkFileParser(self.test_dir) + + with patch.object(parser, 'get_file_data_by_tag', return_value=[]), \ + patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.Str2IdManager') as mock_str_mgr, \ + patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ConnectionIdManager') as mock_conn_mgr, \ + patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.CallChainIdManager') as mock_chain_mgr: + + mock_str_instance = MagicMock() + mock_str_instance.get_id_from_str.return_value = 1001 + mock_str_mgr.return_value = mock_str_instance + + mock_conn_instance = MagicMock() + mock_conn_mgr.return_value = mock_conn_instance + + mock_chain_instance = MagicMock() + mock_chain_mgr.return_value = mock_chain_instance + + result = parser.get_fwk_api(torch_op_data, [], []) + + # Verify results + self.assertIsInstance(result, dict) + 
self.assertIn(Constant.TORCH_OP_DATA, result) + self.assertEqual(len(result[Constant.TORCH_OP_DATA]), 1) + api_data = result[Constant.TORCH_OP_DATA][0] + self.assertEqual(api_data[0], 1000000) + self.assertEqual(api_data[1], 2000000) + self.assertEqual(api_data[10], ApiType.TORCH_OP) + self.assertEqual(len(api_data), 11) + + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ProfilerLogger') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ProfilerPathManager') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.os.listdir') + @patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.os.path.isfile') + def test_get_fwk_api_should_process_task_queue_data_when_given_enqueue_dequeue_ops(self, mock_isfile, mock_listdir, mock_path_manager, mock_logger): + """Test API data generation with task queue""" + # Setup mocks + mock_path_manager.get_fwk_path.return_value = self.fwk_dir + mock_listdir.return_value = [] + mock_isfile.return_value = True + mock_logger.init.return_value = None + mock_logger.get_instance.return_value = MagicMock() + + enqueue_data = [MockOpMarkBean(name="enqueue_op", ts=500000, dur=100000, corr_id=1)] + dequeue_data = [MockOpMarkBean(name="dequeue_op", ts=1500000, dur=200000, corr_id=1)] + + parser = FwkFileParser(self.test_dir) + + with patch.object(parser, 'get_file_data_by_tag', return_value=[]), \ + patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.Str2IdManager') as mock_str_mgr, \ + patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.ConnectionIdManager') as mock_conn_mgr, \ + patch('torch_npu.profiler.analysis.prof_parse._fwk_file_parser.CallChainIdManager') as mock_chain_mgr: + + mock_str_instance = MagicMock() + mock_str_instance.get_id_from_str.return_value = 2001 + mock_str_mgr.return_value = mock_str_instance + + mock_conn_instance = MagicMock() + mock_conn_instance.get_id_from_connection_ids.return_value = 1 + mock_conn_mgr.return_value = mock_conn_instance + + 
mock_chain_instance = MagicMock() + mock_chain_mgr.return_value = mock_chain_instance + + result = parser.get_fwk_api([], enqueue_data, dequeue_data) + + # Verify results + self.assertIsInstance(result, dict) + self.assertEqual(len(result[Constant.ENQUEUE_DATA]), 1) + self.assertEqual(len(result[Constant.DEQUEUE_DATA]), 1) + enqueue = result[Constant.ENQUEUE_DATA][0] + dequeue = result[Constant.DEQUEUE_DATA][0] + self.assertEqual(enqueue[0], 500000) # enqueue start_ns + self.assertEqual(dequeue[0], 1500000) # dequeue start_ns + self.assertEqual(enqueue[10], ApiType.TASK_QUEUE) # api_type + self.assertEqual(dequeue[10], ApiType.TASK_QUEUE) # api_type + self.assertEqual(len(enqueue), 11) + self.assertEqual(len(dequeue), 11) + + +if __name__ == "__main__": + run_tests() diff --git a/test/profiler/analysis/prof_view/prepare_parser/test_fwk_pre_parser.py b/test/profiler/analysis/prof_view/prepare_parser/test_fwk_pre_parser.py new file mode 100644 index 00000000000..f03ab181daa --- /dev/null +++ b/test/profiler/analysis/prof_view/prepare_parser/test_fwk_pre_parser.py @@ -0,0 +1,78 @@ +from unittest.mock import patch + +from torch_npu.testing.testcase import TestCase, run_tests +from torch_npu.profiler.analysis.prof_view.prepare_parse._fwk_pre_parser import ( + TaskQueueParser, TorchOpParser, DbPreParser +) +from torch_npu.profiler.analysis.prof_common_func._constant import Constant + + +class TestFwkPreParsers(TestCase): + + def setUp(self): + self.test_dir = "temp" + self.param_dict = { + "profiler_path": self.test_dir, + "output_path": self.test_dir + } + + @patch('torch_npu.profiler.analysis.prof_common_func._path_manager.ProfilerPathManager.get_cann_path', return_value="/mock/cann/path") + def test_task_queue_parser_should_return_success_with_enqueue_dequeue_data_when_run_with_valid_params(self, mock_get_cann_path): + parser = TaskQueueParser("test_task_queue", self.param_dict) + + mock_enqueue_data = [{"name": "enqueue_op1", "ts": 1000}] + mock_dequeue_data = 
[{"name": "dequeue_op1", "ts": 2000}] + + with patch('torch_npu.profiler.analysis.prof_view.prepare_parse._fwk_pre_parser.ProfilerLogger'): + with patch('torch_npu.profiler.analysis.prof_view.prepare_parse._fwk_pre_parser.FwkFileParser') as mock_parser: + mock_instance = mock_parser.return_value + mock_instance.get_task_queue_data.return_value = (mock_enqueue_data, mock_dequeue_data) + + status, result = parser.run({}) + + self.assertEqual(status, Constant.SUCCESS) + self.assertEqual(result["enqueue_data"], mock_enqueue_data) + self.assertEqual(result["dequeue_data"], mock_dequeue_data) + + @patch('torch_npu.profiler.analysis.prof_common_func._path_manager.ProfilerPathManager.get_cann_path', return_value="/mock/cann/path") + def test_torch_op_parser_should_return_success_with_torch_op_data_when_run_with_valid_params(self, mock_get_cann_path): + parser = TorchOpParser("test_torch_op", self.param_dict) + + mock_torch_op_data = [{"name": "torch_op1", "ts": 1000}] + + with patch('torch_npu.profiler.analysis.prof_view.prepare_parse._fwk_pre_parser.ProfilerLogger'): + with patch('torch_npu.profiler.analysis.prof_view.prepare_parse._fwk_pre_parser.FwkFileParser') as mock_parser: + mock_instance = mock_parser.return_value + mock_instance.get_file_data_by_tag.return_value = mock_torch_op_data + + status, result = parser.run({}) + + self.assertEqual(status, Constant.SUCCESS) + self.assertEqual(result, mock_torch_op_data) + + @patch('torch_npu.profiler.analysis.prof_common_func._path_manager.ProfilerPathManager.get_cann_path', return_value="/mock/cann/path") + def test_db_pre_parser_should_return_success_with_fwk_api_data_when_run_with_deps_data(self, mock_get_cann_path): + parser = DbPreParser("test_db_pre", self.param_dict) + + mock_fwk_db_data = {"api_data": [{"name": "api1"}]} + deps_data = { + Constant.TORCH_OP_PARSER: [{"name": "torch_op1"}], + Constant.TASK_QUEUE_PARSER: { + "enqueue_data": [{"name": "enqueue_op1"}], + "dequeue_data": [{"name": "dequeue_op1"}] + } + } 
+ + with patch('torch_npu.profiler.analysis.prof_view.prepare_parse._fwk_pre_parser.ProfilerLogger'): + with patch('torch_npu.profiler.analysis.prof_view.prepare_parse._fwk_pre_parser.FwkFileParser') as mock_parser: + mock_instance = mock_parser.return_value + mock_instance.get_fwk_api.return_value = mock_fwk_db_data + + status, result = parser.run(deps_data) + + self.assertEqual(status, Constant.SUCCESS) + self.assertEqual(result, mock_fwk_db_data) + + +if __name__ == "__main__": + run_tests() diff --git a/test/profiler/test_npu_profiler.py b/test/profiler/test_npu_profiler.py index 7921126b0d3..7f1c0d177d0 100644 --- a/test/profiler/test_npu_profiler.py +++ b/test/profiler/test_npu_profiler.py @@ -321,6 +321,42 @@ class TestNpuProfiler(TestCase): self.assertEqual(True, self._has_view_result(result_dir, work_names[0], self.KERNEL_FILE_NAME)) self.assertEqual(True, self._has_view_result(result_dir, work_names[0], self.OPERATOR_FILE_NAME)) + def test_export_db(self): + from torch_npu.profiler.analysis.prof_common_func._constant import TableColumnsManager + from torch_npu.profiler.analysis.prof_common_func._db_manager import TorchDb + import glob + + worker_name = self.worker_name + prof = torch_npu.profiler.profile( + profile_memory=True, + schedule=torch_npu.profiler.schedule(wait=0, warmup=0, active=1, repeat=1, skip_first=0), + on_trace_ready=torch_npu.profiler.tensorboard_trace_handler(self.results_path, worker_name=worker_name), + experimental_config=torch_npu.profiler._ExperimentalConfig(export_type="db")) + prof.start() + for _ in range(self.small_steps): + self.model_train.train_one_step() + prof.step() + prof.stop() + + output_path = self._get_tensorboard_output(self.results_path, worker_name) + + # Find db file (could be with or without rank_id) + db_files = glob.glob(os.path.join(output_path, "*_pytorch_profiler*.db")) + self.assertEqual(1, len(db_files)) + db_path = db_files[0] + self.assertEqual(True, os.path.exists(db_path)) + + TorchDb().init(db_path) 
+ self.assertEqual(True, TorchDb().create_connect_db()) + + # Verify tables in TableColumns must exist + tables = ['CANN_API', 'STRING_IDS', 'CONNECTION_IDS', 'ENUM_API_TYPE', 'PYTORCH_API', 'MEMORY_RECORD', 'OP_MEMORY'] + + for table_name in tables: + self.assertEqual(True, TorchDb().judge_table_exist(table_name)) + + TorchDb().close() + def _get_tensorboard_output(self, dir_name: str, worker_name: str) -> str: sub_dirs = os.listdir(os.path.realpath(dir_name)) for sub_dir in sub_dirs: diff --git a/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py b/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py index 6995478b0de..9768d6c0bfc 100644 --- a/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py +++ b/torch_npu/profiler/analysis/prof_bean/_memory_use_bean.py @@ -28,63 +28,78 @@ class MemoryEnum(Enum): class MemoryUseBean(CommonBean): CONSTANT_STRUCT = "<7q2b3B2Q" + CONSTANT_UNPACKER = struct.Struct(CONSTANT_STRUCT) NPU_ID = 20 CPU_ID = 0 INNER_ALLOCATOR = 0 def __init__(self, data: dict): super().__init__(data) - self._constant_data = struct.unpack(self.CONSTANT_STRUCT, self._data.get(Constant.CONSTANT_BYTES)) + self._constant_data = self.CONSTANT_UNPACKER.unpack(data.get(Constant.CONSTANT_BYTES)) + self._ptr = self._constant_data[MemoryEnum.PTR.value] + self._stream_ptr = self._constant_data[MemoryEnum.STREAM_PTR.value] + profiler_config = ProfilerConfig() + self._time_ns = profiler_config.get_local_time( + profiler_config.get_timestamp_from_syscnt(self._constant_data[MemoryEnum.TIME_NS.value])) + self._alloc_size = self._constant_data[MemoryEnum.ALLOC_SIZE.value] + self._total_allocated = self._constant_data[MemoryEnum.TOTAL_ALLOCATED.value] + self._total_reserved = self._constant_data[MemoryEnum.TOTAL_RESERVED.value] + self._total_active = self._constant_data[MemoryEnum.TOTAL_ACTIVE.value] + self._device_type = self._constant_data[MemoryEnum.DEVICE_TYPE.value] self._device_index = -1 + self._component_type = 
self._constant_data[MemoryEnum.COMPONENT_TYPE.value] + self._data_type = self._constant_data[MemoryEnum.DATA_TYPE.value] + self._allocator_type = self._constant_data[MemoryEnum.ALLOCATOR_TYPE.value] + self._thread_id = self._constant_data[MemoryEnum.THREAD_ID.value] + self._process_id = self._constant_data[MemoryEnum.PROCESS_ID.value] @property def ptr(self) -> int: - return int(self._constant_data[MemoryEnum.PTR.value]) + return self._ptr @property def stream_ptr(self) -> int: - return int(self._constant_data[MemoryEnum.STREAM_PTR.value]) + return self._stream_ptr @property def time_ns(self) -> int: - time_ns = ProfilerConfig().get_timestamp_from_syscnt(self._constant_data[MemoryEnum.TIME_NS.value]) - return ProfilerConfig().get_local_time(time_ns) + return self._time_ns @property - def alloc_size(self) -> int: - return int(self._constant_data[MemoryEnum.ALLOC_SIZE.value]) / Constant.B_TO_KB + def alloc_size(self) -> float: + return self._alloc_size / Constant.B_TO_KB @property def alloc_size_for_db(self) -> int: - return int(self._constant_data[MemoryEnum.ALLOC_SIZE.value]) + return self._alloc_size @property - def total_allocated(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_ALLOCATED.value]) / Constant.B_TO_MB + def total_allocated(self) -> float: + return self._total_allocated / Constant.B_TO_MB @property def total_allocated_for_db(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_ALLOCATED.value]) + return self._total_allocated @property - def total_reserved(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_RESERVED.value]) / Constant.B_TO_MB + def total_reserved(self) -> float: + return self._total_reserved / Constant.B_TO_MB @property def total_reserved_for_db(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_RESERVED.value]) + return self._total_reserved @property - def total_active(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_ACTIVE.value]) / Constant.B_TO_MB + def 
total_active(self) -> float: + return self._total_active / Constant.B_TO_MB @property def total_active_for_db(self) -> int: - return int(self._constant_data[MemoryEnum.TOTAL_ACTIVE.value]) + return self._total_active @property def device_type(self) -> int: - return int(self._constant_data[MemoryEnum.DEVICE_TYPE.value]) + return self._device_type @property def device_index(self) -> int: @@ -97,23 +112,23 @@ class MemoryUseBean(CommonBean): @property def component_type(self) -> int: - return int(self._constant_data[MemoryEnum.COMPONENT_TYPE.value]) + return self._component_type @property def data_type(self) -> int: - return int(self._constant_data[MemoryEnum.DATA_TYPE.value]) + return self._data_type @property def allocator_type(self) -> int: - return int(self._constant_data[MemoryEnum.ALLOCATOR_TYPE.value]) + return self._allocator_type @property def tid(self) -> int: - return int(self._constant_data[MemoryEnum.THREAD_ID.value]) + return self._thread_id @property def pid(self) -> int: - return int(self._constant_data[MemoryEnum.PROCESS_ID.value]) + return self._process_id def is_npu(self) -> bool: return self.device_type == self.NPU_ID diff --git a/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py b/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py index 34d95e97fd8..8617799085d 100644 --- a/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py +++ b/torch_npu/profiler/analysis/prof_bean/_op_mark_bean.py @@ -27,49 +27,64 @@ class OpMarkBean: Constant.NAME: 1 } CONSTANT_STRUCT = " int: + if self._pid is None: + self._pid = self._constant_data[OpMarkEnum.PROCESS_ID.value] return self._pid @property def tid(self) -> int: + if self._tid is None: + self._tid = self._constant_data[OpMarkEnum.THREAD_ID.value] return self._tid @property def time_ns(self) -> int: + if self._time_ns is None: + self._init_time_ns() return self._time_ns @property def corr_id(self) -> int: + if self._corr_id is None: + self._corr_id = 
self._constant_data[OpMarkEnum.CORRELATION_ID.value] return self._corr_id @property def origin_name(self) -> str: + if self._origin_name is None: + self._origin_name = self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.NAME), "") return self._origin_name @property def name(self) -> str: - if self.is_dequeue_start or self.is_dequeue_end: - return "Dequeue@" + str(self._origin_data[self.TLV_TYPE_DICT.get(Constant.NAME)]) - return "Enqueue" + if self._name is None: + if self.is_dequeue_start or self.is_dequeue_end: + self._name = "Dequeue@" + self.origin_name + else: + self._name = "Enqueue" + return self._name @property def args(self) -> dict: - return {"correlation_id": self.corr_id} + if self._args is None: + self._args = {"correlation_id": self.corr_id} + return self._args @property def is_enqueue_start(self) -> bool: @@ -114,3 +129,9 @@ class OpMarkBean: @dur.setter def dur(self, dur: int): self._dur = dur + + def _init_time_ns(self): + profiler_config = ProfilerConfig() + syscnt = self._constant_data[OpMarkEnum.TIME_NS.value] + self._time_ns = profiler_config.get_local_time( + profiler_config.get_timestamp_from_syscnt(syscnt)) diff --git a/torch_npu/profiler/analysis/prof_bean/_torch_op_bean.py b/torch_npu/profiler/analysis/prof_bean/_torch_op_bean.py index de415599df9..859f68b40e0 100644 --- a/torch_npu/profiler/analysis/prof_bean/_torch_op_bean.py +++ b/torch_npu/profiler/analysis/prof_bean/_torch_op_bean.py @@ -32,10 +32,14 @@ class TorchOpBean: Constant.FLOPS: 10 } CONSTANT_STRUCT = "<3q4QB?" 
+ CONSTANT_UNPACKER = struct.Struct(CONSTANT_STRUCT) + + REPLACE_FIELDS = {Constant.INPUT_SHAPES, Constant.INPUT_DTYPES, Constant.CALL_STACK} + SKIP_FIELDS = {Constant.OP_NAME, Constant.INPUT_TENSORS, Constant.INPUT_TENSORLISTS, Constant.INPUT_SCALARS} def __init__(self, data: dict): self._origin_data = data - self._constant_data = struct.unpack(self.CONSTANT_STRUCT, data.get(Constant.CONSTANT_BYTES)) + self._constant_data = self.CONSTANT_UNPACKER.unpack(data.get(Constant.CONSTANT_BYTES)) self._kernel_list = [] self._pid = None self._tid = None @@ -44,81 +48,101 @@ class TorchOpBean: self._end_ns = None self._call_stack = None self._args = None - self.init() + self._inputs = None + self._scope = None + self._dur = None @property def pid(self) -> int: + if self._pid is None: + self._pid = self._constant_data[TorchOpEnum.PROCESS_ID.value] return self._pid @property def tid(self) -> int: + if self._tid is None: + self._tid = self._constant_data[TorchOpEnum.START_THREAD_ID.value] return self._tid @property def name(self) -> str: + if self._name is None: + self._name = str(self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.OP_NAME), "")) return self._name @property def ts(self) -> int: + if self._start_ns is None: + self._init_timestamps() return self._start_ns @property def dur(self) -> int: - return int(self._end_ns) - int(self._start_ns) + if self._dur is None: + if self._start_ns is None or self._end_ns is None: + self._init_timestamps() + self._dur = self._end_ns - self._start_ns + return self._dur @property def end_ns(self): + if self._end_ns is None: + self._init_timestamps() return self._end_ns @property def call_stack(self): + if self._call_stack is None: + self._call_stack = self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.CALL_STACK), "").replace(";", ";\r\n") return self._call_stack @property def inputs(self): + if self._inputs is None: + self._inputs = { + Constant.INPUT_TENSORS: self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.INPUT_TENSORS)), 
+ Constant.INPUT_TENSORLISTS: self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.INPUT_TENSORLISTS)), + Constant.INPUT_SCALARS: self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.INPUT_SCALARS)) + } return self._inputs - + @property def scope(self): + if self._scope is None: + self._scope = self._constant_data[TorchOpEnum.SCOPE.value] return self._scope @property def args(self): + if self._args is None: + self._args = self.get_args() return self._args @property def is_torch_op(self): return True + + def _init_timestamps(self): + profiler_config = ProfilerConfig() + start_syscnt = self._constant_data[TorchOpEnum.START_NS.value] + end_syscnt = self._constant_data[TorchOpEnum.END_NS.value] - def init(self): - self._pid = int(self._constant_data[TorchOpEnum.PROCESS_ID.value]) - self._tid = int(self._constant_data[TorchOpEnum.START_THREAD_ID.value]) - self._name = str(self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.OP_NAME), "")) - self._start_ns = ProfilerConfig().get_local_time( - ProfilerConfig().get_timestamp_from_syscnt(self._constant_data[TorchOpEnum.START_NS.value])) - self._end_ns = ProfilerConfig().get_local_time( - ProfilerConfig().get_timestamp_from_syscnt(self._constant_data[TorchOpEnum.END_NS.value])) - self._scope = int(self._constant_data[TorchOpEnum.SCOPE.value]) - self._call_stack = self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.CALL_STACK), "").replace(";", ";\r\n") - self._args = self.get_args() - self._inputs = { - Constant.INPUT_TENSORS: self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.INPUT_TENSORS)), - Constant.INPUT_TENSORLISTS: self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.INPUT_TENSORLISTS)), - Constant.INPUT_SCALARS: self._origin_data.get(self.TLV_TYPE_DICT.get(Constant.INPUT_SCALARS))} + self._start_ns = profiler_config.get_local_time(profiler_config.get_timestamp_from_syscnt(start_syscnt)) + self._end_ns = profiler_config.get_local_time(profiler_config.get_timestamp_from_syscnt(end_syscnt)) def get_args(self) 
-> dict: args = { - Constant.SEQUENCE_NUMBER: int(self._constant_data[TorchOpEnum.SEQUENCE_NUMBER.value]), - Constant.FORWARD_THREAD_ID: int( - self._constant_data[TorchOpEnum.FORWARD_THREAD_ID.value])} + Constant.SEQUENCE_NUMBER: self._constant_data[TorchOpEnum.SEQUENCE_NUMBER.value], + Constant.FORWARD_THREAD_ID: self._constant_data[TorchOpEnum.FORWARD_THREAD_ID.value] + } + origin_keys = self._origin_data.keys() for type_name, type_id in self.TLV_TYPE_DICT.items(): - if type_name in [Constant.OP_NAME, Constant.INPUT_TENSORS, - Constant.INPUT_TENSORLISTS, Constant.INPUT_SCALARS]: - continue - if type_id not in self._origin_data.keys(): + if type_name in self.SKIP_FIELDS or type_id not in origin_keys: continue - if type_name in [Constant.INPUT_SHAPES, Constant.INPUT_DTYPES, Constant.CALL_STACK]: - args[type_name] = self._origin_data.get(type_id).replace(";", ";\r\n") + + value = self._origin_data[type_id] + if type_name in self.REPLACE_FIELDS: + args[type_name] = value.replace(";", ";\r\n") if value else "" else: - args[type_name] = self._origin_data.get(type_id) + args[type_name] = value return args diff --git a/torch_npu/profiler/analysis/prof_common_func/_constant.py b/torch_npu/profiler/analysis/prof_common_func/_constant.py index 37abe8cf045..35b04eaa550 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_constant.py +++ b/torch_npu/profiler/analysis/prof_common_func/_constant.py @@ -1,6 +1,7 @@ import os import time from typing import Union +from enum import Enum from torch_npu.utils._error_code import ErrCode, prof_error from torch_npu.utils import _should_print_warning @@ -182,6 +183,9 @@ class Constant(object): FAIL = 1 # parser name + TASK_QUEUE_PARSER = "task_queue" + TORCH_OP_PARSER = "torch_op" + DB_PRE_PARSER = "db_prepare" TRACE_PRE_PARSER = "trace_prepare" TREE_BUILD_PARSER = "build_tree" CANN_EXPORT_PARSER = "export" @@ -208,7 +212,7 @@ class Constant(object): TRACE_STEP_TIME_DB_PARSER = "trace_step_time_db" GC_RECORD_DB_PARSER = 
"gc_record_db" - TRACE_VIEW_TEMP = "trace_view_temp.json" + TRACE_VIEW = "trace_view.json" # db data type SQL_TEXT_TYPE = "TEXT" @@ -216,6 +220,13 @@ class Constant(object): SQL_NUMERIC_TYPE = "NUMERIC" SQL_REAL_TYPE = "REAL" + # data name + ENQUEUE_DATA = "enqueue_data" + DEQUEUE_DATA = "dequeue_data" + TORCH_OP_DATA = "torch_op_data" + PYTHON_TRACE_DATA = "python_trace_data" + MSTX_OP_DATA = "mstx_op_data" + def print_info_msg(message: str): current_time = time.localtime() @@ -284,6 +295,7 @@ def contact_2num(high_num: int, low_num: int) -> int: class DbConstant(): # db invalid value DB_INVALID_VALUE = 4294967295 + DB_INVALID_CONNECTION_ID = -1 # db name DB_ASCEND_PYTORCH_PROFILER = "ascend_pytorch_profiler.db" @@ -336,6 +348,9 @@ class DbConstant(): START_STRING_ID_FWK_API = 1 << 28 START_STRING_ID_MEMORY = 2 << 28 + # pytorch start connection id + START_CONNECTION_ID_FWK_API = 5 << 28 + class TableColumnsManager(): TableColumns = { @@ -478,3 +493,45 @@ class TableColumnsManager(): ("globalTid", Constant.SQL_INTEGER_TYPE) ] } + + +class ApiType: + TORCH_OP = 50001 + TASK_QUEUE = 50002 + PYTHON_TRACE = 50003 + MSTX_OP = 50004 + + +class TorchOpDataOri: + START_NS = 0 + END_NS = 1 + GLOBAL_TID = 2 + CONNECTION_ID = 3 + NAME = 4 + SEQUENCE_NUM = 5 + FWD_THREAD_ID = 6 + INPUT_DIMS = 7 + INPUT_SHAPES = 8 + CALL_STACK = 9 + + +class TaskQueueDataOri: + START_NS = 0 + END_NS = 1 + GLOBAL_TID = 2 + CORRELATION_ID = 3 + NAME = 4 + + +class PythonTraceApiDataOri: + START_NS = 0 + END_NS = 1 + GLOBAL_TID = 2 + NAME = 3 + + +class CannNodeLaunchApiOri: + START_NS = 0 + END_NS = 1 + GLOBAL_TID = 2 + CORRELATION_ID = 3 diff --git a/torch_npu/profiler/analysis/prof_common_func/_id_manager.py b/torch_npu/profiler/analysis/prof_common_func/_id_manager.py index 7c78cf5d034..f2f61c951f3 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_id_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_id_manager.py @@ -46,10 +46,13 @@ class ConnectionIdManager: 
self._connecion_id_map[self._curr_id] = connection_ids self._curr_id += 1 return res_id - + def get_all_connection_ids(self) -> dict: return self._connecion_id_map + def get_connection_ids_from_id(self, source_id: int) -> list: + return self._connecion_id_map.get(source_id, [DbConstant.DB_INVALID_CONNECTION_ID]) + @Singleton class CallChainIdManager: diff --git a/torch_npu/profiler/analysis/prof_common_func/_log.py b/torch_npu/profiler/analysis/prof_common_func/_log.py index 0fecde48c41..7054fb13eec 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_log.py +++ b/torch_npu/profiler/analysis/prof_common_func/_log.py @@ -23,7 +23,7 @@ class ProfilerLogger: BACKUP_COUNT: Number of backup files to keep """ - LOG_FORMAT = "[%(asctime)s] [%(levelname)s] [%(name)s:%(lineno)d] %(message)s" + LOG_FORMAT = "[%(asctime)s.%(msecs)03d] [%(levelname)s] [%(name)s:%(lineno)d] %(message)s" DATE_FORMAT = "%Y-%m-%d-%H:%M:%S" DEFAULT_LOGGER_NAME = "AscendProfiler" DEFAULT_LOG_LEVEL = logging.INFO diff --git a/torch_npu/profiler/analysis/prof_common_func/_task_manager.py b/torch_npu/profiler/analysis/prof_common_func/_task_manager.py index a618e2122a8..cf914b17a88 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_task_manager.py +++ b/torch_npu/profiler/analysis/prof_common_func/_task_manager.py @@ -13,6 +13,7 @@ from abc import ABC, abstractmethod from torch_npu.utils._error_code import ErrCode, prof_error from ._constant import print_error_msg, Constant +from ._log import ProfilerLogger __all__ = [] @@ -110,6 +111,9 @@ class TaskInfo: self.handler = None self.pipe = (-1, -1) self.recv_buffer = None + self.start_time = None + self.end_time = None + self.execution_time = None class ConcurrentTasksManager: @@ -124,6 +128,7 @@ class ConcurrentTasksManager: self.epoll = None self.max_concurrent_num = max_concurrent_num self.progress_bar = progress_bar + self.logger = None def add_task(self, task): if not isinstance(task, ConcurrentTask): @@ -154,6 +159,7 @@ class 
ConcurrentTasksManager: print_error_msg(f"An error occurred: {e}") finally: self.finalize() + self.log_task_execution_summary() def finalize(self): for task_info in self.task_infos.values(): @@ -185,6 +191,7 @@ class ConcurrentTasksManager: def __run_one_task(self, task_info): task_info.status = TaskStatus.Running + task_info.start_time = time.time() if (task_info.task.mode & ConcurrentMode.SUB_PROCESS) != 0: self.__run_in_subprocess(task_info) elif (task_info.task.mode & ConcurrentMode.PTHREAD) != 0: @@ -259,6 +266,9 @@ class ConcurrentTasksManager: def __on_task_done(self, task_info, ret_code, output): """ be called when task.run is finish(listening thread receives ret_code) """ + task_info.end_time = time.time() + task_info.execution_time = task_info.end_time - task_info.start_time + if ret_code == 0: task_info.status = TaskStatus.Succeed if output is not None: @@ -422,3 +432,53 @@ class ConcurrentTasksManager: def __del__(self): self.clear() + + def __get_mode_string(self, mode): + modes = [] + if mode & ConcurrentMode.MAIN_PROCESS: + modes.append("MAIN_PROCESS") + if mode & ConcurrentMode.SUB_PROCESS: + modes.append("SUB_PROCESS") + if mode & ConcurrentMode.PTHREAD: + modes.append("PTHREAD") + if mode & ConcurrentMode.NON_BLOCKING: + modes.append("NON_BLOCKING") + return "|".join(modes) if modes else "UNKNOWN" + + def get_task_execution_summary(self): + summary = [] + for task_name, task_info in self.task_infos.items(): + if task_info.execution_time is not None: + mode_str = self.__get_mode_string(task_info.task.mode) + status_str = task_info.status.name + deps_names = [dep for dep in task_info.task.deps] + deps_str = ", ".join(deps_names) if deps_names else "None" + + summary.append({ + 'task_name': task_name, + 'mode': mode_str, + 'status': status_str, + 'execution_time': task_info.execution_time, + 'start_time': task_info.start_time, + 'end_time': task_info.end_time, + 'deps': deps_str + }) + + return summary + + def log_task_execution_summary(self): + 
self.logger = ProfilerLogger.get_instance() + summary = self.get_task_execution_summary() + + self.logger.info("=" * 60) + self.logger.info("Task execution completed") + self.logger.info("=" * 60) + + for task in summary: + self.logger.info( + f"{task['task_name']:<25} | " + f"{task['mode']:<15} | " + f"{task['status']:<10} | " + f"{task['execution_time']:.3f}s | " + f"deps: {task['deps']}" + ) diff --git a/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py b/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py index 06ec8b7d6b0..9ecd9d8b6ac 100644 --- a/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py +++ b/torch_npu/profiler/analysis/prof_common_func/_tree_builder.py @@ -10,7 +10,7 @@ class TreeBuilder: @classmethod def build_tree(cls, event_list: list, enqueue_list: list) -> TorchOpNode: all_node_list = [None] * (len(event_list) + 1) - event_list.extend(enqueue_list) + event_list = event_list + enqueue_list event_list.sort(key=lambda x: x.ts) root_node = TorchOpNode() last_node = root_node diff --git a/torch_npu/profiler/analysis/prof_config/_parser_config.py b/torch_npu/profiler/analysis/prof_config/_parser_config.py index 986ff3bb30d..8ad26ed7cb0 100644 --- a/torch_npu/profiler/analysis/prof_config/_parser_config.py +++ b/torch_npu/profiler/analysis/prof_config/_parser_config.py @@ -17,7 +17,6 @@ from ..prof_common_func._constant import Constant from ..prof_view.cann_parse._cann_analyze import CANNAnalyzeParser from ..prof_view.cann_parse._cann_export import CANNExportParser, CANNTimelineParser from ..prof_view._memory_prepare_parser import MemoryPrepareParser -from ..prof_view.prepare_parse._fwk_pre_parser import TracePreParser, TreeBuildParser from ..prof_view._kernel_view_parser import KernelViewParser from ..prof_view._operator_view_parser import OperatorViewParser from ..prof_view.prepare_parse._relation_parser import RelationParser @@ -28,7 +27,14 @@ from ..prof_view._memory_view_parser import MemoryViewParser from 
..prof_view._integrate_parser import IntegrateParser from ..prof_view._communication_parser import CommunicationParser from ..prof_view._memory_timeline_parser import MemoryTimelineParser -from ..prof_view.prof_db_parse._db_parser import DbParser +from ..prof_view.prof_db_parse._db_parser import DbParser +from ..prof_view.prepare_parse._fwk_pre_parser import ( + TracePreParser, + TreeBuildParser, + TaskQueueParser, + TorchOpParser, + DbPreParser +) __all__ = [] @@ -37,6 +43,8 @@ class ParserConfig: LEVEL_NONE_CONFIG = { Constant.Text: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TaskQueueParser, + TorchOpParser, TracePreParser, TreeBuildParser, CANNExportParser, @@ -52,6 +60,9 @@ class ParserConfig: }, Constant.Db: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TorchOpParser, + TaskQueueParser, + DbPreParser, CANNExportParser, CANNTimelineParser, CANNAnalyzeParser, @@ -65,6 +76,8 @@ class ParserConfig: COMMON_CONFIG = { Constant.Text: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TorchOpParser, + TaskQueueParser, TracePreParser, TreeBuildParser, CANNExportParser, @@ -80,13 +93,17 @@ class ParserConfig: IntegrateParser, CommunicationParser ], - Constant.EXPORT_CHROME_TRACE: [TracePreParser, TreeBuildParser, CANNExportParser, CANNTimelineParser, - TraceViewParser], - Constant.EXPORT_STACK: [TreeBuildParser, CANNExportParser, CANNTimelineParser, StackViewParser], + Constant.EXPORT_CHROME_TRACE: [TorchOpParser, TaskQueueParser, TracePreParser, TreeBuildParser, CANNExportParser, + CANNTimelineParser, TraceViewParser], + Constant.EXPORT_STACK: [TorchOpParser, TaskQueueParser, TreeBuildParser, CANNExportParser, + CANNTimelineParser, StackViewParser], Constant.EXPORT_MEMORY_TIMELINE: [MemoryTimelineParser] }, Constant.Db: { Constant.TENSORBOARD_TRACE_HANDLER: [ + TorchOpParser, + TaskQueueParser, + DbPreParser, CANNExportParser, CANNTimelineParser, CANNAnalyzeParser, @@ -99,18 +116,21 @@ class ParserConfig: ONLY_FWK_CONFIG = { Constant.Text: { - Constant.TENSORBOARD_TRACE_HANDLER: 
[OperatorViewParser, TraceViewParser, MemoryViewParser], - Constant.EXPORT_CHROME_TRACE: [TraceViewParser], - Constant.EXPORT_STACK: [StackViewParser], + Constant.TENSORBOARD_TRACE_HANDLER: [TorchOpParser, TaskQueueParser, MemoryPrepareParser, OperatorViewParser, TraceViewParser, + MemoryViewParser], + Constant.EXPORT_CHROME_TRACE: [TorchOpParser, TaskQueueParser, TraceViewParser], + Constant.EXPORT_STACK: [TorchOpParser, TaskQueueParser, StackViewParser], Constant.EXPORT_MEMORY_TIMELINE: [MemoryTimelineParser] }, Constant.Db: { - Constant.TENSORBOARD_TRACE_HANDLER: [CANNExportParser, DbParser] + Constant.TENSORBOARD_TRACE_HANDLER: [TorchOpParser, TaskQueueParser, DbPreParser, MemoryPrepareParser, DbParser] } } PARSER_NAME_MAP = { # text parser + TorchOpParser: Constant.TORCH_OP_PARSER, + TaskQueueParser: Constant.TASK_QUEUE_PARSER, TracePreParser: Constant.TRACE_PRE_PARSER, TreeBuildParser: Constant.TREE_BUILD_PARSER, CANNExportParser: Constant.CANN_EXPORT_PARSER, @@ -129,5 +149,6 @@ class ParserConfig: MemoryTimelineParser: Constant.MEMORY_TIMELINE_PARSER, # db parser + DbPreParser: Constant.DB_PRE_PARSER, DbParser: Constant.DB_PARSER, } diff --git a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py index e61ecc36900..2afd5874b4d 100644 --- a/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py +++ b/torch_npu/profiler/analysis/prof_config/_parser_deps_config.py @@ -21,27 +21,32 @@ __all__ = [] class ParserDepsConfig: COMMON_CONFIG = { - Constant.TRACE_PRE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, - Constant.TREE_BUILD_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.TORCH_OP_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.TASK_QUEUE_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.TRACE_PRE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, + 
Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER]}, + Constant.TREE_BUILD_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, + Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER]}, Constant.CANN_EXPORT_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, Constant.CANN_TIMELINE_PARSER: {Constant.MODE: ConcurrentMode.NON_BLOCKING | ConcurrentMode.PTHREAD, Constant.DEPS: []}, Constant.RELATION_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.CANN_TIMELINE_PARSER]}, + Constant.DEPS: [Constant.TASK_QUEUE_PARSER, Constant.CANN_TIMELINE_PARSER]}, Constant.CANN_ANALYZE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.CANN_TIMELINE_PARSER]}, Constant.OPERATOR_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_TIMELINE_PARSER, - Constant.RELATION_PARSER]}, + Constant.RELATION_PARSER, Constant.TORCH_OP_PARSER]}, Constant.TRACE_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.TRACE_PRE_PARSER, - Constant.CANN_TIMELINE_PARSER]}, + Constant.CANN_TIMELINE_PARSER, Constant.TASK_QUEUE_PARSER, + Constant.TORCH_OP_PARSER]}, Constant.KERNEL_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_EXPORT_PARSER, Constant.RELATION_PARSER]}, Constant.TRACE_STEP_TIME_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_TIMELINE_PARSER, - Constant.RELATION_PARSER]}, + Constant.RELATION_PARSER, Constant.TORCH_OP_PARSER]}, Constant.MEMORY_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.CANN_EXPORT_PARSER, Constant.MEMORY_PREPARE]}, Constant.INTEGRATE_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, @@ -50,21 +55,32 @@ class ParserDepsConfig: Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_ANALYZE_PARSER, 
Constant.RELATION_PARSER]}, Constant.STACK_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, - Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_TIMELINE_PARSER]}, + Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.CANN_TIMELINE_PARSER, + Constant.TASK_QUEUE_PARSER, Constant.TORCH_OP_PARSER]}, Constant.MEMORY_PREPARE: {Constant.MODE: ConcurrentMode.PTHREAD, - Constant.DEPS: [Constant.TREE_BUILD_PARSER]}, + Constant.DEPS: [Constant.TREE_BUILD_PARSER, Constant.TASK_QUEUE_PARSER, + Constant.TORCH_OP_PARSER]}, + Constant.DB_PRE_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, + Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER]}, Constant.DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: [Constant.CANN_EXPORT_PARSER, Constant.MEMORY_PREPARE, - Constant.TREE_BUILD_PARSER, Constant.CANN_ANALYZE_PARSER]}, + Constant.TREE_BUILD_PARSER, Constant.CANN_ANALYZE_PARSER, + Constant.DB_PRE_PARSER]}, Constant.MEMORY_TIMELINE_PARSER: {} } ONLY_FWK_CONFIG = { - Constant.OPERATOR_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, - Constant.TRACE_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, - Constant.MEMORY_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, - Constant.STACK_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, + Constant.TORCH_OP_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.TASK_QUEUE_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: []}, + Constant.OPERATOR_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TORCH_OP_PARSER]}, + Constant.TRACE_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TASK_QUEUE_PARSER, Constant.TORCH_OP_PARSER]}, + Constant.MEMORY_VIEW_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.MEMORY_PREPARE]}, + Constant.STACK_VIEW_PARSER: {Constant.MODE: 
ConcurrentMode.SUB_PROCESS, Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER]}, Constant.CANN_EXPORT_PARSER: {Constant.MODE: ConcurrentMode.SUB_PROCESS, Constant.DEPS: []}, - Constant.DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: [Constant.CANN_EXPORT_PARSER]}, + Constant.DB_PRE_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, + Constant.DEPS: [Constant.TORCH_OP_PARSER, Constant.TASK_QUEUE_PARSER]}, + Constant.MEMORY_PREPARE: {Constant.MODE: ConcurrentMode.PTHREAD, + Constant.DEPS: [Constant.TASK_QUEUE_PARSER, Constant.TORCH_OP_PARSER]}, + Constant.DB_PARSER: {Constant.MODE: ConcurrentMode.PTHREAD, Constant.DEPS: [Constant.DB_PRE_PARSER, Constant.MEMORY_PREPARE]}, Constant.MEMORY_TIMELINE_PARSER: {} } diff --git a/torch_npu/profiler/analysis/prof_parse/_event_tree_parser.py b/torch_npu/profiler/analysis/prof_parse/_event_tree_parser.py index cf7d3c356d5..e74e994ba7c 100644 --- a/torch_npu/profiler/analysis/prof_parse/_event_tree_parser.py +++ b/torch_npu/profiler/analysis/prof_parse/_event_tree_parser.py @@ -303,8 +303,8 @@ class _ProfilerEvent: self.children: List['_ProfilerEvent'] = [] if isinstance(bean, TorchOpBean): self.tag = _EventType.TorchOp - self.tid = bean._tid - self.start_time_ns = bean._start_ns + self.tid = bean.tid + self.start_time_ns = bean.ts self.extra_fields = _ExtraFields_TorchOp(bean) elif isinstance(bean, MemoryUseBean): self.tag = _EventType.Allocation diff --git a/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py b/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py index dc7142738ab..9a22c3ec277 100644 --- a/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py +++ b/torch_npu/profiler/analysis/prof_parse/_fwk_cann_relation_parser.py @@ -48,7 +48,7 @@ class FwkCANNRelationParser: break index += 1 - def get_kernel_dict(self) -> dict: + def get_kernel_dict(self, dequeue_data: list) -> dict: acl_to_npu_dict = 
CANNFileParser(self._profiler_path).get_acl_to_npu_data() if not acl_to_npu_dict and ProfilerConfig().get_level() != Constant.LEVEL_NONE: error_msg = ( @@ -57,8 +57,7 @@ class FwkCANNRelationParser: ) print_error_msg(error_msg) return acl_to_npu_dict - dequeue_data_list = FwkFileParser(self._profiler_path).get_dequeue_data() - return self.combine_kernel_dict(acl_to_npu_dict, dequeue_data_list) + return self.combine_kernel_dict(acl_to_npu_dict, dequeue_data) def get_step_range(self, root_node: TorchOpNode, kernel_dict: dict): if not kernel_dict: diff --git a/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py b/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py index 09771da81d3..ba79447f939 100644 --- a/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py +++ b/torch_npu/profiler/analysis/prof_parse/_fwk_file_parser.py @@ -4,7 +4,7 @@ from collections import defaultdict from ..prof_bean._torch_op_bean import TorchOpBean from ..prof_common_func._binary_decoder import BinaryDecoder -from ..prof_common_func._constant import Constant, contact_2num +from ..prof_common_func._constant import Constant, contact_2num, DbConstant from ..prof_common_func._file_manager import FileManager from ..prof_common_func._file_tag import FileTag from ..prof_common_func._path_manager import ProfilerPathManager @@ -12,8 +12,10 @@ from ..prof_common_func._tlv_decoder import TLVDecoder from ..prof_common_func._trace_event_manager import TraceEventManager from ..prof_common_func._tree_builder import TreeBuilder from ..prof_common_func._log import ProfilerLogger +from ..prof_common_func._id_manager import Str2IdManager, ConnectionIdManager, CallChainIdManager from ..prof_config._fwk_file_parser_config import FwkFileParserConfig from ._python_trace_parser import PythonTraceParser +from ..prof_common_func._constant import ApiType __all__ = [] @@ -40,62 +42,6 @@ class FwkFileParser: else: return BinaryDecoder.decode(all_bytes, file_bean, struct_size) - def 
get_enqueue_data(self) -> list: - enqueue_data_list = [] - op_mark_data = self.get_file_data_by_tag(FileTag.OP_MARK) - if not op_mark_data: - self.logger.error("Get enqueue data failed, the op mark data is empty.") - return enqueue_data_list - op_mark_data.sort(key=lambda x: x.time_ns) - tid_op_dict = defaultdict(lambda: defaultdict(list)) - match_failed_num = 0 - for op_mark in op_mark_data: - if not op_mark.is_enqueue: - continue - if op_mark.is_enqueue_start: - tid_op_dict[op_mark.tid][op_mark.origin_name].append(op_mark) - continue - start_op_list = tid_op_dict.get(op_mark.tid, {}).get(op_mark.origin_name, []) - if not start_op_list: - match_failed_num += 1 - continue - start_op = start_op_list.pop() - op_mark.ts = start_op.time_ns - op_mark.dur = op_mark.time_ns - start_op.time_ns - enqueue_data_list.append(op_mark) - start_op_list.clear() - if match_failed_num: - self.logger.warning(f"{match_failed_num} enqueue data match failed.") - return enqueue_data_list - - def get_dequeue_data(self) -> list: - dequeue_data_list = [] - op_mark_data = self.get_file_data_by_tag(FileTag.OP_MARK) - if not op_mark_data: - self.logger.error("Get dequeue data failed, the op mark data is empty.") - return dequeue_data_list - op_mark_data.sort(key=lambda x: x.time_ns) - tid_op_dict = defaultdict(lambda: defaultdict(list)) - match_failed_num = 0 - for op_mark in op_mark_data: - if not op_mark.is_dequeue: - continue - if op_mark.is_dequeue_start: - tid_op_dict[op_mark.tid][op_mark.origin_name].append(op_mark) - continue - start_op_list = tid_op_dict.get(op_mark.tid, {}).get(op_mark.origin_name, []) - if not start_op_list: - match_failed_num += 1 - continue - start_op = start_op_list.pop() - op_mark.ts = start_op.time_ns - op_mark.dur = op_mark.time_ns - start_op.time_ns - dequeue_data_list.append(op_mark) - start_op_list.clear() - if match_failed_num: - self.logger.warning(f"{match_failed_num} enqueue data match failed.") - return dequeue_data_list - def get_task_queue_data(self) 
-> any: enqueue_data_list, dequeue_data_list = [], [] op_mark_data = self.get_file_data_by_tag(FileTag.OP_MARK) @@ -139,20 +85,16 @@ class FwkFileParser: self.logger.warning(f"{dequeue_match_failed_num} dequeue data match failed.") return enqueue_data_list, dequeue_data_list - def get_torch_op_tree_node(self, only_fwk: bool = False) -> list: - torch_op_list = self.get_file_data_by_tag(FileTag.TORCH_OP) - if not torch_op_list: + def get_torch_op_tree_node(self, torch_op_data: list, enqueue_data: list = None) -> list: + if not torch_op_data: self.logger.error("Get torch op tree node failed, the torch op data is empty.") return [] - enqueue_data_list = [] - if not only_fwk: - enqueue_data_list = self.get_enqueue_data() - result_data = TreeBuilder.build_tree(torch_op_list, enqueue_data_list) + if enqueue_data is None: + enqueue_data = [] + result_data = TreeBuilder.build_tree(torch_op_data, enqueue_data) return result_data - def get_fwk_trace_data(self): - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) - enqueue_data_list, dequeue_data_list = self.get_task_queue_data() + def get_fwk_trace_data(self, torch_op_data: list, enqueue_data_list: list, dequeue_data_list: list) -> list: if torch_op_data: pid = torch_op_data[0].pid elif enqueue_data_list or dequeue_data_list: @@ -164,7 +106,7 @@ class FwkFileParser: fwk_x_event_list = [None] * ( len(torch_op_data) + len(enqueue_data_list) * 2 + len(dequeue_data_list) * 2) index = 0 - fwd_dict = {} + fwd_dict = defaultdict(dict) correlation_id_name_dict = {} for torch_op in torch_op_data: self.filter_fwd_bwd_event(fwd_dict, torch_op) @@ -201,11 +143,13 @@ class FwkFileParser: def get_python_trace_data(self, torch_tids: set) -> list: trace_hash_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_HASH) func_call_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_FUNC) + if not (trace_hash_data and func_call_data): + return [] python_trace_parser = PythonTraceParser(torch_tids, trace_hash_data, func_call_data) 
return python_trace_parser.get_python_trace_data() @classmethod - def filter_fwd_bwd_event(cls, fwd_dict: dict, torch_op: TorchOpBean): + def filter_fwd_bwd_event(cls, fwd_dict: defaultdict, torch_op: TorchOpBean): seq_num = torch_op.args.get("Sequence number", -1) if seq_num < 0: return @@ -213,7 +157,7 @@ class FwkFileParser: mode = "start" if torch_op.args.get("Fwd thread id") == 0 else "end" if fwd_event.get(mode, {}).get("ts", -float('inf')) < torch_op.ts: node = {mode: {'pid': torch_op.pid, 'tid': torch_op.tid, 'ts': torch_op.ts}} - fwd_dict.setdefault(seq_num, {}).update(node) + fwd_dict[seq_num].update(node) def has_task_queue_data(self): return bool(self._file_list.get(FileTag.OP_MARK)) @@ -243,14 +187,12 @@ class FwkFileParser: if node.get('start') and node.get('end'): fwb_op_id = node['start']['idx'] bwd_op_id = node['end']['idx'] - torch_op_apis[fwb_op_id][3].append(start_connection_id) - torch_op_apis[bwd_op_id][3].append(start_connection_id) + torch_op_apis[fwb_op_id][3].append(start_connection_id + DbConstant.START_CONNECTION_ID_FWK_API) + torch_op_apis[bwd_op_id][3].append(start_connection_id + DbConstant.START_CONNECTION_ID_FWK_API) start_connection_id += 1 - def get_fwk_api(self) -> dict: - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) - enqueue_data_list, dequeue_data_list = self.get_task_queue_data() + def get_fwk_api(self, torch_op_data: list, enqueue_data_list: list, dequeue_data_list: list) -> dict: if torch_op_data: pid = torch_op_data[0].pid elif enqueue_data_list or dequeue_data_list: @@ -259,35 +201,31 @@ class FwkFileParser: self.logger.error("Get fwk api data failed, framework data is empty.") return {} + Str2IdManager().set_start_id(DbConstant.START_STRING_ID_FWK_API) + connection_id_manager = ConnectionIdManager() + str2id_manager = Str2IdManager() + call_chain_id_manager = CallChainIdManager() + + connection_ids = [] + task_enqueues = [] + task_dequeues = [] + correlation_id_name_dict = {} + torch_op_apis = [] 
fwd_bwd_dict = {} torch_op_idx = 0 mstx_mark_apis = [] + python_trace_apis = [] torch_tids = set() - for torch_op in torch_op_data: - api = [torch_op.ts, torch_op.end_ns, contact_2num(pid, torch_op.tid), [], torch_op.name, - torch_op.args.get(Constant.SEQUENCE_NUMBER, -1), torch_op.args.get(Constant.FORWARD_THREAD_ID), - torch_op.args.get(Constant.INPUT_DTYPES), torch_op.args.get(Constant.INPUT_SHAPES), - torch_op.call_stack] - if torch_op.name == "mstx_mark_op": - mstx_mark_apis.append(api) - else: - torch_op_apis.append(api) - self.filter_fwd_bwd_api(fwd_bwd_dict, torch_op, torch_op_idx) - torch_op_idx += 1 - torch_tids.add(torch_op.tid) - - connection_ids = [] - task_enqueues = [] - task_dequeues = [] - correlation_id_name_dict = {} for dequeue_data in dequeue_data_list: task_dequeues.append( [dequeue_data.ts, dequeue_data.ts + dequeue_data.dur, contact_2num(pid, dequeue_data.tid), - dequeue_data.corr_id, dequeue_data.name]) + connection_id_manager.get_id_from_connection_ids([dequeue_data.corr_id + DbConstant.START_CONNECTION_ID_FWK_API]), str2id_manager.get_id_from_str(dequeue_data.name), + None, None, None, None, None, ApiType.TASK_QUEUE]) correlation_id_name_dict[dequeue_data.corr_id] = dequeue_data.origin_name torch_tids.add(dequeue_data.tid) + for enqueue_data in enqueue_data_list: name = enqueue_data.name if enqueue_data.corr_id in correlation_id_name_dict: @@ -295,28 +233,45 @@ class FwkFileParser: name += f"@{correlation_id_name_dict[enqueue_data.corr_id]}" task_enqueues.append( [enqueue_data.ts, enqueue_data.ts + enqueue_data.dur, contact_2num(pid, enqueue_data.tid), - enqueue_data.corr_id, name]) + connection_id_manager.get_id_from_connection_ids([enqueue_data.corr_id + DbConstant.START_CONNECTION_ID_FWK_API]), str2id_manager.get_id_from_str(name), + None, None, None, None, None, ApiType.TASK_QUEUE]) connection_ids.append(enqueue_data.corr_id) torch_tids.add(enqueue_data.tid) + for torch_op in torch_op_data: + api = [torch_op.ts, torch_op.end_ns, 
contact_2num(pid, torch_op.tid), [], str2id_manager.get_id_from_str(torch_op.name), + torch_op.args.get(Constant.SEQUENCE_NUMBER, -1), torch_op.args.get(Constant.FORWARD_THREAD_ID), + None if not torch_op.args.get(Constant.INPUT_DTYPES) else str2id_manager.get_id_from_str(torch_op.args.get(Constant.INPUT_DTYPES)), + None if not torch_op.args.get(Constant.INPUT_SHAPES) else str2id_manager.get_id_from_str(torch_op.args.get(Constant.INPUT_SHAPES)), + None if not torch_op.args.get(Constant.CALL_STACK) else call_chain_id_manager.get_callchain_id_from_callstack(torch_op.args.get(Constant.CALL_STACK)), + ApiType.TORCH_OP] + if torch_op.name == "mstx_mark_op": + mstx_mark_apis.append(api) + else: + torch_op_apis.append(api) + self.filter_fwd_bwd_api(fwd_bwd_dict, torch_op, torch_op_idx) + torch_op_idx += 1 + torch_tids.add(torch_op.tid) + start_connection_id = max(connection_ids) + 1 if connection_ids else 0 self.update_fwd_bwd_connection_id(fwd_bwd_dict, torch_op_apis, start_connection_id) trace_hash_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_HASH) func_call_data = self.get_file_data_by_tag(FileTag.PYTHON_TRACER_FUNC) - python_trace_parser = PythonTraceParser(torch_tids, trace_hash_data, func_call_data) - python_trace_apis = python_trace_parser.get_python_trace_api_data() - return {"torch_op": torch_op_apis, "task_enqueues": task_enqueues, "task_dequeues": task_dequeues, - "python_trace": python_trace_apis, "mstx_op": mstx_mark_apis} + if trace_hash_data and func_call_data: + python_trace_parser = PythonTraceParser(torch_tids, trace_hash_data, func_call_data) + python_trace_apis = python_trace_parser.get_python_trace_api_data() + return {Constant.TORCH_OP_DATA: torch_op_apis, Constant.ENQUEUE_DATA: task_enqueues, Constant.DEQUEUE_DATA: task_dequeues, + Constant.PYTHON_TRACE_DATA: python_trace_apis, Constant.MSTX_OP_DATA: mstx_mark_apis} - def get_first_fwk_op(self): - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) + def get_first_fwk_op(self, 
torch_op_data: list): if not torch_op_data: return None return min(torch_op_data, key=lambda op: op.ts) - def get_torch_op_tids(self): - torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) + def get_torch_op_tids(self, torch_op_data: list = None): + if not torch_op_data: + torch_op_data = self.get_file_data_by_tag(FileTag.TORCH_OP) if not torch_op_data: return set() return {op.tid for op in torch_op_data} diff --git a/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py b/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py index 378423e825d..2f0107e5166 100644 --- a/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py +++ b/torch_npu/profiler/analysis/prof_parse/_python_trace_parser.py @@ -1,7 +1,8 @@ from collections import defaultdict from enum import Enum -from ..prof_common_func._constant import contact_2num +from ..prof_common_func._constant import contact_2num, ApiType from ..prof_common_func._trace_event_manager import TraceEventManager +from ..prof_common_func._id_manager import Str2IdManager __all__ = [] @@ -145,8 +146,10 @@ class PythonTraceParser: if not trace_event_list: return [] trace_api_data = [None] * len(trace_event_list) + str2id_manager = Str2IdManager() for i, event in enumerate(trace_event_list): - trace_api_data[i] = [event.ts, event.ts + event.dur, contact_2num(event.pid, event.tid), event.name] + trace_api_data[i] = [event.ts, event.ts + event.dur, contact_2num(event.pid, event.tid), None, + str2id_manager.get_id_from_str(event.name), None, None, None, None, None, ApiType.PYTHON_TRACE] return trace_api_data def get_pycall_data(self) -> list: diff --git a/torch_npu/profiler/analysis/prof_view/_communication_parser.py b/torch_npu/profiler/analysis/prof_view/_communication_parser.py index e07f68b785b..d0b9ab4bf79 100644 --- a/torch_npu/profiler/analysis/prof_view/_communication_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_communication_parser.py @@ -63,12 +63,14 @@ class 
CommunicationParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "CommunicationParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("CommunicationParser start.") try: self._init_step_list(deps_data) self.generate_view() except Exception as e: self.logger.error("Failed to generate communication.json or communication_matrix.json, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("CommunicationParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_integrate_parser.py b/torch_npu/profiler/analysis/prof_view/_integrate_parser.py index 28472a24117..1c296c95342 100644 --- a/torch_npu/profiler/analysis/prof_view/_integrate_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_integrate_parser.py @@ -30,12 +30,14 @@ class IntegrateParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "IntegrateParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("IntegrateParser start.") try: ProfilerConfig().load_info(self._profiler_path) self.generate_view() except Exception as e: self.logger.error("Failed to generate data_preprocess.csv or l2_cache.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("IntegrateParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py b/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py index ded9a612c6c..b913f5ac3c1 100644 --- a/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_kernel_view_parser.py @@ -35,6 +35,7 @@ class KernelViewParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "KernelViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("KernelViewParser start.") try: 
ProfilerConfig().load_info(self._profiler_path) self._init_step_range(deps_data) @@ -42,6 +43,7 @@ class KernelViewParser(BaseParser): except Exception as e: self.logger.error("Failed to generate kernel_details.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("KernelViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py b/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py index 0e74f688c03..2e3e97bf2c9 100644 --- a/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_memory_prepare_parser.py @@ -39,9 +39,12 @@ class MemoryPrepareParser(BaseParser): super().__init__(name, param_dict) self.pta_record_list = [] self.memory_data = dict() + self._torch_op_data = [] self._torch_op_node = [] self._incomplete_num = 0 self._is_malloc_workspace_in_dequeue_enabled = False + self._enqueue_data = [] + self._dequeue_data = [] self._dequeue_record_dict = defaultdict(list) # {(pid, tid): [dequeue_records]} self._enqueue_record_dict = {} # {corrid: enqueue} self._dequeue_pids = set() @@ -62,19 +65,26 @@ class MemoryPrepareParser(BaseParser): return left def run(self, deps_data: dict): + self.logger.info("MemoryPrepareParser start.") try: self._torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + self._enqueue_data = task_queue_data.get(Constant.ENQUEUE_DATA, []) + self._dequeue_data = task_queue_data.get(Constant.DEQUEUE_DATA, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate pytorch memory data, error: %s", str(e), exc_info=True) return Constant.FAIL, {} if self._incomplete_num > 0: print_warn_msg(f"{self._incomplete_num} memory record(s) are incomplete.") + self.logger.info("MemoryPrepareParser 
finish.") return Constant.SUCCESS, {"pta_record_list": self.pta_record_list, "memory_data": self.memory_data} def generate_view(self) -> None: ProfilerConfig().load_info(self._profiler_path) self._init_torch_op() + self._init_queue_info() self._add_pta_memory_data() def _find_matched_torch_op_name(self, mem_start_ts: int, torch_ops: list) -> str: @@ -88,35 +98,30 @@ class MemoryPrepareParser(BaseParser): return matched_torch_op.name def _init_queue_info(self): - enqueue_records = FwkFileParser(self._profiler_path).get_enqueue_data() - for enqueue_record in enqueue_records: - self._enqueue_record_dict[enqueue_record.corr_id] = enqueue_record - dequeue_records = FwkFileParser(self._profiler_path).get_dequeue_data() - for dequeue_record in dequeue_records: - self._dequeue_pids.add(dequeue_record.pid) - self._dequeue_tids.add(dequeue_record.tid) - key = (dequeue_record.pid, dequeue_record.tid) - self._dequeue_record_dict.setdefault(key, []).append(dequeue_record) + self._enqueue_record_dict = {record.corr_id: record for record in self._enqueue_data} + for record in self._dequeue_data: + self._dequeue_pids.add(record.pid) + self._dequeue_tids.add(record.tid) + self._dequeue_record_dict[(record.pid, record.tid)].append(record) def _add_pta_memory_data(self): - self._init_queue_info() pta_memory_data = FwkFileParser(self._profiler_path).get_file_data_by_tag(FileTag.MEMORY) - npu_memory_dict = {} - torch_op_dict = {} - pta_memory_data = sorted(pta_memory_data, key=lambda x: x.time_ns) + npu_memory_dict = defaultdict(list) + torch_op_dict = defaultdict(list) + pta_memory_data.sort(key=lambda x: x.time_ns) for record in pta_memory_data: if record.is_npu(): if record.is_inner_allocator(): - npu_memory_dict.setdefault(record.pid, []).append(record) + npu_memory_dict[record.pid].append(record) self.pta_record_list.append(record) for torch_op in self._torch_op_node: - torch_op_dict.setdefault(torch_op.pid, []).append(torch_op) + torch_op_dict[torch_op.pid].append(torch_op) for 
pid_key, memory_records in npu_memory_dict.items(): torch_ops = torch_op_dict.get(pid_key, []) if not torch_ops: warn(f"Lack of torch ops to connect memory record, whose process id is {pid_key}") continue - torch_ops = sorted(torch_ops, key=lambda x: x.start_time) + torch_ops.sort(key=lambda x: x.start_time) memory_dict = defaultdict(list) for record in memory_records: memory_dict[record.ptr].append(record) @@ -241,7 +246,7 @@ class MemoryPrepareParser(BaseParser): active_duration_time, records[0].total_allocated, records[0].total_reserved, records[0].total_active, records[free_idx].total_allocated, records[free_idx].total_reserved, records[free_idx].total_active, records[0].stream_ptr, device_tag or records[0].device_tag] - ret_list.append(combine_data[:]) + ret_list.append(combine_data) return ret_list def _complete_record_entry_for_db(self, ptr_records: list, torch_ops: list) -> list: @@ -287,11 +292,11 @@ class MemoryPrepareParser(BaseParser): active_duration_time, records[0].total_allocated_for_db, records[0].total_reserved_for_db, records[0].total_active_for_db, records[free_idx].total_allocated_for_db, records[free_idx].total_reserved_for_db, records[free_idx].total_active_for_db, records[0].stream_ptr, device_index if device_index != -1 else records[0].device_index] - ret_list.append(combine_data[:]) + ret_list.append(combine_data) return ret_list def _init_torch_op(self): if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) + self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data) if self._torch_op_node: self._torch_op_node = self._torch_op_node[1:] diff --git a/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py b/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py index c061b38f878..34f4918939b 100644 --- a/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py +++ 
b/torch_npu/profiler/analysis/prof_view/_memory_view_parser.py @@ -75,6 +75,7 @@ class MemoryViewParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "MemoryViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("MemoryViewParser start.") try: self.memory_data = deps_data.get(Constant.MEMORY_PREPARE, {}).get("memory_data", {}).get(Constant.Text, []) self.pta_record_list = deps_data.get(Constant.MEMORY_PREPARE, {}).get("pta_record_list", []) @@ -85,10 +86,10 @@ class MemoryViewParser(BaseParser): except Exception as e: self.logger.error("Failed to generate operator_memory.csv or memory_record.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("MemoryViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: - self._init_pta_data() self._add_memory_from_cann() self._add_pta_ge_record_data() FileManager.create_csv_file(self._output_path, self.memory_data, self.OPERATOR_MEMORY, self.HEADERS_OPERATOR) @@ -158,11 +159,3 @@ class MemoryViewParser(BaseParser): ge_op_memory_file = CANNFileParser(self._profiler_path).get_file_list_by_type(CANNDataEnum.GE_OPERATOR_MEMORY) \ if Constant.NPU_ACTIVITIES in self._activities else set() self.memory_data.extend(self._get_data_from_file(ge_op_memory_file, GeOpMemoryBean)) - - def _init_pta_data(self): - if not ProfilerPathManager.get_cann_path(self._profiler_path): - torch_nop_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) - deps_data = {Constant.TREE_BUILD_PARSER: torch_nop_node} - _, pta_data = MemoryPrepareParser(Constant.MEMORY_PREPARE, self._param_dict).run(deps_data) - self.memory_data = pta_data.get("memory_data", {}).get(Constant.Text, []) - self.pta_record_list = pta_data.get("pta_record_list", []) diff --git a/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py b/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py index 7c10e9d4bf4..a75625fa1c9 100644 
--- a/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_operator_view_parser.py @@ -20,19 +20,23 @@ class OperatorViewParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) self._torch_op_node = [] + self._torch_op_data = [] self._root_node = None self._kernel_dict = {} def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "OperatorViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("OperatorViewParser start.") try: self._torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) self._kernel_dict = deps_data.get(Constant.RELATION_PARSER, {}) + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate operator_details.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("OperatorViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: @@ -70,7 +74,7 @@ class OperatorViewParser(BaseParser): def _init_torch_op(self): if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) + self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data) if self._torch_op_node: self._root_node = self._torch_op_node[0] self._torch_op_node = self._torch_op_node[1:] diff --git a/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py b/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py index b4a85271d99..6c7dce2b8cd 100644 --- a/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_stack_view_parser.py @@ -20,6 +20,8 @@ class StackViewParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) self._torch_op_node = [] + self._torch_op_data = [] + 
self._dequeue_data = [] self._root_node = None self._kernel_dict = {} self._metric = param_dict.get("metric") @@ -27,12 +29,16 @@ class StackViewParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "StackViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("StackViewParser start.") try: self._torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + self._dequeue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}).get(Constant.DEQUEUE_DATA, []) self.generate_view() except Exception as e: self.logger.error("Failed to export stack, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("StackViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: @@ -70,14 +76,14 @@ class StackViewParser(BaseParser): def _init_data(self): if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) + self._torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(self._torch_op_data) if not self._torch_op_node: return self._root_node = self._torch_op_node[0] self._torch_op_node = self._torch_op_node[1:] if self._metric == Constant.METRIC_NPU_TIME: - self._kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict() + self._kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict(self._dequeue_data) if not FwkFileParser(self._profiler_path).has_task_queue_data(): for acl_ts in self._kernel_dict.keys(): TreeBuilder.update_tree_node_info(acl_ts, self._root_node) diff --git a/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py b/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py index 46093bec4e8..ae21211563d 100644 --- a/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py +++ 
b/torch_npu/profiler/analysis/prof_view/_trace_step_time_parser.py @@ -51,6 +51,7 @@ class TraceStepTimeParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) self.step_range = [] + self.torch_op_data = [] @classmethod def is_float_num(cls, num): @@ -102,7 +103,7 @@ class TraceStepTimeParser(BaseParser): if cur_step[_StepInfoIndex.ID.value] == step: first_task_start_ts = cur_step[_StepInfoIndex.FIRST_TASK_TS.value] if step is None: - first_fwk_op = FwkFileParser(self._profiler_path).get_first_fwk_op() + first_fwk_op = FwkFileParser(self._profiler_path).get_first_fwk_op(self.torch_op_data) return (first_task_start_ts - convert_ns2us_float(first_fwk_op.ts)) if first_fwk_op else 0 return first_task_start_ts - cur_step[_StepInfoIndex.FWK_START_TS.value] return 0 @@ -165,12 +166,15 @@ class TraceStepTimeParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "TraceStepTimeParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("TraceStepTimeParser start.") try: self._init_step_range(deps_data) + self.torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate step_trace_time.csv, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("TraceStepTimeParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py b/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py index c5e572e1bcf..dc03cf7adc1 100644 --- a/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py +++ b/torch_npu/profiler/analysis/prof_view/_trace_view_parser.py @@ -22,10 +22,13 @@ class TraceViewParser(BaseParser): super().__init__(name, param_dict) self._trace_file_path = os.path.join(self._output_path, self.TRACE_VIEW) if os.path.isdir( self._output_path) else self._output_path - 
self._temp_trace_file_path = os.path.join(self._output_path, Constant.TRACE_VIEW_TEMP) if os.path.isdir( + self._temp_trace_file_path = os.path.join(self._output_path, Constant.TRACE_VIEW) if os.path.isdir( self._output_path) else self._output_path self._trace_data = [] self._torch_op_node = [] + self._torch_op_data = [] + self._enqueue_data = [] + self._dequeue_data = [] self._root_node = None @staticmethod @@ -47,21 +50,28 @@ class TraceViewParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "TraceViewParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("TraceViewParser start.") try: ProfilerConfig().load_info(self._profiler_path) torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) if torch_op_node: self._root_node = torch_op_node[0] self._torch_op_node = torch_op_node[1:] + self._torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + self._enqueue_data = task_queue_data.get(Constant.ENQUEUE_DATA, []) + self._dequeue_data = task_queue_data.get(Constant.DEQUEUE_DATA, []) self.generate_view() except Exception as e: self.logger.error("Failed to generate trace_view.json, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("TraceViewParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: if not ProfilerPathManager.get_cann_path(self._profiler_path): - self._trace_data = FwkFileParser(self._profiler_path).get_fwk_trace_data() + self._trace_data = FwkFileParser(self._profiler_path).get_fwk_trace_data( + self._torch_op_data, self._enqueue_data, self._dequeue_data) else: msprof_timeline_data = CANNFileParser(self._profiler_path).get_timeline_all_data() self._trace_data.extend( @@ -86,8 +96,7 @@ class TraceViewParser(BaseParser): flow_event_list.extend( TraceEventManager.create_torch_to_npu_flow(matched_torch_op.event, kernel)) return flow_event_list - dequeue_data_list = 
FwkFileParser(self._profiler_path).get_dequeue_data() - kernel_dict = FwkCANNRelationParser.combine_kernel_dict(acl_to_npu_dict, dequeue_data_list) + kernel_dict = FwkCANNRelationParser.combine_kernel_dict(acl_to_npu_dict, self._dequeue_data) for torch_op_node in self._torch_op_node: for corr_id in torch_op_node.corr_id_self: kernel_list = kernel_dict.get(corr_id, []) diff --git a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py index da8037f982b..90db88e0142 100644 --- a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py +++ b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_analyze.py @@ -38,6 +38,7 @@ class CANNAnalyzeParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "CANNAnalyzeParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("CANNAnalyzeParser start.") try: ProfilerConfig().load_info(self._profiler_path) if not os.path.isdir(self._cann_path): @@ -63,4 +64,5 @@ class CANNAnalyzeParser(BaseParser): print_error_msg("Failed to analyze CANN Profiling data.") self.logger.error("Failed to analyze CANN Profiling data, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("CANNAnalyzeParser finish.") return Constant.SUCCESS, None diff --git a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py index 0595358190a..12a61d724e3 100644 --- a/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py +++ b/torch_npu/profiler/analysis/prof_view/cann_parse/_cann_export.py @@ -48,6 +48,7 @@ class CANNExportParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "CANNExportParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("CANNExportParser start.") try: ProfilerConfig().load_info(self._profiler_path) if not os.path.isdir(self._cann_path): @@ 
-75,6 +76,7 @@ class CANNExportParser(BaseParser): return Constant.FAIL, None end_time = datetime.utcnow() print_info_msg(f"CANN profiling data parsed in a total time of {end_time - start_time}") + self.logger.info("CANNExportParser finish.") return Constant.SUCCESS, None def _check_msprof_environment(self): @@ -149,6 +151,9 @@ class CANNTimelineParser(BaseParser): self._cann_path = ProfilerPathManager.get_cann_path(self._profiler_path) def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "CANNTimelineParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("CANNTimelineParser start.") if not os.path.isdir(self._cann_path): return Constant.SUCCESS, None ProfilerConfig().load_info(self._profiler_path) @@ -158,6 +163,7 @@ class CANNTimelineParser(BaseParser): if os.path.exists(output_path): for file_name in os.listdir(output_path): if file_name.endswith('.csv'): + self.logger.info("CANNTimelineParser finish.") return Constant.SUCCESS, None try: time.sleep(Constant.SLEEP_TIME) @@ -168,6 +174,7 @@ class CANNTimelineParser(BaseParser): while True: for file in os.listdir(self._cann_path): if re.match(patten, file) and os.path.isfile(os.path.join(self._cann_path, file)): + self.logger.info("CANNTimelineParser finish.") return Constant.SUCCESS, None try: time.sleep(Constant.SLEEP_TIME) diff --git a/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py b/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py index 939e06cf748..ba84960e5c9 100644 --- a/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prepare_parse/_fwk_pre_parser.py @@ -19,7 +19,8 @@ from ...prof_common_func._constant import Constant from ...prof_common_func._file_manager import FileManager from ...prof_common_func._log import ProfilerLogger from ...prof_parse._fwk_file_parser import FwkFileParser -from .._base_parser import BaseParser +from ...prof_view._base_parser 
import BaseParser +from ...prof_common_func._file_tag import FileTag __all__ = [] @@ -32,14 +33,21 @@ class TracePreParser(BaseParser): def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "TracePreParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("TracePreParser start.") try: - fwk_trace_data = FwkFileParser(self._profiler_path).get_fwk_trace_data() - trace_file_path = os.path.join(self._output_path, Constant.TRACE_VIEW_TEMP) if os.path.isdir( + torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + enqueue_data = task_queue_data.get(Constant.ENQUEUE_DATA, []) + dequeue_data = task_queue_data.get(Constant.DEQUEUE_DATA, []) + fwk_trace_data = FwkFileParser(self._profiler_path).get_fwk_trace_data( + torch_op_data, enqueue_data, dequeue_data) + trace_file_path = os.path.join(self._output_path, Constant.TRACE_VIEW) if os.path.isdir( self._output_path) else self._output_path FileManager.create_prepare_trace_json_by_path(trace_file_path, fwk_trace_data) except Exception as e: self.logger.error("Failed to create prepare trace json, error: %s", str(e), exc_info=True) return Constant.FAIL, None + self.logger.info("TracePreParser finish.") return Constant.SUCCESS, None @@ -51,9 +59,72 @@ class TreeBuildParser(BaseParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("TreeBuildParser start.") try: - torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node() + enqueue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}).get(Constant.ENQUEUE_DATA, []) + torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + torch_op_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(torch_op_data, enqueue_data) except Exception as e: self.logger.error("Failed to build torch op tree, error: %s", str(e), exc_info=True) return Constant.FAIL, [] + self.logger.info("TreeBuildParser finish.") return 
Constant.SUCCESS, torch_op_node + + +class TaskQueueParser(BaseParser): + + def __init__(self, name: str, param_dict: dict): + super().__init__(name, param_dict) + + def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "TaskQueueParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("TaskQueueParser start.") + try: + enqueue_data, dequeue_data = FwkFileParser(self._profiler_path).get_task_queue_data() + except Exception as e: + self.logger.error("Failed to get task queue data, error: %s", str(e), exc_info=True) + return Constant.FAIL, {} + self.logger.info("TaskQueueParser finish.") + return Constant.SUCCESS, {Constant.ENQUEUE_DATA: enqueue_data, Constant.DEQUEUE_DATA: dequeue_data} + + +class TorchOpParser(BaseParser): + + def __init__(self, name: str, param_dict: dict): + super().__init__(name, param_dict) + + def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "TorchOpParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("TorchOpParser start.") + try: + torch_op_data = FwkFileParser(self._profiler_path).get_file_data_by_tag(FileTag.TORCH_OP) + except Exception as e: + self.logger.error("Failed to get torch op tree, error: %s", str(e), exc_info=True) + return Constant.FAIL, [] + self.logger.info("TorchOpParser finish.") + return Constant.SUCCESS, torch_op_data + + +class DbPreParser(BaseParser): + + def __init__(self, name: str, param_dict: dict): + super().__init__(name, param_dict) + + def run(self, deps_data: dict): + ProfilerLogger.init(self._profiler_path, "DbPreParser") + self.logger = ProfilerLogger.get_instance() + self.logger.info("DbPreParser start.") + try: + torch_op_data = deps_data.get(Constant.TORCH_OP_PARSER, []) + task_queue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}) + enqueue_data = task_queue_data.get(Constant.ENQUEUE_DATA, []) + dequeue_data = task_queue_data.get(Constant.DEQUEUE_DATA, []) + fwk_db_data = 
FwkFileParser(self._profiler_path).get_fwk_api( + torch_op_data, enqueue_data, dequeue_data) + except Exception as e: + self.logger.error("Failed to create prepare db data, error: %s", str(e), exc_info=True) + return Constant.FAIL, None + self.logger.info("DbPreParser finish.") + return Constant.SUCCESS, fwk_db_data diff --git a/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py b/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py index 5e8a941de28..713677f9dd9 100644 --- a/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prepare_parse/_relation_parser.py @@ -23,13 +23,17 @@ __all__ = [] class RelationParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) + self._dequeue_data = [] def run(self, deps_data: dict): ProfilerLogger.init(self._profiler_path, "RelationParser") self.logger = ProfilerLogger.get_instance() + self.logger.info("RelationParser start.") try: - kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict() + self._dequeue_data = deps_data.get(Constant.TASK_QUEUE_PARSER, {}).get(Constant.DEQUEUE_DATA, []) + kernel_dict = FwkCANNRelationParser(self._profiler_path).get_kernel_dict(self._dequeue_data) except Exception as e: self.logger.error("Failed to get acl to npu flow dict, error: %s", str(e), exc_info=True) return Constant.FAIL, {} + self.logger.info("RelationParser finish.") return Constant.SUCCESS, kernel_dict diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_basic_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_basic_db_parser.py index 938a1c93855..965bbc3c093 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_basic_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_basic_db_parser.py @@ -24,6 +24,7 @@ class BasicDbParser(BaseParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: 
dict): + self.logger.info("BasicDbParser start.") try: cann_db_path = self.get_cann_db_path() if cann_db_path: @@ -36,6 +37,7 @@ class BasicDbParser(BaseParser): except Exception as error: self.logger.error("Failed to generate basic db file. Error: %s", str(error), exc_info=True) return Constant.FAIL, "" + self.logger.info("BasicDbParser finish.") return Constant.SUCCESS, "" def get_cann_db_path(self): diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_communication_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_communication_db_parser.py index af72fdcdf25..62bff9c9f07 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_communication_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_communication_db_parser.py @@ -77,6 +77,7 @@ class CommunicationDbParser(CommunicationParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("CommunicationDbParser start.") try: self._init_step_list(deps_data) self.generate_view() @@ -84,6 +85,7 @@ class CommunicationDbParser(CommunicationParser): self.logger.error("Failed to generate communication table, error: %s", str(error), exc_info=True) DbManager.destroy_db_connect(self.cann_comm_db_conn, self.cann_comm_db_curs) return Constant.FAIL, None + self.logger.info("CommunicationDbParser finish.") return Constant.SUCCESS, None def _init_step_list(self, deps_data: dict): diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py index 89cc322980a..83cc061c6a9 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_db_parser.py @@ -37,6 +37,7 @@ class DbParser(BaseParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("DbParser start.") ProfilerConfig().load_info(self._profiler_path) torch_db_path = 
DbConstant.DB_ASCEND_PYTORCH_PROFILER if ProfilerConfig().rank_id != -1: @@ -60,4 +61,5 @@ class DbParser(BaseParser): finally: TorchDb().close() AnalysisDb().close() + self.logger.info("DbParser finish.") return Constant.SUCCESS, "" diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py index 2ae2ac6474e..98ea10ea68a 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_fwk_api_db_parser.py @@ -1,153 +1,90 @@ -from enum import Enum from ...prof_common_func._db_manager import TorchDb -from ...prof_common_func._id_manager import Str2IdManager, ConnectionIdManager, CallChainIdManager -from ...prof_common_func._constant import Constant, DbConstant, TableColumnsManager from .._base_parser import BaseParser from ...prof_common_func._log import ProfilerLogger -from ...prof_parse._fwk_file_parser import FwkFileParser +from ...prof_common_func._id_manager import Str2IdManager, ConnectionIdManager, CallChainIdManager +from ...prof_common_func._constant import ( + Constant, + DbConstant, + TableColumnsManager, + ApiType, + TorchOpDataOri, + TaskQueueDataOri, + CannNodeLaunchApiOri +) __all__ = [] -class ApiType(Enum): - TORCH_OP = 50001 - TASK_QUEUE = 50002 - PYTHON_TRACE = 50003 - MSTX_OP = 50004 - - -class TorchOpDataOri(Enum): - START_NS = 0 - END_NS = 1 - GLOBAL_TID = 2 - CONNECTION_ID = 3 - NAME = 4 - SEQUENCE_NUM = 5 - FWD_THREAD_ID = 6 - INPUT_DIMS = 7 - INPUT_SHAPES = 8 - CALL_STACK = 9 - - -class TaskQueueDataOri(Enum): - START_NS = 0 - END_NS = 1 - GLOBAL_TID = 2 - CORRELATION_ID = 3 - NAME = 4 - - -class PythonTraceApiDataOri(Enum): - START_NS = 0 - END_NS = 1 - GLOBAL_TID = 2 - NAME = 3 - - -class CannNodeLaunchApiOri(Enum): - START_NS = 0 - END_NS = 1 - GLOBAL_TID = 2 - CORRELATION_ID = 3 - - class FwkApiDbParser(BaseParser): def __init__(self, name: str, 
param_dict: dict): super().__init__(name, param_dict) - self._max_cann_connection_id = 0 self._fwk_apis = [] ProfilerLogger.init(self._profiler_path, "FwkApiDbParser") self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("FwkApiDbParser start.") try: self.init_db_connect() self.set_start_string_id() - self.get_max_cann_id() - fwk_api_data = FwkFileParser(self._profiler_path).get_fwk_api() + fwk_api_data = deps_data.get(Constant.DB_PRE_PARSER, {}) + self.logger.info("FwkApiDbParser get fwk api data finish.") self.get_api_data_for_db(fwk_api_data) + self.logger.info("FwkApiDbParser get api data for db finish.") self.save_api_data_to_db() + self.logger.info("FwkApiDbParser save api data to db finish.") except Exception as error: self.logger.error("Failed to generate framework api table, error: %s", str(error), exc_info=True) return Constant.FAIL, None + self.logger.info("FwkApiDbParser finish.") return Constant.SUCCESS, None def get_api_data_for_db(self, fwk_api_data: dict): if not fwk_api_data: return - task_enqueues = fwk_api_data.get("task_enqueues", []) - task_dequeues = fwk_api_data.get("task_dequeues", []) - for enqueue in task_enqueues: - self._fwk_apis.append([enqueue[TaskQueueDataOri.START_NS.value], - enqueue[TaskQueueDataOri.END_NS.value], - enqueue[TaskQueueDataOri.GLOBAL_TID.value], - ConnectionIdManager().get_id_from_connection_ids( - [enqueue[TaskQueueDataOri.CORRELATION_ID.value] + self._max_cann_connection_id]), - Str2IdManager().get_id_from_str(enqueue[TaskQueueDataOri.NAME.value]), - None, None, None, None, None, ApiType.TASK_QUEUE.value]) - for dequeue in task_dequeues: - self._fwk_apis.append([dequeue[TaskQueueDataOri.START_NS.value], - dequeue[TaskQueueDataOri.END_NS.value], - dequeue[TaskQueueDataOri.GLOBAL_TID.value], - ConnectionIdManager().get_id_from_connection_ids( - [dequeue[TaskQueueDataOri.CORRELATION_ID.value] + self._max_cann_connection_id]), - 
Str2IdManager().get_id_from_str(dequeue[TaskQueueDataOri.NAME.value]), - None, None, None, None, None, ApiType.TASK_QUEUE.value]) - python_trace_apis = fwk_api_data.get("python_trace", []) - for python_trace_api in python_trace_apis: - self._fwk_apis.append([python_trace_api[PythonTraceApiDataOri.START_NS.value], - python_trace_api[PythonTraceApiDataOri.END_NS.value], - python_trace_api[PythonTraceApiDataOri.GLOBAL_TID.value], - None, - Str2IdManager().get_id_from_str(python_trace_api[PythonTraceApiDataOri.NAME.value]), - None, None, None, None, None, ApiType.PYTHON_TRACE.value]) - torch_op_apis = fwk_api_data.get("torch_op", []) - if not torch_op_apis: - return - # update torch op api connection id if there is cann api - if self._max_cann_connection_id != 0: - for torch_op_api in torch_op_apis: - # update torch op api inner connection id, include fwd_bwd id - if torch_op_api[TorchOpDataOri.CONNECTION_ID.value]: - torch_op_api[TorchOpDataOri.CONNECTION_ID.value] = [conn_id + self._max_cann_connection_id for conn_id in torch_op_api[TorchOpDataOri.CONNECTION_ID.value]] + task_enqueues = fwk_api_data.get(Constant.ENQUEUE_DATA, []) + task_dequeues = fwk_api_data.get(Constant.DEQUEUE_DATA, []) + torch_op_apis = fwk_api_data.get(Constant.TORCH_OP_DATA, []) + python_trace_apis = fwk_api_data.get(Constant.PYTHON_TRACE_DATA, []) + mstx_mark_apis = fwk_api_data.get(Constant.MSTX_OP_DATA, []) + self._fwk_apis = python_trace_apis + task_enqueues + task_dequeues + + if TorchDb().judge_table_exist(DbConstant.TABLE_CANN_API): self.get_torch_op_connection_ids_with_cann_api(task_enqueues, task_dequeues, torch_op_apis) + + # update connection id for torch op + connectionId_manager = ConnectionIdManager() for torch_op_api in torch_op_apis: - self._fwk_apis.append([torch_op_api[TorchOpDataOri.START_NS.value], - torch_op_api[TorchOpDataOri.END_NS.value], - torch_op_api[TorchOpDataOri.GLOBAL_TID.value], - None if not torch_op_api[TorchOpDataOri.CONNECTION_ID.value] else 
ConnectionIdManager().get_id_from_connection_ids(torch_op_api[TorchOpDataOri.CONNECTION_ID.value]), - Str2IdManager().get_id_from_str(torch_op_api[TorchOpDataOri.NAME.value]), - torch_op_api[TorchOpDataOri.SEQUENCE_NUM.value], - torch_op_api[TorchOpDataOri.FWD_THREAD_ID.value], - None if not torch_op_api[TorchOpDataOri.INPUT_DIMS.value] else Str2IdManager().get_id_from_str(torch_op_api[TorchOpDataOri.INPUT_DIMS.value]), - None if not torch_op_api[TorchOpDataOri.INPUT_SHAPES.value] else Str2IdManager().get_id_from_str(torch_op_api[TorchOpDataOri.INPUT_SHAPES.value]), - None if not torch_op_api[TorchOpDataOri.CALL_STACK.value] else CallChainIdManager().get_callchain_id_from_callstack(torch_op_api[TorchOpDataOri.CALL_STACK.value]), - ApiType.TORCH_OP.value]) - mstx_mark_apis = fwk_api_data.get("mstx_op", []) - if not mstx_mark_apis: - return - self.get_mstx_mark_op_connection_ids_with_cann_api(task_enqueues, task_dequeues, mstx_mark_apis) + torch_op_api[TorchOpDataOri.CONNECTION_ID] = connectionId_manager.get_id_from_connection_ids(torch_op_api[TorchOpDataOri.CONNECTION_ID]) if torch_op_api[TorchOpDataOri.CONNECTION_ID] else None + self._fwk_apis.extend(torch_op_apis) + + if TorchDb().judge_table_exist(DbConstant.TABLE_MSTX_EVENTS): + self.get_mstx_mark_op_connection_ids_with_cann_api(task_enqueues, task_dequeues, mstx_mark_apis) + + # update connection id for mstx mark op for mstx_mark_api in mstx_mark_apis: - self._fwk_apis.append([mstx_mark_api[TorchOpDataOri.START_NS.value], mstx_mark_api[TorchOpDataOri.END_NS.value], mstx_mark_api[TorchOpDataOri.GLOBAL_TID.value], - None if not mstx_mark_api[TorchOpDataOri.CONNECTION_ID.value] else ConnectionIdManager().get_id_from_connection_ids(mstx_mark_api[TorchOpDataOri.CONNECTION_ID.value]), - Str2IdManager().get_id_from_str(mstx_mark_api[TorchOpDataOri.NAME.value]), - None, mstx_mark_api[TorchOpDataOri.FWD_THREAD_ID.value], None, None, None, - ApiType.MSTX_OP.value]) + if mstx_mark_api[TorchOpDataOri.CONNECTION_ID]: + 
mstx_mark_api[TorchOpDataOri.CONNECTION_ID] = connectionId_manager.get_id_from_connection_ids(mstx_mark_api[TorchOpDataOri.CONNECTION_ID]) + self._fwk_apis.extend(mstx_mark_apis) def get_mstx_mark_op_connection_ids_with_cann_api(self, task_enqueues: list, task_dequeues: list, mstx_mark_apis: list): + if not mstx_mark_apis: + return sql = "select startNs, endNs, globalTid, connectionId from {} order by startNs".format( DbConstant.TABLE_MSTX_EVENTS) cann_tx_apis = TorchDb().fetch_all_data(sql) if not cann_tx_apis: raise RuntimeWarning("Failed to get msprof_tx apis") - mstx_mark_apis.sort(key=lambda x: x[TorchOpDataOri.START_NS.value]) + mstx_mark_apis.sort(key=lambda x: x[TorchOpDataOri.START_NS]) mstx_op_len = len(mstx_mark_apis) if task_enqueues and task_dequeues: self.get_torch_op_connection_ids_with_task_queue(task_enqueues, task_dequeues, mstx_mark_apis, mstx_op_len, cann_tx_apis) def get_torch_op_connection_ids_with_cann_api(self, task_enqueues: list, task_dequeues: list, torch_op_apis: list): + if not torch_op_apis: + return sql = "select id from {} where value = 'launch'".format(DbConstant.TABLE_STRING_IDS) node_launch_str_ids = TorchDb().fetch_one_data(sql) node_launch_str_id = 0 @@ -161,7 +98,7 @@ class FwkApiDbParser(BaseParser): node_launch_apis = TorchDb().fetch_all_data(sql) if not node_launch_apis: raise RuntimeWarning("Failed to get node launch apis") - torch_op_apis.sort(key=lambda x: x[TorchOpDataOri.START_NS.value]) + torch_op_apis.sort(key=lambda x: x[TorchOpDataOri.START_NS]) torch_op_len = len(torch_op_apis) if task_enqueues and task_dequeues: self.get_torch_op_connection_ids_with_task_queue(task_enqueues, task_dequeues, torch_op_apis, torch_op_len, @@ -170,44 +107,43 @@ class FwkApiDbParser(BaseParser): self.get_torch_op_connection_ids_without_task_queue(torch_op_apis, torch_op_len, node_launch_apis) def get_torch_op_connection_ids_with_task_queue(self, task_enqueues: list, task_dequeues: list, torch_op_apis: list, torch_op_len: int, 
node_lauch_apis: list): - enqueue_corr_ids = {task_enqueue[TaskQueueDataOri.CORRELATION_ID.value] for task_enqueue in task_enqueues} - dequeue_corr_ids = {task_dequeue[TaskQueueDataOri.CORRELATION_ID.value] for task_dequeue in task_dequeues} - enqueue_list = [] - for task_enqueue in task_enqueues: - if task_enqueue[TaskQueueDataOri.CORRELATION_ID.value] in dequeue_corr_ids: - enqueue_list.append(task_enqueue) - dequeue_list = [] - for task_dequeue in task_dequeues: - if task_dequeue[TaskQueueDataOri.CORRELATION_ID.value] in enqueue_corr_ids: - dequeue_list.append(task_dequeue) + connection_id_manager = ConnectionIdManager() + enqueue_corr_ids = {connection_id_manager.get_connection_ids_from_id(task_enqueue[TaskQueueDataOri.CORRELATION_ID])[0] for task_enqueue in task_enqueues} + dequeue_corr_ids = {connection_id_manager.get_connection_ids_from_id(task_dequeue[TaskQueueDataOri.CORRELATION_ID])[0] for task_dequeue in task_dequeues} + matched_corr_ids = enqueue_corr_ids & dequeue_corr_ids + enqueue_list = [enqueue for enqueue in task_enqueues if connection_id_manager.get_connection_ids_from_id(enqueue[TaskQueueDataOri.CORRELATION_ID])[0] in matched_corr_ids] + dequeue_list = [dequeue for dequeue in task_dequeues if connection_id_manager.get_connection_ids_from_id(dequeue[TaskQueueDataOri.CORRELATION_ID])[0] in matched_corr_ids] + last_dequeue_index = 0 last_torch_op_index = 0 dequeue_len = len(dequeue_list) for node_launch_api in node_lauch_apis: for idx in range(last_dequeue_index, dequeue_len): - if node_launch_api[CannNodeLaunchApiOri.START_NS.value] > dequeue_list[idx][TaskQueueDataOri.START_NS.value] and \ - node_launch_api[CannNodeLaunchApiOri.END_NS.value] < dequeue_list[idx][TaskQueueDataOri.END_NS.value]: + if node_launch_api[CannNodeLaunchApiOri.START_NS] > dequeue_list[idx][TaskQueueDataOri.START_NS] and \ + node_launch_api[CannNodeLaunchApiOri.END_NS] < dequeue_list[idx][TaskQueueDataOri.END_NS]: last_dequeue_index = idx enqeue = enqueue_list[idx] 
last_torch_op_index = self.get_torch_op_connection_ids_with_enqueue(torch_op_apis, torch_op_len, enqeue, last_torch_op_index, - node_launch_api[CannNodeLaunchApiOri.CORRELATION_ID.value]) + node_launch_api[CannNodeLaunchApiOri.CORRELATION_ID]) + break + if dequeue_list[idx][TaskQueueDataOri.START_NS] > node_launch_api[CannNodeLaunchApiOri.END_NS]: break def get_torch_op_connection_ids_with_enqueue(self, torch_op_apis: list, torch_op_len: int, enqeue: list, last_torch_op_index: int, connection_id: int) -> int: last_op_api = None for idx in range(last_torch_op_index, torch_op_len): - if enqeue[TaskQueueDataOri.START_NS.value] > torch_op_apis[idx][TorchOpDataOri.END_NS.value]: + if enqeue[TaskQueueDataOri.START_NS] > torch_op_apis[idx][TorchOpDataOri.END_NS]: continue - if enqeue[TaskQueueDataOri.START_NS.value] > torch_op_apis[idx][TorchOpDataOri.START_NS.value] and enqeue[TaskQueueDataOri.END_NS.value] < torch_op_apis[idx][TorchOpDataOri.END_NS.value]: + if enqeue[TaskQueueDataOri.START_NS] > torch_op_apis[idx][TorchOpDataOri.START_NS] and enqeue[TaskQueueDataOri.END_NS] < torch_op_apis[idx][TorchOpDataOri.END_NS]: last_op_api = torch_op_apis[idx] last_torch_op_index = idx elif last_op_api: break if last_op_api: - torch_op_apis[last_torch_op_index][TorchOpDataOri.CONNECTION_ID.value].append(connection_id) + torch_op_apis[last_torch_op_index][TorchOpDataOri.CONNECTION_ID].append(connection_id) return last_torch_op_index def get_torch_op_connection_ids_without_task_queue(self, torch_op_apis: list, torch_op_len: int, node_lauch_apis: list): @@ -215,30 +151,22 @@ class FwkApiDbParser(BaseParser): last_op_index = 0 for node_launch_api in node_lauch_apis: for idx in range(last_op_index, torch_op_len): - if torch_op_apis[idx][TorchOpDataOri.GLOBAL_TID.value] != node_launch_api[CannNodeLaunchApiOri.GLOBAL_TID.value]: + if torch_op_apis[idx][TorchOpDataOri.GLOBAL_TID] != node_launch_api[CannNodeLaunchApiOri.GLOBAL_TID]: continue - if 
node_launch_api[CannNodeLaunchApiOri.START_NS.value] > torch_op_apis[idx][TorchOpDataOri.END_NS.value]: + if node_launch_api[CannNodeLaunchApiOri.START_NS] > torch_op_apis[idx][TorchOpDataOri.END_NS]: continue - if node_launch_api[CannNodeLaunchApiOri.START_NS.value] > torch_op_apis[idx][TorchOpDataOri.START_NS.value] and \ - node_launch_api[CannNodeLaunchApiOri.END_NS.value] < torch_op_apis[idx][TorchOpDataOri.END_NS.value]: + if node_launch_api[CannNodeLaunchApiOri.START_NS] > torch_op_apis[idx][TorchOpDataOri.START_NS] and \ + node_launch_api[CannNodeLaunchApiOri.END_NS] < torch_op_apis[idx][TorchOpDataOri.END_NS]: last_op_api = torch_op_apis[idx] last_op_index = idx elif last_op_api: - torch_op_apis[last_op_index][TorchOpDataOri.CONNECTION_ID.value].append(node_launch_api[CannNodeLaunchApiOri.CORRELATION_ID.value]) + torch_op_apis[last_op_index][TorchOpDataOri.CONNECTION_ID].append(node_launch_api[CannNodeLaunchApiOri.CORRELATION_ID]) last_op_api = None break def set_start_string_id(self): Str2IdManager().set_start_id(DbConstant.START_STRING_ID_FWK_API) - def get_max_cann_id(self): - if not TorchDb().judge_table_exist(DbConstant.TABLE_CANN_API): - return - sql = "select max(connectionId) from {}".format(DbConstant.TABLE_CANN_API) - connectionIds = TorchDb().fetch_one_data(sql) - if connectionIds and connectionIds[0]: - self._max_cann_connection_id = connectionIds[0] + 1 - def init_db_connect(self) -> None: if not TorchDb().create_connect_db(): raise RuntimeError(f"Failed to connect to db file: {TorchDb().get_db_path()}") @@ -285,10 +213,10 @@ class FwkApiDbParser(BaseParser): TorchDb().create_table_with_headers(DbConstant.TABLE_ENUM_API_TYPE, TableColumnsManager.TableColumns.get(DbConstant.TABLE_ENUM_API_TYPE)) api_types = [ - (ApiType.TORCH_OP.value, 'op'), - (ApiType.TASK_QUEUE.value, 'queue'), - (ApiType.PYTHON_TRACE.value, 'trace'), - (ApiType.MSTX_OP.value, 'mstx') + (ApiType.TORCH_OP, 'op'), + (ApiType.TASK_QUEUE, 'queue'), + (ApiType.PYTHON_TRACE, 
'trace'), + (ApiType.MSTX_OP, 'mstx') ] TorchDb().insert_data_into_table(DbConstant.TABLE_ENUM_API_TYPE, api_types) diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py index a570e909e32..b464241f7c3 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_gc_record_db_parser.py @@ -30,6 +30,7 @@ class GCRecordDbParser(BaseParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("GCRecordDbParser start.") try: self.init_db_connect() self._gc_record_data = FwkFileParser(self._profiler_path).get_gc_record_db_data() @@ -37,6 +38,7 @@ class GCRecordDbParser(BaseParser): except Exception as error: self.logger.error("Failed to generate gc record table, error: %s", str(error), exc_info=True) return Constant.FAIL, None + self.logger.info("GCRecordDbParser finish.") return Constant.SUCCESS, None def init_db_connect(self) -> None: diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py index ab8aca2de54..733f9335f2a 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_memory_db_parser.py @@ -79,6 +79,7 @@ class MemoryDbParser(BaseParser): return [cur_record, pta_ge_record_list] def run(self, deps_data: dict): + self.logger.info("MemoryDbParser start.") try: cann_path = ProfilerPathManager.get_cann_path(self._profiler_path) device_ids = ProfilerPathManager.get_device_id(cann_path) @@ -87,11 +88,11 @@ class MemoryDbParser(BaseParser): self.set_start_string_id() self._pta_op_memory_data = deps_data.get(Constant.MEMORY_PREPARE, {}).get("memory_data", {}).get(Constant.Db, []) self._pta_memory_bean_list = 
deps_data.get(Constant.MEMORY_PREPARE, {}).get("pta_record_list", []) - self.init_pta_memory_data() self.save_memory_data_to_db() except Exception as error: self.logger.error("Failed to generate memory_record table or op_memory table, error: %s", str(error), exc_info=True) return Constant.FAIL, None + self.logger.info("MemoryDbParser finish.") return Constant.SUCCESS, None def init_db_connect(self): @@ -241,14 +242,6 @@ class MemoryDbParser(BaseParser): TorchDb().create_table_with_headers(DbConstant.TABLE_MEMORY_RECORD, TableColumnsManager.TableColumns.get(DbConstant.TABLE_MEMORY_RECORD)) TorchDb().insert_data_into_table(DbConstant.TABLE_MEMORY_RECORD, self._record_list) - def init_pta_memory_data(self): - if not ProfilerPathManager.get_cann_path(self._profiler_path): - torch_nop_node = FwkFileParser(self._profiler_path).get_torch_op_tree_node(only_fwk=True) - deps_data = {Constant.TREE_BUILD_PARSER: torch_nop_node} - _, pta_data = MemoryPrepareParser(Constant.MEMORY_PREPARE, self._param_dict).run(deps_data) - self._pta_op_memory_data = pta_data.get("memory_data", {}).get(Constant.Db, []) - self._pta_memory_bean_list = pta_data.get("pta_record_list", []) - def save_strings_id(self): TorchDb().create_table_with_headers(DbConstant.TABLE_STRING_IDS, TableColumnsManager.TableColumns.get(DbConstant.TABLE_STRING_IDS)) TorchDb().insert_data_into_table(DbConstant.TABLE_STRING_IDS, Str2IdManager().get_all_string_2_id_data()) diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py index df3b8fea4f2..3c6e5674eba 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_step_info_db_parser.py @@ -31,12 +31,14 @@ class StepInfoDbParser(BaseParser): self.logger = ProfilerLogger.get_instance() def run(self, deps_data: dict): + self.logger.info("StepInfoDbParser start.") try: 
torch_op_node = deps_data.get(Constant.TREE_BUILD_PARSER, []) step_range = self.get_step_range(torch_op_node[0] if torch_op_node else None) except Exception as error: self.logger.error("Failed to get step info from db, error: %s", str(error), exc_info=True) return Constant.FAIL, [] + self.logger.info("StepInfoDbParser finish.") return Constant.SUCCESS, step_range def get_api_data_in_time_range(self, begin_ts, end_ts) -> list: diff --git a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py index db82064fdef..2c3a275c7e2 100644 --- a/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py +++ b/torch_npu/profiler/analysis/prof_view/prof_db_parse/_trace_step_time_db_parser.py @@ -16,12 +16,11 @@ from collections import defaultdict from enum import Enum from .._base_parser import BaseParser from ...prof_common_func._constant import Constant, print_warn_msg -from ...prof_common_func._constant import DbConstant, TableColumnsManager +from ...prof_common_func._constant import DbConstant, TableColumnsManager, TorchOpDataOri from ...prof_common_func._db_manager import AnalysisDb, TorchDb from ...prof_common_func._constant import convert_ns2us_float from ...prof_common_func._log import ProfilerLogger from ...prof_common_func._time_range_calculator import CommunicationTimeRange, RangeCaculator -from ...prof_parse._fwk_file_parser import FwkFileParser __all__ = [] @@ -38,6 +37,7 @@ class TraceStepTimeDbParser(BaseParser): def __init__(self, name: str, param_dict: dict): super().__init__(name, param_dict) self.step_range = [] + self.torch_op_data = [] self.compute_task_info = defaultdict(list) self.communication_op_info = defaultdict(list) ProfilerLogger.init(self._profiler_path, "TraceStepTimeDbParser") @@ -58,7 +58,7 @@ class TraceStepTimeDbParser(BaseParser): if not first_task_start_ts: return 0 if step_info.get(Constant.STEP_ID) is 
None: - first_fwk_op = FwkFileParser(self._profiler_path).get_first_fwk_op() + first_fwk_op = min(self.torch_op_data, key=lambda op: op[TorchOpDataOri.START_NS]) if self.torch_op_data else None - return (first_task_start_ts - first_fwk_op.ts) if first_fwk_op else 0 + return (first_task_start_ts - first_fwk_op[TorchOpDataOri.START_NS]) if first_fwk_op else 0 return first_task_start_ts - step_info.get(Constant.FWK_START_TS, 0) @@ -71,13 +71,16 @@ class TraceStepTimeDbParser(BaseParser): AnalysisDb().insert_data_into_table(DbConstant.TABLE_STEP_TRACE_TIME, step_trace_data) def run(self, deps_data: dict): + self.logger.info("TraceStepTimeDbParser start.") try: + self.torch_op_data = deps_data.get(Constant.DB_PRE_PARSER, {}).get("torch_op", []) self._init_step_range(deps_data) self._init_task_info_from_db() self.generate_view() except Exception as error: self.logger.error("Failed to generate step_trace_time table, error: %s", str(error), exc_info=True) return Constant.FAIL, None + self.logger.info("TraceStepTimeDbParser finish.") return Constant.SUCCESS, None def generate_view(self) -> None: -- Gitee