From dbf1af3074145993a47c81472c50f4dc8e2a6c62 Mon Sep 17 00:00:00 2001 From: mei-feiyao Date: Tue, 23 Jan 2024 11:44:04 +0800 Subject: [PATCH 1/2] add bind_core.py --- profiler/affinity_cpu_bind/README.md | 34 ++++ profiler/affinity_cpu_bind/bind_core.py | 227 ++++++++++++++++++++++++ 2 files changed, 261 insertions(+) create mode 100644 profiler/affinity_cpu_bind/README.md create mode 100644 profiler/affinity_cpu_bind/bind_core.py diff --git a/profiler/affinity_cpu_bind/README.md b/profiler/affinity_cpu_bind/README.md new file mode 100644 index 000000000..832bd555e --- /dev/null +++ b/profiler/affinity_cpu_bind/README.md @@ -0,0 +1,34 @@ +### **昇腾亲和性CPU绑核工具** + +### **介绍** +昇腾亲和性CPU绑核工具支持用户无需侵入式修改工程,直接运行工具即可实现按亲和性策略绑核,提升推理或训练性能。 + +### **使用方式** +1.命令行输入python3 bind_core.py -app/--application="inference/train cmd"(如果命令含多个参数,放在双引号中) +该方式会在拉起任务后,监测任务进程,并实施绑核,直至任务进程结束。 + +2.推理或训练任务已经拉起,命令行输入python3 bind_core.py。该方式会循环查找使用到NPU卡的任务进程,并实施绑核。 + +3.绑核运行过程的日志默认不存盘;想保存运行日志的话,执行绑核命令时设置-l/--log参数,例如 : python3 bind_core.py -l/--log,这样就会将运行日志保存到当前路径的bind_core_xxx.txt + +### **使用须知** +1.该脚本会在拉起后查找使用到NPU卡的进程,每次查找10s,循环5次。如果找不到进程,会超时退出。 + +2.使用工具前应提前安装pstree工具,参考命令yum install -y psmisc或apt -y install psmisc。 + +3.使用前手动执行npu-smi info -t topo,出现如下类似信息,说明环境支持绑核,否则请将环境驱动包升级到Ascend HDK 23.0.RC2以上版本。 + + NPU0 NPU1 NPU2 NPU3 NPU4 NPU5 NPU6 NPU7 CPU Affinity + NPU0 X HCCS HCCS HCCS HCCS HCCS HCCS HCCS xx-xx + NPU1 HCCS X HCCS HCCS HCCS HCCS HCCS HCCS xx-xx + NPU2 HCCS HCCS X HCCS HCCS HCCS HCCS HCCS xx-xx + NPU3 HCCS HCCS HCCS X HCCS HCCS HCCS HCCS xx-xx + NPU4 HCCS HCCS HCCS HCCS X HCCS HCCS HCCS xx-xx + NPU5 HCCS HCCS HCCS HCCS HCCS X HCCS HCCS xx-xx + NPU6 HCCS HCCS HCCS HCCS HCCS HCCS X HCCS xx-xx + NPU7 HCCS HCCS HCCS HCCS HCCS HCCS HCCS X xx-xx + + + + + diff --git a/profiler/affinity_cpu_bind/bind_core.py b/profiler/affinity_cpu_bind/bind_core.py new file mode 100644 index 000000000..a3f6d67a8 --- /dev/null +++ b/profiler/affinity_cpu_bind/bind_core.py @@ -0,0 +1,227 @@ +#! 
/usr/bin/python3 +# Copyright 2023 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess +import re +import argparse +from datetime import datetime +from datetime import timezone +import time + +NPU_IDS = [] +RUNNING_PIDS = {} +NPU_CPU_AFFINITY_DICT = {} +SAVE_LOG_TO_FILE = False + +# binding core log file +nowtime = datetime.now(tz=timezone.utc) +BIND_CORE_RESULT_FILE = 'bind_core_' + \ + str(nowtime.year) + '_' + \ + str(nowtime.month) + '_' + \ + str(nowtime.day) + '_' + \ + str(nowtime.hour) + '_' + \ + str(nowtime.minute) + '_' + \ + str(nowtime.second) + '.txt' + + +# print log to logfile +def print_log_to_file(msg): + global SAVE_LOG_TO_FILE + if not SAVE_LOG_TO_FILE: + return + with open(file=BIND_CORE_RESULT_FILE, mode="a", encoding="utf-8") as f: + f.write(msg + '\n') + + +# launch training or inference process +def launch_process(cmd): + global RUNNING_CMD_PID + print_log_to_file('[INFO] Start to execute cmd: {}'.format(cmd)) + subprocess.Popen(cmd.split(), shell=False) + + +# parse input cmd +def args_parse(): + global SAVE_LOG_TO_FILE + bind_wait_core_time = 0 + parser = argparse.ArgumentParser(description='This is a sample program.') + parser.add_argument('-t', '--time', type=int, metavar='', nargs='+', help='Wait time before bind cores that you want to set. 
The unit is \'s\'') + parser.add_argument('-app', '--application', metavar='', nargs='+', help='Training or inference command that you want to run.') + parser.add_argument('-l', '--log', default=False, action='store_true', help='Switch to save running log to local file.') + args = parser.parse_args() + if args.application: + application_cmd = ' '.join(args.application) + launch_process(application_cmd) + time.sleep(10) + if args.time: + bind_wait_core_time = int(args.time[0]) + if args.log: + SAVE_LOG_TO_FILE = True + + # if time is set, wait for setting time before bind cores + if bind_wait_core_time != 0: + time.sleep(bind_wait_core_time) + + +# get npu affinity +def get_npu_affinity() -> bool: + global NPU_CPU_AFFINITY_DICT + global NPU_IDS + + get_npu_topo_cmd = 'npu-smi info -t topo' + p = subprocess.run(get_npu_topo_cmd.split(), shell=False, capture_output=True) + res = p.stdout.decode('utf-8').strip().split() + if not res: + print('[ERROR] Failed to run get npu affinity info, please check if driver version support cmd npu-smi info -t topo') + return False + + i = 0 + for v in res: + if '-' in v: + NPU_CPU_AFFINITY_DICT[NPU_IDS[i]] = v + i += 1 + for k in NPU_CPU_AFFINITY_DICT.keys(): + print_log_to_file('[INFO] Affinity CPU list {} for NPU {}'.format(NPU_CPU_AFFINITY_DICT[k], k)) + return True + + +# get total npu id +def get_total_npu_id() -> bool: + global NPU_IDS + get_npu_info_cmd = 'npu-smi info -l' + get_npu_info_process = subprocess.run(get_npu_info_cmd.split(), shell=False, capture_output=True) + get_npu_ids_cmd = 'grep ID' + get_npu_ids_process = subprocess.run(get_npu_ids_cmd.split(), shell=False, input=get_npu_info_process.stdout, capture_output=True) + res = get_npu_ids_process.stdout.decode('utf-8').strip().split() + for i in res: + if i.isdigit(): + NPU_IDS.append(int(i)) + if not NPU_IDS: + print('[ERROR] Failed to get total NPU id list, please make sure there is NPU on this device') + return False + print_log_to_file('[INFO] NPU total id 
list: {}'.format(NPU_IDS)) + return True + + +# get app pid on npu +def get_pid_on_npu() -> bool: + global RUNNING_PIDS + global NPU_IDS + print_log_to_file('[INFO] Begin to find running process on all NPUs') + RUNNING_PIDS.clear() + # get process pid on NPUs, retry times : 5 + for times in range(5): + for i in NPU_IDS: + get_npu_pids_cmd = 'npu-smi info -t proc-mem -i {} -c 0'.format(str(i)) + p = subprocess.run(get_npu_pids_cmd.split(), shell=False, capture_output=True) + res = p.stdout.decode('utf-8').strip().split() + + if 'Process' in res: + for v in res: + if v.startswith('id:'): + pid_on_npu = v.split(':')[1] + if i not in RUNNING_PIDS: + RUNNING_PIDS[i] = [int(pid_on_npu)] + else: + RUNNING_PIDS[i].append(int(pid_on_npu)) + + if RUNNING_PIDS: + break + print_log_to_file('[WARNING] Found no running process on all NPUs, retry times: {}, wait for 5 s'.format(times + 1)) + # wait 5 s for each time + time.sleep(5) + + # no running process on NPUs, stop + if not RUNNING_PIDS: + print_log_to_file('[INFO] Found no running process on all NPUs, stop bind cores') + print('[INFO] Now there is no running process on all NPUs, stop bind cores') + return False + + # delete repeat pid + for i in NPU_IDS: + if i not in RUNNING_PIDS: + continue + pids_npu = RUNNING_PIDS[i] + for n, pid in RUNNING_PIDS.items(): + if n != i and pid in pids_npu: + RUNNING_PIDS[n].remove(pid) + + for k in RUNNING_PIDS.keys(): + print_log_to_file('[INFO] Succeed to find running process {} on NPU {}'.format(RUNNING_PIDS[k], k)) + return True + + +# get device info +def get_dev_info() -> bool: + if not get_total_npu_id(): + return False + if not get_npu_affinity(): + return False + return True + + +# get process affinity +def get_process_affinity(pid): + get_affinity_cpu_cmd = 'taskset -pc {} '.format(pid) + p = subprocess.run(get_affinity_cpu_cmd.split(), shell=False, capture_output=True) + res = p.stdout.decode('utf-8').strip().split() + return res[len(res) - 1] + + +# run bind core +def 
run_bind_core(): + global NPU_IDS + global NPU_CPU_AFFINITY_DICT + for k, pid_list in RUNNING_PIDS.items(): + cpu_list = NPU_CPU_AFFINITY_DICT[k].split('-') + start_cpu_id = cpu_list[0] + end_cpu_id = cpu_list[1] + + for pid in pid_list: + get_child_pids_cmd = 'pstree {} -p -T'.format(pid) + p = subprocess.run(get_child_pids_cmd.split(), shell=False, capture_output=True) + res = p.stdout.decode('utf-8').strip().split() + for ele in res: + ele = re.sub(u"\\(|\\)", ",", ele) + ele_list = ele.split(',') + for sub_p in ele_list: + if sub_p.isdigit(): + sub_p = int(sub_p) + + # if process has set to right affinity, continue + current_affinity_cpu_list = get_process_affinity(sub_p) + if not current_affinity_cpu_list: + continue + current_cpu_list = current_affinity_cpu_list.split('-') + if current_cpu_list and current_cpu_list[0] == start_cpu_id and current_cpu_list[1] == end_cpu_id: + continue + print_log_to_file('[INFO] Begin to bind cores for process {} on NPU {}'.format(str(sub_p), k)) + set_affinity_cpu_cmd = 'taskset -pc {}-{} {}'.format(int(start_cpu_id), int(end_cpu_id), sub_p) + p = subprocess.run(set_affinity_cpu_cmd.split(), shell=False, capture_output=True) + print_log_to_file(p.stdout.decode('utf-8')) + + print_log_to_file('[INFO] Succeed to bind process {} on NPU {} with cpu cores list {}'.format(str(sub_p), k, NPU_CPU_AFFINITY_DICT[k])) + + +if __name__ == '__main__': + print("[INFO] Begin to run bind-cores script...") + args_parse() + if not get_dev_info(): + exit() + + while True: + if not get_pid_on_npu(): + exit() + run_bind_core() -- Gitee From 48cd9bb01598039d0ad051062bb33326f7473b10 Mon Sep 17 00:00:00 2001 From: mei-feiyao Date: Tue, 19 Mar 2024 14:23:48 +0800 Subject: [PATCH 2/2] update bind core script --- profiler/affinity_cpu_bind/README.md | 4 +- profiler/affinity_cpu_bind/bind_core.py | 404 +++++++++++------------- 2 files changed, 188 insertions(+), 220 deletions(-) diff --git a/profiler/affinity_cpu_bind/README.md 
b/profiler/affinity_cpu_bind/README.md index 832bd555e..0ad8f3dd3 100644 --- a/profiler/affinity_cpu_bind/README.md +++ b/profiler/affinity_cpu_bind/README.md @@ -14,9 +14,7 @@ ### **使用须知** 1.该脚本会在拉起后查找使用到NPU卡的进程,每次查找10s,循环5次。如果找不到进程,会超时退出。 -2.使用工具前应提前安装pstree工具,参考命令yum install -y psmisc或apt -y install psmisc。 - -3.使用前手动执行npu-smi info -t topo,出现如下类似信息,说明环境支持绑核,否则请将环境驱动包升级到Ascend HDK 23.0.RC2以上版本。 +2.使用前手动执行npu-smi info -t topo,出现如下类似信息,说明环境支持绑核,否则请将环境驱动包升级到Ascend HDK 23.0.RC2以上版本。 NPU0 NPU1 NPU2 NPU3 NPU4 NPU5 NPU6 NPU7 CPU Affinity NPU0 X HCCS HCCS HCCS HCCS HCCS HCCS HCCS xx-xx diff --git a/profiler/affinity_cpu_bind/bind_core.py b/profiler/affinity_cpu_bind/bind_core.py index a3f6d67a8..1dea45806 100644 --- a/profiler/affinity_cpu_bind/bind_core.py +++ b/profiler/affinity_cpu_bind/bind_core.py @@ -1,227 +1,197 @@ -#! /usr/bin/python3 -# Copyright 2023 Huawei Technologies Co., Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- import subprocess -import re import argparse +import os +import time from datetime import datetime from datetime import timezone -import time -NPU_IDS = [] -RUNNING_PIDS = {} -NPU_CPU_AFFINITY_DICT = {} -SAVE_LOG_TO_FILE = False - -# binding core log file -nowtime = datetime.now(tz=timezone.utc) -BIND_CORE_RESULT_FILE = 'bind_core_' + \ - str(nowtime.year) + '_' + \ - str(nowtime.month) + '_' + \ - str(nowtime.day) + '_' + \ - str(nowtime.hour) + '_' + \ - str(nowtime.minute) + '_' + \ - str(nowtime.second) + '.txt' - - -# print log to logfile -def print_log_to_file(msg): - global SAVE_LOG_TO_FILE - if not SAVE_LOG_TO_FILE: - return - with open(file=BIND_CORE_RESULT_FILE, mode="a", encoding="utf-8") as f: - f.write(msg + '\n') - - -# launch training or inference process -def launch_process(cmd): - global RUNNING_CMD_PID - print_log_to_file('[INFO] Start to execute cmd: {}'.format(cmd)) - subprocess.Popen(cmd.split(), shell=False) - - -# parse input cmd -def args_parse(): - global SAVE_LOG_TO_FILE - bind_wait_core_time = 0 - parser = argparse.ArgumentParser(description='This is a sample program.') - parser.add_argument('-t', '--time', type=int, metavar='', nargs='+', help='Wait time before bind cores that you want to set. 
The unit is \'s\'') - parser.add_argument('-app', '--application', metavar='', nargs='+', help='Training or inference command that you want to run.') - parser.add_argument('-l', '--log', default=False, action='store_true', help='Switch to save running log to local file.') - args = parser.parse_args() - if args.application: - application_cmd = ' '.join(args.application) - launch_process(application_cmd) - time.sleep(10) - if args.time: - bind_wait_core_time = int(args.time[0]) - if args.log: - SAVE_LOG_TO_FILE = True - - # if time is set, wait for setting time before bind cores - if bind_wait_core_time != 0: - time.sleep(bind_wait_core_time) - - -# get npu affinity -def get_npu_affinity() -> bool: - global NPU_CPU_AFFINITY_DICT - global NPU_IDS - - get_npu_topo_cmd = 'npu-smi info -t topo' - p = subprocess.run(get_npu_topo_cmd.split(), shell=False, capture_output=True) - res = p.stdout.decode('utf-8').strip().split() - if not res: - print('[ERROR] Failed to run get npu affinity info, please check if driver version support cmd npu-smi info -t topo') - return False - - i = 0 - for v in res: - if '-' in v: - NPU_CPU_AFFINITY_DICT[NPU_IDS[i]] = v - i += 1 - for k in NPU_CPU_AFFINITY_DICT.keys(): - print_log_to_file('[INFO] Affinity CPU list {} for NPU {}'.format(NPU_CPU_AFFINITY_DICT[k], k)) - return True - - -# get total npu id -def get_total_npu_id() -> bool: - global NPU_IDS - get_npu_info_cmd = 'npu-smi info -l' - get_npu_info_process = subprocess.run(get_npu_info_cmd.split(), shell=False, capture_output=True) - get_npu_ids_cmd = 'grep ID' - get_npu_ids_process = subprocess.run(get_npu_ids_cmd.split(), shell=False, input=get_npu_info_process.stdout, capture_output=True) - res = get_npu_ids_process.stdout.decode('utf-8').strip().split() - for i in res: - if i.isdigit(): - NPU_IDS.append(int(i)) - if not NPU_IDS: - print('[ERROR] Failed to get total NPU id list, please make sure there is NPU on this device') - return False - print_log_to_file('[INFO] NPU total id 
list: {}'.format(NPU_IDS)) - return True - - -# get app pid on npu -def get_pid_on_npu() -> bool: - global RUNNING_PIDS - global NPU_IDS - print_log_to_file('[INFO] Begin to find running process on all NPUs') - RUNNING_PIDS.clear() - # get process pid on NPUs, retry times : 5 - for times in range(5): - for i in NPU_IDS: - get_npu_pids_cmd = 'npu-smi info -t proc-mem -i {} -c 0'.format(str(i)) - p = subprocess.run(get_npu_pids_cmd.split(), shell=False, capture_output=True) - res = p.stdout.decode('utf-8').strip().split() - - if 'Process' in res: - for v in res: - if v.startswith('id:'): - pid_on_npu = v.split(':')[1] - if i not in RUNNING_PIDS: - RUNNING_PIDS[i] = [int(pid_on_npu)] - else: - RUNNING_PIDS[i].append(int(pid_on_npu)) - - if RUNNING_PIDS: - break - print_log_to_file('[WARNING] Found no running process on all NPUs, retry times: {}, wait for 5 s'.format(times + 1)) - # wait 5 s for each time - time.sleep(5) - - # no running process on NPUs, stop - if not RUNNING_PIDS: - print_log_to_file('[INFO] Found no running process on all NPUs, stop bind cores') - print('[INFO] Now there is no running process on all NPUs, stop bind cores') - return False - - # delete repeat pid - for i in NPU_IDS: - if i not in RUNNING_PIDS: - continue - pids_npu = RUNNING_PIDS[i] - for n, pid in RUNNING_PIDS.items(): - if n != i and pid in pids_npu: - RUNNING_PIDS[n].remove(pid) - - for k in RUNNING_PIDS.keys(): - print_log_to_file('[INFO] Succeed to find running process {} on NPU {}'.format(RUNNING_PIDS[k], k)) - return True - - -# get device info -def get_dev_info() -> bool: - if not get_total_npu_id(): - return False - if not get_npu_affinity(): - return False - return True - - -# get process affinity -def get_process_affinity(pid): - get_affinity_cpu_cmd = 'taskset -pc {} '.format(pid) - p = subprocess.run(get_affinity_cpu_cmd.split(), shell=False, capture_output=True) - res = p.stdout.decode('utf-8').strip().split() - return res[len(res) - 1] - - -# run bind core -def 
class BindCoreManager:
    """Pin processes that run on NPUs to the CPU cores each NPU is affine to.

    The script entry point drives it in this order: ``args_parse`` ->
    ``get_npu_info`` -> ``get_running_pid_on_npu`` -> ``run_bind_core``.
    Device data is read from the ``npu-smi`` command line tool and CPU affinity
    is read/changed with ``taskset``. Progress messages are appended to
    ``self.log_file`` in the current working directory.
    """

    DEFAULT_FIND_RUNNING_PID_TIMES = 5
    # Seconds to wait between two scans when no process was found on any NPU.
    FIND_RUNNING_PID_INTERVAL = 10

    def __init__(self):
        # NPU ids in the order ``npu-smi info -l`` reports them.
        self.npu_id_list = []
        # NPU id -> list of PID strings found on that NPU.
        self.running_pid_on_npu = {}
        self.find_running_pid_times = self.DEFAULT_FIND_RUNNING_PID_TIMES
        # NPU id -> CPU list string (e.g. '0-23') accepted by ``taskset -pc``.
        self.npu_affinity_cpu_dict = {}
        self.log_file = self._init_log_file()

    def _init_log_file(self):
        """Return a log file name stamped with the current UTC time (fields are not zero padded)."""
        now = datetime.now(tz=timezone.utc)
        return (f'bind_core_{now.year}_{now.month}_{now.day}_'
                f'{now.hour}_{now.minute}_{now.second}.log')

    def print_log_to_file(self, msg: str):
        """Append ``msg`` plus a newline to the log file."""
        with open(self.log_file, mode='a', encoding='utf-8') as f:
            f.write(msg + '\n')

    @staticmethod
    def _is_cpu_list(token: str) -> bool:
        """Return True when ``token`` is a numeric CPU list such as '0-23' or '0-3,8-11'."""
        for part in token.split(','):
            bounds = part.split('-')
            if len(bounds) > 2 or not all(b.isdecimal() for b in bounds):
                return False
        return True

    @staticmethod
    def _parse_npu_ids(text: str) -> list:
        """Collect every integer token from the lines of ``text`` that contain 'ID'.

        This is the in-process equivalent of piping the text through ``grep ID``.
        """
        ids = []
        for line in text.splitlines():
            if 'ID' not in line:
                continue
            ids.extend(int(tok) for tok in line.split() if tok.isdecimal())
        return ids

    @classmethod
    def _parse_affinity(cls, tokens, npu_ids, cpu_num) -> dict:
        """Map NPU ids to the CPU ranges found in ``tokens``.

        The n-th numeric range token is assigned to the n-th id of ``npu_ids``;
        tokens that contain '-' but are not numeric are ignored and surplus
        ranges are dropped instead of raising.

        Args:
            tokens: whitespace-split output of ``npu-smi info -t topo``.
            npu_ids: NPU ids in reporting order.
            cpu_num: number of CPUs on the host (0 when unknown).

        Returns:
            dict of NPU id -> CPU list string.
        """
        affinity = {}
        if not npu_ids:
            return affinity
        cpu_num_for_each_npu = cpu_num // len(npu_ids)
        idx = 0
        for tok in tokens:
            if '-' not in tok or not cls._is_cpu_list(tok):
                continue
            if idx >= len(npu_ids):
                break
            if ',' not in tok:
                start, _, end = tok.partition('-')
                # A range exactly as wide as an even per-NPU share is widened by
                # one more share.
                # NOTE(review): the widened end may exceed the highest CPU id on
                # the host; rule kept as-is from the original -- confirm intent.
                if int(end) - int(start) == cpu_num_for_each_npu - 1:
                    tok = '{}-{}'.format(start, int(end) + cpu_num_for_each_npu)
            affinity[npu_ids[idx]] = tok
            idx += 1
        return affinity

    @staticmethod
    def _parse_pids(tokens) -> list:
        """Return the unique numeric PIDs from 'id:<pid>' tokens, in first-seen order."""
        pids = []
        for tok in tokens:
            if not tok.startswith('id:'):
                continue
            pid = tok.split(':')[1]
            if pid.isdecimal() and pid not in pids:
                pids.append(pid)
        return pids

    @staticmethod
    def _remove_repeat_pids(pid_map: dict, npu_order) -> None:
        """Keep each PID on one NPU only, updating ``pid_map`` in place.

        A PID listed under several NPUs stays on the last of them in
        ``npu_order``. Lists are rebuilt rather than mutated while being
        iterated, so no element is skipped and nothing is removed twice.
        """
        seen = set()
        for npu_id in reversed(list(dict.fromkeys(npu_order))):
            if npu_id not in pid_map:
                continue
            unique = [pid for pid in pid_map[npu_id] if pid not in seen]
            seen.update(unique)
            pid_map[npu_id] = unique

    def _get_all_npu_id(self) -> None:
        """Fill ``self.npu_id_list`` from ``npu-smi info -l`` (replacing any previous content)."""
        get_npu_info_cmd = 'npu-smi info -l'
        p = subprocess.run(get_npu_info_cmd.split(), shell=False, capture_output=True)
        self.npu_id_list = self._parse_npu_ids(p.stdout.decode('utf-8', errors='replace'))
        self.print_log_to_file(f'[INFO] NPU total id list: {self.npu_id_list}')

    def _get_npu_affinity(self) -> bool:
        """Fill ``self.npu_affinity_cpu_dict`` from ``npu-smi info -t topo``.

        Returns:
            False when no NPU id is known, the command prints nothing, or no
            CPU range could be parsed; True otherwise.
        """
        if not self.npu_id_list:
            print('[ERROR] Failed to get total NPU id list, please make sure there is NPU on this device')
            return False
        get_npu_topo_cmd = 'npu-smi info -t topo'
        p = subprocess.run(get_npu_topo_cmd.split(), shell=False, capture_output=True)
        res = p.stdout.decode('utf-8', errors='replace').split()
        if not res:
            print('[ERROR] Failed to run get npu affinity info, please check if driver version support cmd npu-smi info -t topo')
            return False

        # os.cpu_count() may return None; 0 simply disables the widening rule.
        cpu_num = os.cpu_count() or 0
        self.npu_affinity_cpu_dict = self._parse_affinity(res, self.npu_id_list, cpu_num)
        if not self.npu_affinity_cpu_dict:
            print('[ERROR] Failed to run get npu affinity info, please check if driver version support cmd npu-smi info -t topo')
            return False

        for npu_id, cpus in self.npu_affinity_cpu_dict.items():
            self.print_log_to_file('[INFO] Affinity CPU list {} for NPU {}'.format(cpus, npu_id))
        return True

    def get_running_pid_on_npu(self) -> bool:
        """Scan all NPUs for running processes and store them in ``self.running_pid_on_npu``.

        The scan is repeated up to ``self.find_running_pid_times`` times and
        stops as soon as any NPU reports a process.

        Returns:
            True when at least one NPU has a running process.
        """
        no_running_pids_on_npu_msg = '[INFO] Now there is no running process on all NPUs, stop bind cores'
        self.print_log_to_file('[INFO] Begin to find running process on all NPUs')
        for times in range(self.find_running_pid_times):
            if_find_pid = False
            for npu_id in self.npu_id_list:
                get_npu_pids_cmd = 'npu-smi info -t proc-mem -i {} -c 0'.format(npu_id)
                try:
                    p = subprocess.run(get_npu_pids_cmd.split(), shell=False, capture_output=True)
                except OSError as err:
                    print('[ERROR] Failed to run cmd {}: {}'.format(get_npu_pids_cmd, err))
                    return False
                pid_list = self._parse_pids(p.stdout.decode('utf-8', errors='replace').split())
                self.running_pid_on_npu[npu_id] = pid_list
                if pid_list:
                    if_find_pid = True

            if if_find_pid:
                break
            self.print_log_to_file(
                '[WARNING] Found no running process on all NPUs, retry times: {}, wait for {} s'.format(
                    times + 1, self.FIND_RUNNING_PID_INTERVAL))
            # No point in waiting after the final attempt.
            if times + 1 < self.find_running_pid_times:
                time.sleep(self.FIND_RUNNING_PID_INTERVAL)

        self._remove_repeat_pids(self.running_pid_on_npu, self.npu_id_list)

        if_running_process = False
        for npu_id, pids in self.running_pid_on_npu.items():
            if not pids:
                self.print_log_to_file('[INFO] There is no running process on NPU {}'.format(npu_id))
            else:
                self.print_log_to_file('[INFO] Succeed to find running process {} on NPU {}'.format(pids, npu_id))
                if_running_process = True
        if not if_running_process:
            print(no_running_pids_on_npu_msg)
        return if_running_process

    def get_npu_info(self) -> bool:
        """Collect NPU ids and their CPU affinity; return False on any failure."""
        try:
            self._get_all_npu_id()
            return self._get_npu_affinity()
        except (OSError, subprocess.SubprocessError) as err:
            # OSError covers a missing or non-executable npu-smi binary.
            print('[ERROR] Failed to query NPU info: {}'.format(err))
            return False

    def get_process_affinity(self, pid) -> str:
        """Return the last token printed by ``taskset -pc <pid>``.

        Returns:
            The affinity list string, or '' when taskset cannot be run, exits
            with a non-zero status, or prints nothing.
        """
        get_process_affinity_cmd = 'taskset -pc {}'.format(pid)
        try:
            p = subprocess.run(get_process_affinity_cmd.split(), shell=False, capture_output=True)
        except (OSError, subprocess.SubprocessError):
            print('[ERROR] Failed to find process affinity')
            return ''
        res = p.stdout.decode('utf-8', errors='replace').split()
        if p.returncode != 0 or not res:
            return ''
        return res[-1]

    def run_bind_core(self):
        """Bind every PID in ``self.running_pid_on_npu`` to the CPU list of its NPU.

        PIDs whose affinity cannot be read, or that already have the target
        affinity, are skipped. Success is only logged when taskset succeeds.
        """
        if not self.running_pid_on_npu:
            return
        for npu_id, pid_list in self.running_pid_on_npu.items():
            target_cpus = self.npu_affinity_cpu_dict.get(npu_id)
            if not target_cpus:
                if pid_list:
                    self.print_log_to_file('[WARNING] No affinity CPU list for NPU {}, skip process {}'.format(npu_id, pid_list))
                continue

            for pid in pid_list:
                current_cpus = self.get_process_affinity(pid)
                if not current_cpus or current_cpus == target_cpus:
                    continue
                self.print_log_to_file('[INFO] Begin to bind cores for process {} on NPU {}'.format(pid, npu_id))
                set_affinity_cpu_cmd = ['taskset', '-pc', target_cpus, str(pid)]
                try:
                    p = subprocess.run(set_affinity_cpu_cmd, shell=False, capture_output=True)
                except (OSError, subprocess.SubprocessError):
                    print('[ERROR] Failed to bind process {} on NPU {} with cpu cores list {}'.format(pid, npu_id, target_cpus))
                    continue
                self.print_log_to_file(p.stdout.decode('utf-8', errors='replace'))
                if p.returncode != 0:
                    self.print_log_to_file(p.stderr.decode('utf-8', errors='replace'))
                    print('[ERROR] Failed to bind process {} on NPU {} with cpu cores list {}'.format(pid, npu_id, target_cpus))
                    continue
                self.print_log_to_file('[INFO] Succeed to bind process {} on NPU {} with cpu cores list {}'.format(pid, npu_id, target_cpus))

    def args_parse(self):
        """Parse the command line, optionally launch the application, then wait.

        ``-app/--application`` launches the given command and sleeps 10 s;
        ``-t/--time`` adds a further wait (seconds) before binding starts.
        """
        parser = argparse.ArgumentParser(description='This is a sample program.')
        parser.add_argument('-t', '--time', type=int, metavar='', nargs='+', help='Wait time before bind cores that you want to set. The unit is \'s\'')
        parser.add_argument('-app', '--application', metavar='', nargs='+', help='Training or inference command that you want to run.')
        args = parser.parse_args()
        if args.application:
            self.launch_process(' '.join(args.application))
            time.sleep(10)

        bind_wait_core_time = args.time[0] if args.time else 0
        # time.sleep() rejects negative values, so only wait for positive ones.
        if bind_wait_core_time > 0:
            time.sleep(bind_wait_core_time)

    def launch_process(self, cmd: str):
        """Start ``cmd`` as a child process without waiting for it.

        The command string is split with shell-like quoting rules, so quoted
        arguments containing spaces stay intact.

        Raises:
            RuntimeError: when the command is empty, malformed, or cannot be started.
        """
        import shlex  # local import: only needed when an application is launched

        self.print_log_to_file('[INFO] Start to execute cmd: {}'.format(cmd))
        try:
            argv = shlex.split(cmd)
            if not argv:
                raise ValueError('empty command')
            subprocess.Popen(argv, shell=False)
        except (OSError, ValueError) as err:
            raise RuntimeError(f'Failed to run cmd: {cmd}') from err


if __name__ == '__main__':
    print('[INFO] Begin to run bind-cores script...')
    bind_core_manager = BindCoreManager()
    bind_core_manager.args_parse()

    if not bind_core_manager.get_npu_info():
        print('[ERROR] Failed to get current npus info')
        # Non-zero status so wrappers can detect the failure.
        raise SystemExit(1)

    if not bind_core_manager.get_running_pid_on_npu():
        raise SystemExit(0)
    bind_core_manager.run_bind_core()
    print('[INFO] End to run bind-cores script')