From 99499b4a0efe83e8d872c3e012c1d06cdf92454f Mon Sep 17 00:00:00 2001
From: liu lili
Date: Wed, 9 Apr 2025 17:59:17 +0800
Subject: [PATCH] lll: add bind cpu in ms ray worker

---
 vllm_mindspore/executor/ray_gpu_executor.py | 103 ++++++++++++++++++++
 1 file changed, 103 insertions(+)

diff --git a/vllm_mindspore/executor/ray_gpu_executor.py b/vllm_mindspore/executor/ray_gpu_executor.py
index d9c2aff..093ef67 100644
--- a/vllm_mindspore/executor/ray_gpu_executor.py
+++ b/vllm_mindspore/executor/ray_gpu_executor.py
@@ -29,12 +29,115 @@ from vllm.executor.ray_utils import RayWorkerWrapper, ray, available_resources_p
 from vllm.executor.ray_distributed_executor import PlacementGroupSchedulingStrategy
+import subprocess
+import psutil
 
 logger = init_logger(__name__)
 
+# map PCI device id of the accelerator to the Ascend NPU model
+npu_device_dict = {"d801": "910A", "d802": "910B", "d803": "910C", "d100": "310", "d500": "310P"}
+
+
+def execute_command(cmd_list):
+    try:
+        with subprocess.Popen(cmd_list, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
+            out, _ = p.communicate(timeout=1000)
+
+        res = out.decode()
+        return res
+    except FileNotFoundError as e:
+        raise RuntimeError(f"Failed to execute command, because {e}.")
+
+
+def get_numa_map():
+    # Build a map from NPU id to the CPU core range it should be pinned to,
+    # based on "npu-smi info -t topo" (NPU/CPU affinity) and "lscpu" (NUMA layout).
+    numa_topo_out = execute_command(["npu-smi", "info", "-t", "topo"]).strip().split("\n")
+
+    line_no = 0
+    npu_no = 0
+    numa_to_npu_map = {}
+    numa_number = 0
+    max_cpu = 0
+
+    numa_node = execute_command("lscpu").strip().split("\n")
+    for val in numa_node:
+        if val.startswith("CPU(s):"):
+            max_cpu = int(val.split(" ")[-1]) - 1  # highest logical CPU index
+        if val.startswith("NUMA"):
+            nodes = val.split(" ")
+            numa_number = int(nodes[-1])  # number of NUMA nodes
+            break
+
+    npu_max_cpu = False
+    npu_max_cpu_no = 0
+    for val in numa_topo_out:
+        line_no += 1
+        line = ''.join(val.split())
+        if line.startswith("NPU") and line_no > 1:
+            cpu_range = line[33:]  # cpu affinity range of this NPU, e.g. "0-23"
+            npu_max_cpu_no = max(npu_max_cpu_no, int(cpu_range.split("-")[1]))
+            if numa_to_npu_map.get(cpu_range, None) is None:
+                numa_to_npu_map[cpu_range] = list()
+            numa_to_npu_map[cpu_range].append(npu_no)
+            npu_no += 1
+
+    npu_max_cpu = True if npu_max_cpu_no == max_cpu else False
+    shared_mode = False
+    if npu_no > numa_number:
+        shared_mode = True
+
+    npu_to_core_map = {}
+    for key, val in numa_to_npu_map.items():
+        cpu_range = key.split("-")
+        total_core_num = int(cpu_range[1]) - int(cpu_range[0]) + 1
+        cpu_start = int(cpu_range[0]) + total_core_num if npu_max_cpu == False else int(cpu_range[0]) - total_core_num
+        cpu_end = int(cpu_range[1]) + total_core_num if npu_max_cpu == False else int(cpu_range[1]) - total_core_num
+        shared_mode = True
+        if shared_mode:
+            # several NPUs share one affinity range: split the cores evenly
+            shared_npu_num = len(val)
+            core_num_per_npu = int(total_core_num / shared_npu_num)
+        else:
+            core_num_per_npu = total_core_num if npu_max_cpu == False else -(total_core_num)
+        core_start = cpu_start
+        for npu in val:
+            npu_to_core_map[npu] = [core_start, core_start + core_num_per_npu]
+            core_start += core_num_per_npu
+
+    return npu_to_core_map
+
+
+def bind_cpu(rank):
+    device_type = os.getenv("DEVICE_TYPE", "NPU")
+    if device_type != "NPU":
+        logger.info(f"Current platform {device_type} not supported")
+        return
+    pci_out = execute_command("lspci").strip().split("\n")
+    npu_device_type = ""
+    for val in pci_out:
+        if "Processing accelerators" in val and "Device" in val:
+            npu_device_type = npu_device_dict[val.split(" ")[-3]]
+            break
+    if npu_device_type != "910B" and npu_device_type != "910A":
+        logger.info(f"Current platform {npu_device_type} not supported")
+        return
+
+    rank_cpu_maps = get_numa_map()
+
+    cpu_range = rank_cpu_maps[rank]
+    cpu_list = list(range(cpu_range[0], cpu_range[1]))
+    current_process = psutil.Process()
+    current_process.cpu_affinity(cpu_list)
+
+    logger.info(f"ray bind process {current_process.pid} in rank {rank} to cpu: {cpu_list}")
+
+
 class MsRayWorkerWrapper(RayWorkerWrapper):
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
+
+        # bind cpu: pin this worker process to the CPU cores mapped to its NPU
+        npu_ids = ray.get_runtime_context().get_accelerator_ids()["NPU"]
+        if len(npu_ids) > 0:
+            npu_id = int(npu_ids[0])
+            bind_cpu(npu_id)
 
     def ms_init_workers_ray(self, placement_group: "PlacementGroup",
--
Gitee
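
A minimal standalone sketch (not part of the patch) for sanity-checking the NPU-to-core mapping on a target host; it assumes the patch above has been applied and that psutil, npu-smi and lscpu are available where it runs:

    import psutil

    # get_numa_map comes from the patched module above
    from vllm_mindspore.executor.ray_gpu_executor import get_numa_map

    if __name__ == "__main__":
        for npu, (start, end) in sorted(get_numa_map().items()):
            # each NPU is assigned the half-open core range [start, end)
            print(f"NPU {npu}: cores {start}-{end - 1}")
        print(f"current affinity: {psutil.Process().cpu_affinity()}")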