diff --git a/README.md b/README.md index 601deec66865dd8734eca30f74b67372c721150d..fcabe6f78bb7287840e96b93d1897edfff424875 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,10 @@ MindStudio Training Tools,MindStudio训练工具链。针对训练&大模型 将Ascend PyTorch Profiler或者msprof采集的PyThon场景性能数据进行分析,并输出性能调优建议。 +4. [bind_core](https://gitee.com/ascend/mstt/blob/master/profiler/affinity_cpu_bind) + + 绑核脚本,支持非侵入修改工程代码,实现一键式绑核功能 + ### [Tensorboard](https://gitee.com/ascend/mstt/tree/master/plugins/tensorboard-plugins/tb_plugin) diff --git a/profiler/affinity_cpu_bind/README.md b/profiler/affinity_cpu_bind/README.md new file mode 100644 index 0000000000000000000000000000000000000000..8c3b47ed5183fd2dbade8fc316e0319b8feea880 --- /dev/null +++ b/profiler/affinity_cpu_bind/README.md @@ -0,0 +1,40 @@ +# 昇腾亲和性CPU绑核工具 + +昇腾亲和性CPU绑核工具支持用户无需修改代码,直接运行工具即可实现按CPU亲和性策略绑核,提升推理或训练性能。 + +绑核工具用户arm服务器环境,对于训练或推理任务因为CPU资源调度等出现host_bound问题时使用,可改善该问题;对于非host_bound的场景无明显改善效果。 + +## 使用须知 + +使用绑核工具前手动执行npu-smi info -t topo,出现以下类似信息,说明环境支持绑核,否则请将环境HDK包升级到Ascend HDK 23.0.RC2及以上版本。 + + NPU0 NPU1 NPU2 NPU3 NPU4 NPU5 NPU6 NPU7 NPUx CPU Affinity + NPU0 X HCCS HCCS HCCS HCCS HCCS HCCS HCCS ... xx-xx + NPU1 HCCS X HCCS HCCS HCCS HCCS HCCS HCCS ... xx-xx + NPU2 HCCS HCCS X HCCS HCCS HCCS HCCS HCCS ... xx-xx + NPU3 HCCS HCCS HCCS X HCCS HCCS HCCS HCCS ... xx-xx + NPU4 HCCS HCCS HCCS HCCS X HCCS HCCS HCCS ... xx-xx + NPU5 HCCS HCCS HCCS HCCS HCCS X HCCS HCCS ... xx-xx + NPU6 HCCS HCCS HCCS HCCS HCCS HCCS X HCCS ... xx-xx + NPU7 HCCS HCCS HCCS HCCS HCCS HCCS HCCS X ... xx-xx + NPUx ... ... ... ... ... ... ... ... ... ... + +## 使用方式 + +1.执行以下命令实施绑核: + + - 直接执行绑核命令 +```bash +python3 bind_core.py -app/--application="inferenec/train cmd" +``` +该方式会自动拉起训练或推理任务,检测任务进程,并实施绑核。 + + - 手动拉起训练或推理任务后再执行绑核 +```bash +python3 bind_core.py +``` +该方式会循环查找(循环5次,每次10s,若找不到进程,则直接退出)使用到NPU的任务进程,并实施绑核。 + +2.绑核运行过程的日志会保存到当前路径的bind_core_时间戳.log。 + +3.如果推理或训练进程拉起后需要一定时间预处理,才会真正执行任务,可在执行绑核命令时设置-t/--time参数(单位秒),绑核工具会在延迟配置的时间后,再实施绑核动作。例如:python3 bind_core.py -app="cmd" -t=10,配置后工具会在10秒后执行绑核操作。 \ No newline at end of file diff --git a/profiler/affinity_cpu_bind/bind_core.py b/profiler/affinity_cpu_bind/bind_core.py new file mode 100644 index 0000000000000000000000000000000000000000..912081a31f519c56b04910b5df624947fa45d325 --- /dev/null +++ b/profiler/affinity_cpu_bind/bind_core.py @@ -0,0 +1,212 @@ +import subprocess +import argparse +import os +import time +import logging +from datetime import datetime +from datetime import timezone + + +class PathManager: + DATA_FILE_AUTHORITY = 0o640 + + @classmethod + def create_file_safety(cls, path: str): + base_name = os.path.basename(path) + msg = f"Failed to create file: {base_name}" + if os.path.islink(path): + raise RuntimeError(msg) + if os.path.exists(path): + return + try: + os.close(os.open(path, os.O_WRONLY | os.O_CREAT, cls.DATA_FILE_AUTHORITY)) + except Exception as err: + raise RuntimeError(msg) from err + + +class BindCoreManager(): + DEFAULT_FIND_RUNNING_PID_TIMES = 5 + + def __init__(self): + self.npu_id_list = [] + self.running_pid_on_npu = {} + self.find_running_pid_times = self.DEFAULT_FIND_RUNNING_PID_TIMES + self.npu_affinity_cpu_dict = {} + self.log_file = '' + self._init_log_file() + + def get_running_pid_on_npu(self) -> bool: + no_running_pids_on_npu_msg = '[INFO] Now there is no running process on all NPUs, stop bind cores' + logging.info('Begin to find running process on all NPUs') + # get running process on NPUs + for _ in range(self.find_running_pid_times): + running_pid_on_npu = {} + for npu_id in self.npu_id_list: + get_npu_pids_cmd = 'npu-smi info -t proc-mem -i {} -c 0'.format(npu_id) + get_npu_pids_process = subprocess.run(get_npu_pids_cmd.split(), shell=False, capture_output=True) + res = get_npu_pids_process.stdout.decode('utf-8').split() + pid_list = [] + for value in res: + if value.startswith('id:'): + pid = value.split(':')[1] + pid_list.append(pid) + if pid_list: + running_pid_on_npu[npu_id] = list(set(pid_list)) + + if len(self.running_pid_on_npu.keys()) == len(running_pid_on_npu.keys()) and running_pid_on_npu: + self.running_pid_on_npu = running_pid_on_npu + break + + self.running_pid_on_npu = running_pid_on_npu + time.sleep(5) + + # delete repeat pid + for npu_id in self.npu_id_list: + if npu_id not in self.running_pid_on_npu: + continue + pids_on_npu = self.running_pid_on_npu[npu_id] + for npu_id_with_pids, pids in self.running_pid_on_npu.items(): + if npu_id == npu_id_with_pids: + continue + pids_on_npu = list(set(pids_on_npu) - set(pids)) + self.running_pid_on_npu[npu_id] = pids_on_npu + + if_running_process = False + for npu_id, pids in self.running_pid_on_npu.items(): + if not pids: + logging.info('There is no running process on NPU %d', npu_id) + else: + logging.info('Succeed to find running process %s on NPU %d', pids, npu_id) + if_running_process = True + if not if_running_process: + print(no_running_pids_on_npu_msg) + return if_running_process + + def get_npu_info(self) -> bool: + try: + self._get_all_npu_id() + if not self._get_npu_affinity(): + return False + except subprocess.CalledProcessError: + return False + return True + + def run_bind_core(self): + if not self.running_pid_on_npu: + return + for npu, pid_list in self.running_pid_on_npu.items(): + if npu not in self.npu_affinity_cpu_dict.keys(): + logging.warning('Cannot find affinity cpu for npu: %d', npu) + continue + affinity_cpu = self.npu_affinity_cpu_dict.get(npu) + for pid in pid_list: + try: + logging.info('Begin to bind cores for process %d on NPU %d', pid, npu) + set_affinity_cpu_cmd = 'taskset -pc {} {}'.format(affinity_cpu, pid) + p = subprocess.run(set_affinity_cpu_cmd.split(), shell=False, capture_output=True) + logging.info(p.stdout.decode('utf-8')) + except subprocess.CalledProcessError: + print('[ERROR] Failed to bind process {} on NPU {} with cpu cores list {}'.format(pid, npu, affinity_cpu)) + + logging.info('Succeed to bind process %d on NPU %d with cpu cores list %s', pid, npu, affinity_cpu) + + def args_parse(self): + parser = argparse.ArgumentParser(description='This is a affinity cpu core bind script.') + parser.add_argument('-t', '--time', type=int, metavar='', help='Wait time before bind cores that you want to set. The unit is \'s\'.') + parser.add_argument('-app', '--application', metavar='', nargs='+', help='Training or inference command that you want to run.') + args = parser.parse_args() + if args.application: + application_cmd = ' '.join(args.application) + self._launch_process(application_cmd) + time.sleep(2) + # if time is set, wait for setting time before bind cores + if args.time: + time.sleep(args.time) + + def _init_log_file(self): + now_time = datetime.now(tz=timezone.utc) + time_stamp = str(now_time.year) + '_' + \ + str(now_time.month) + '_' + \ + str(now_time.day) + '_' + \ + str(now_time.hour) + '_' + \ + str(now_time.minute) + '_' + \ + str(now_time.second) + log_file_name = 'bind_core_' + time_stamp + '.log' + msg = f"Failed to create file: {log_file_name}" + try: + PathManager.create_file_safety(os.path.join(os.getcwd(), log_file_name)) + except RuntimeError as err: + raise RuntimeError(msg) from err + self.log_file = log_file_name + logging.basicConfig(filename=self.log_file, + level=logging.INFO, + filemode='w', + format='%(asctime)s-%(name)s-%(levelname)s-%(message)s') + + def _get_all_npu_id(self) -> None: + get_npu_info_cmd = 'npu-smi info -l' + get_npu_info_process = subprocess.run(get_npu_info_cmd.split(), shell=False, capture_output=True) + get_npu_id_cmd = 'grep ID' + get_npu_id_process = subprocess.run(get_npu_id_cmd.split(), shell=False, input=get_npu_info_process.stdout, capture_output=True) + res = get_npu_id_process.stdout.decode('utf-8').split() + for i in res: + if i.isdigit(): + self.npu_id_list.append(int(i)) + logging.info('NPU total id list: %s', self.npu_id_list) + + def _get_npu_affinity(self) -> bool: + cpu_num = os.cpu_count() + cpu_num_for_each_npu = cpu_num // len(self.npu_id_list) + get_npu_topo_cmd = 'npu-smi info -t topo' + p = subprocess.run(get_npu_topo_cmd.split(), shell=False, capture_output=True) + res = p.stdout.decode('utf-8').split() + if not res: + print('[ERROR] Failed to run get npu affinity info, please check if driver version support cmd npu-smi info -t topo') + return False + + index = 0 + for v in res: + if '-' in v: + affinity_cpus = [] + cpu_lists = v.split(',') + for cpu_list in cpu_lists: + cpus = cpu_list.split('-') + if len(cpus) != 2: + continue + if int(cpus[1]) - int(cpus[0]) == cpu_num_for_each_npu - 1: + cpus[1] = str(int(cpus[1]) + cpu_num_for_each_npu) + affinity_cpus.append(cpus[0] + '-' + cpus[1]) + if index < len(self.npu_id_list): + self.npu_affinity_cpu_dict[self.npu_id_list[index]] = ','.join(affinity_cpu for affinity_cpu in affinity_cpus) + index += 1 + else: + print('[ERROR] Get affinity_cpu_list for {} npus, more than real npu num: {}'.format(index + 1, len(self.npu_id_list))) + return False + + for k in self.npu_affinity_cpu_dict.keys(): + logging.info('Affinity CPU list [%s] for NPU %d', self.npu_affinity_cpu_dict[k], k) + return True + + def _launch_process(self, cmd: list): + logging.info('Start to execute cmd: %s', cmd) + try: + subprocess.Popen(cmd.split(), shell=False) + except subprocess.CalledProcessError as e: + raise RuntimeError(f'Failed to run cmd: {cmd}') from e + + +if __name__ == '__main__': + print('[INFO] Begin to run bind-cores script...') + bind_core_manager = BindCoreManager() + bind_core_manager.args_parse() + + if not bind_core_manager.get_npu_info(): + print('[ERROR] Failed to get current npus info') + exit() + + if not bind_core_manager.get_running_pid_on_npu(): + exit() + bind_core_manager.run_bind_core() + print('[INFO] End to run bind-cores script, the log is saved in {}'.format(bind_core_manager.log_file)) + +