diff --git a/component/taskd/taskd/python/adaptor/pytorch/__init__.py b/component/taskd/taskd/python/adaptor/pytorch/__init__.py deleted file mode 100644 index 8ac511d2e88f40fb6181e50e6dc9da23fdb6f677..0000000000000000000000000000000000000000 --- a/component/taskd/taskd/python/adaptor/pytorch/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== \ No newline at end of file diff --git a/component/taskd/taskd/python/adaptor/pytorch/group_info.py b/component/taskd/taskd/python/adaptor/pytorch/group_info.py deleted file mode 100644 index 06bc4ac88775f5ad4d43f5574ea955f0cda04be5..0000000000000000000000000000000000000000 --- a/component/taskd/taskd/python/adaptor/pytorch/group_info.py +++ /dev/null @@ -1,108 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -from taskd.python.cython_api import cython_api -from taskd.python.utils.log import run_log -from taskd.python.constants.constants import CHECK_STEP_PERIOD, JOB_ID_KEY, DEFAULT_GROUP_DIR, \ - PROFILING_DIR_MODE, GROUP_INFO_NAME, GROUP_INFO_KEY, GROUP_NAME_KEY, GROUP_RANK_KEY, \ - GLOBAL_RANKS_KEY, DEFAULT_GROUP, GROUP_BASE_DIR_ENV -import threading -import os -import json -import time - - -def get_save_path(rank) -> str: - job_id = os.getenv(JOB_ID_KEY) - if job_id is None or job_id == "": - run_log.error(f"job id is invalid") - return "" - base_dir = os.getenv(GROUP_BASE_DIR_ENV) - if base_dir is None: - base_dir = "" - if not os.path.exists(base_dir): - run_log.warning(f"config group base dir {base_dir} not exists, use default group info dir") - base_dir = DEFAULT_GROUP_DIR - rank_path = os.path.join(base_dir, job_id, str(rank)) - try: - os.makedirs(rank_path, mode=PROFILING_DIR_MODE, exist_ok=True) - except FileExistsError: - run_log.warning(f"filepath={rank_path} exist") - return rank_path - except OSError as err: - run_log.error(f"filepath={rank_path} failed, err={err}") - return "" - return rank_path - - -def get_group_info(rank: int) -> dict: - try: - import torch - from torch.distributed.distributed_c10d import _world as distributed_world - if not torch.distributed.is_available() or not torch.distributed.is_initialized(): - run_log.error(f'distributed is not available or not initialized, rank={rank}') - return {} - group_info = {} - global_rank = rank - distributed_world.pg_names - for group, group_config in distributed_world.pg_map.items(): - run_log.debug(f'distributed world data: {group}, {group_config}') - if len(group_config) < 1: - run_log.warning(f'group config is invalid, group={group}, group_config={group_config}') - continue - backend = str(group_config[0]).lower() - if backend != "hccl": - continue - hccl_group = group._get_backend(torch.device("npu")) - comm_name = hccl_group.get_hccl_comm_name(global_rank, init_comm=False) - if comm_name is not None: - group_info[comm_name] = { - GROUP_NAME_KEY: hccl_group.options.hccl_config.get("group_name", ""), - GROUP_RANK_KEY: torch.distributed.get_group_rank(group, global_rank), - GLOBAL_RANKS_KEY: torch.distributed.get_process_group_ranks(group) - } - default_group = torch.distributed.distributed_c10d._get_default_group() - comm_name = default_group._get_backend(torch.device("npu")).get_hccl_comm_name(global_rank, init_comm=False) - if comm_name is not None: - group_info[comm_name] = { - GROUP_NAME_KEY: DEFAULT_GROUP, - GROUP_RANK_KEY: torch.distributed.get_group_rank(default_group, global_rank), - GLOBAL_RANKS_KEY: torch.distributed.get_process_group_ranks(default_group) - } - return group_info - except Exception as err: - run_log.error(f'get group info failed, err={err}') - return {} - - -def dump_group_info(): - try: - import torch - rank = torch.distributed.get_rank() - run_log.info(f'start dump group info for rank={rank}') - group_info = get_group_info(rank) - if group_info is not None: - run_log.info(f'get group info: {group_info}') - save_path = get_save_path(rank) - if save_path == "": - run_log.error(f'get save path for group info failed') - return - run_log.info(f'save group info to: {save_path}') - full_path = os.path.join(save_path, GROUP_INFO_NAME) - with open(full_path, "w", encoding="utf-8") as f: - json.dump(group_info, f, ensure_ascii=False, indent=4) - except Exception as err: - run_log.error(f'save group info failed: {err}') \ No newline at end of file diff --git a/component/taskd/taskd/python/framework/worker/worker.py b/component/taskd/taskd/python/framework/worker/worker.py index 4d3b611e118294c5feb372fd0d1f12e0d9b6ea86..b9df2fa3f412fc709e2775a6fee690928c88cfd1 100644 --- a/component/taskd/taskd/python/framework/worker/worker.py +++ b/component/taskd/taskd/python/framework/worker/worker.py @@ -20,7 +20,6 @@ from typing import List from taskd.python.cython_api import cython_api from taskd.python.utils.log import run_log -from taskd.python.adaptor.pytorch.group_info import dump_group_info class Worker: diff --git a/component/taskd/tests/ut/python/adaptor/test_group_info.py b/component/taskd/tests/ut/python/adaptor/test_group_info.py deleted file mode 100644 index 0efdc67af2e6811c02cfa6a085cb375093b7474f..0000000000000000000000000000000000000000 --- a/component/taskd/tests/ut/python/adaptor/test_group_info.py +++ /dev/null @@ -1,46 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Copyright 2025. Huawei Technologies Co.,Ltd. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -import unittest -from unittest.mock import patch, MagicMock -import os - -from taskd.python.adaptor.pytorch.group_info import get_save_path, get_group_info, dump_group_info - - -class TestGroupInfoFunctions(unittest.TestCase): - - @patch('os.getenv') - @patch('os.path.exists') - @patch('os.makedirs') - def test_get_save_path(self, mock_makedirs, mock_exists, mock_getenv): - mock_getenv.side_effect = [None, None] - result = get_save_path(1) - self.assertEqual(result, "") - - mock_getenv.side_effect = ['job_id', 'base_dir'] - mock_exists.return_value = True - result = get_save_path(1) - self.assertEqual(result, os.path.join('base_dir', 'job_id', '1')) - - mock_getenv.side_effect = ['job_id', 'base_dir'] - mock_exists.return_value = False - mock_makedirs.side_effect = OSError('Test error') - result = get_save_path(1) - self.assertEqual(result, "") - -if __name__ == '__main__': - unittest.main() \ No newline at end of file