diff --git "a/doc/deepseek/DeepSeek-V3&R1\351\203\250\347\275\262\346\214\207\345\215\227.md" "b/doc/deepseek/DeepSeek-V3&R1\351\203\250\347\275\262\346\214\207\345\215\227.md" index e70be4665129f0f6184a0deb769e580626e16a91..256540d8d40ebc107a34db1c4a28632f317f6e26 100644 --- "a/doc/deepseek/DeepSeek-V3&R1\351\203\250\347\275\262\346\214\207\345\215\227.md" +++ "b/doc/deepseek/DeepSeek-V3&R1\351\203\250\347\275\262\346\214\207\345\215\227.md" @@ -99,14 +99,14 @@ sh mindspore-deepseek/workspace/roles/prepare/files/lib/ascend_prepare.sh **Step1:下载oedeploy工具(下载到控制节点)** ```shell -# 下载插件包并解压 -wget https://repo.oepkgs.net/openEuler/rpm/openEuler-24.03-LTS/contrib/oedp/plugins/mindspore-deepseek.tar.gz - -tar zxvf mindspore-deepseek.tar.gz # 下载安装oedp工具,例如: wget https://repo.oepkgs.net/openEuler/rpm/openEuler-24.03-LTS/contrib/oedp/aarch64/Packages/oedp-1.0.0-2.oe2503.aarch64.rpm yum localinstall oedp-1.0.0-2.oe2503.aarch64.rpm +# 下载插件包 +git clone https://gitee.com/openeuler/llm_solution.git + +cd llm_solution/script/mindspore-deepseek ``` **Step2:调整oedeploy配置文件** @@ -353,11 +353,11 @@ npu-smi set -t reset -i $id -c $chip_id 该步骤在宿主机执行,需在所有节点执行 -**Step1:** 可使用even-iso.py绑核脚本,进行细粒度绑核提升性能 +**Step1:** 可使用fine-grainded-bind.py绑核脚本,进行细粒度绑核提升性能 ```shell # 所有节点执行 -python ./lib/even-iso.py +python ./lib/fine-grainded-bind.py ``` diff --git a/script/mindspore-deepseek/workspace/roles/prepare/files/lib/fine-grainded-bind.py b/script/mindspore-deepseek/workspace/roles/prepare/files/lib/fine-grainded-bind.py new file mode 100644 index 0000000000000000000000000000000000000000..ef27be7ca6e6d859ef1d7f0106848fddac0feb31 --- /dev/null +++ b/script/mindspore-deepseek/workspace/roles/prepare/files/lib/fine-grainded-bind.py @@ -0,0 +1,352 @@ +# Copyright 2024 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ + +""" +Utils for cann workqueue cores +""" + +import os +import psutil +import subprocess + +def int_to_binary_list(value: int, align_length: int = 4) -> list: + """ + convert int value to binary list + e.g. 13 => [1, 1, 0, 1] + current only for 0 - 15 + + Args: + value (`int`): + The int value to convert to binary list. + align_length (`int`, *optional*, defaults to `4`): + The align length for list, it will add 0 for small value + + Returns: + The binary list with the value. + """ + bin_list = [] + divider = value + remainder = 0 + while True: + remainder = divider % 2 + divider = int(divider / 2) + bin_list.append(remainder) + if divider == 0: + break + + while len(bin_list) < align_length: + bin_list.append(0) + + bin_list.reverse() + return bin_list + + +def binary_list_to_int(bin_list: list) -> int: + """ + convert binary list to int value + e.g. [1, 1, 0, 1] => 13 + current only for 0 - 15 + + Args: + bin_list (`list`): + The binary list represent to int value. + + Returns: + The int value. + """ + value = 0 + muliplier = 1 + bin_list.reverse() + for v in bin_list: + value = value + v * muliplier + muliplier *= 2 + return value + + +def string_to_bit_list(array_string: str) -> list: + """ + convert hex string to binary list + e.g. "ff" => [1, 1, 1, 1, 1, 1, 1, 1] + "deadbeef" => [1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1] + + Args: + array_string (`str`): + The binary list represent to int value. + + Returns: + The binary list for the string. + """ + bin_list = [] + for c in array_string: + bit_list = int_to_binary_list(int(c, 16)) + bin_list += bit_list + bin_list.reverse() + return bin_list + + +class BitArray: + """ + The bit array class to solve core mask string. + + Args: + length(`int`, *optional*, defaults to `0`): + The max bit length of the array. + """ + + def __init__(self, length: int = 0): + self.bits = [0 for _ in range(length)] + + def load_from_str(self, array_string: str): + """ + load bit array from hex string + + Args: + array_string (`str`): + The binary list represent to int value. + + Returns: + NA. + """ + self.bits = string_to_bit_list(array_string) + + def get_marked_index(self) -> list: + """ + get the index list with value 1 + + Args: + NA. + + Returns: + The index list. + """ + marked_index_list = [] + for idx, item in enumerate(self.bits): + if item == 1: + marked_index_list.append(idx) + return marked_index_list + + def to_bytes_array(self) -> list: + """ + convert the bit array to byte array which is 8-bit elements + + Args: + NA. + + Returns: + The array values with bytes. + """ + bytes_array = [] + slide_window_list = [] + self.bits.reverse() + for idx, item in enumerate(self.bits): + slide_window_list.append(item) + if (idx + 1) % 8 == 0: + value = binary_list_to_int(slide_window_list) + slide_window_list.clear() + bytes_array.append(value) + self.bits.reverse() + return bytes_array + + def __setitem__(self, index: int, value: int): + """ + set the bit value with index + + Args: + index (`int`): + The index to set value. + value (`int`): + The value to set. + + Returns: + NA. + """ + self.bits[index] = value + + def __getitem__(self, index: int) -> int: + """ + get the bit value with index + + Args: + index (`int`): + The index to get value. + + Returns: + The value to get. + """ + return self.bits[index] + + + +def get_cann_workqueue_cores(device_id: int) -> list: + """ + get cann workqueue binding cores list + for most system, the config is set on path: + /sys/devices/virtual/workqueue/dev0_sq_send_wq/cpumask + + Args: + device_id (`int`): + The device_id for the workqueue, most time is related to rank_ik. + + Returns: + The marked core index list. + """ + cann_workqueue_config_path = f"/sys/devices/virtual/workqueue/dev{device_id}_sq_send_wq/cpumask" + if not os.path.exists(cann_workqueue_config_path): + # no this config, return [] to disable cann binding + return [] + + f = open(cann_workqueue_config_path) + cann_config = f.read() + cann_config = cann_config.replace(",", "") + cann_config = cann_config.replace("\n", "") + mask_array = BitArray() + mask_array.load_from_str(cann_config) + return mask_array.get_marked_index() + + +def mask_to_str(mask: BitArray) -> str: + """ + convert BitArray mask to string format with workqueue config + + Args: + mask (`BitArray`): + The BitArray mask to convert to string. + + Returns: + The string followed with cann workqueue format to config. + """ + mask_bytes = mask.to_bytes_array() + mask_str = "" + separete_num = 4 + i = 0 + for mask_value in mask_bytes: + mask_str += '{:02x}'.format(mask_value) + i += 1 + if i % separete_num == 0: + mask_str += "," + mask_str = mask_str[:-1] + return mask_str + + +def execute_cmd(cmd: str, fake: bool ): + """ + execute shell command + + Args: + cmd (`str`): + The command need to execute. + fake (`bool`, *optional*, defaults to `False`): + If fake execute is True, then print command instead to execute. + + Returns: + NA. + """ + if fake: + print(cmd) + return + sub_process = subprocess.Popen(cmd, shell=True) + ret = sub_process.wait() + if ret != 0: + raise SystemError(f"Execute cmd({cmd}) failed!") + +def execute_command(cmd_list): + try: + with subprocess.Popen(cmd_list, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: + out, _ = p.communicate(timeout=1000) + res = out.decode() + return res + except FileNotFoundError as e: + raise RuntimeError(f"Failed to execute command, because {e}.") + +def get_numa_map(): + numa_topo_out = execute_command(["npu-smi", "info", "-t", "topo"]).strip().split("\n") + + line_no = 0 + npu_no = 0 + numa_to_npu_map = {} + + for val in numa_topo_out: + line_no += 1 + line = ''.join(val.split()) + if line.startswith("NPU") and line_no > 1: + cpu_range = line[33:] + if numa_to_npu_map.get(cpu_range, None) is None: + numa_to_npu_map[cpu_range] = list() + numa_to_npu_map[cpu_range].append(npu_no) + npu_no += 1 + + npu_to_core_map = {} + for key, val in numa_to_npu_map.items(): + cpu_range = key.split("-") + cpu_start = int(cpu_range[0]) + cpu_end = int(cpu_range[1]) + #total_core_num = cpu_end - cpu_start + 1 + #shared_npu_num = len(val) + #core_num_per_npu = int(total_core_num / shared_npu_num) + core_num_per_npu = cpu_end - cpu_start + 1 + core_start = cpu_start + for npu in val: + npu_to_core_map[npu] = core_start + core_num_per_npu - 1 + core_start += core_num_per_npu + + return npu_to_core_map + + +def binding_cann_workqueue(device_num: int, core_num_per_workqueue: int, separate_device_cores: bool): + """ + binding cann workqueue cores + + Args: + device_num (`int`): + The total device number on the server. + core_num_per_workqueue (`int`): + The core number for each workqueue, the core index will alloc from end core index for each device. + separate_device_cores (`int`): + If separate device cores, each device workqueue binding itself cores, + otherwise, all device workqueu binding to same cores. + + Returns: + NA. + """ + print(f"the cann workqueue config command list in the follow, please execute the cmd by root user!") + total_core_num = psutil.cpu_count(logical=True) + core_num_per_device = int(total_core_num / device_num) + + device_core_mask = BitArray(total_core_num) + end_core_map = get_numa_map() + for i in range(device_num): + cann_workqueue_config_path = f"/sys/devices/virtual/workqueue/dev{i}_sq_send_wq/cpumask" + mask = BitArray(total_core_num) + #start_core_num = i * core_num_per_device + end_core_num = end_core_map[i] #start_core_num + core_num_per_device - 1 + for j in range(core_num_per_workqueue): + core_index = end_core_num - j + mask[core_index] = 1 + device_core_mask[core_index] = 1 + + if separate_device_cores: + mask_str = mask_to_str(mask) + bind_cann_core_cmd = f"echo \"{mask_str}\" > {cann_workqueue_config_path}" + execute_cmd(bind_cann_core_cmd, False) + + if not separate_device_cores: + device_core_mask_str = mask_to_str(device_core_mask) + + for i in range(device_num): + cann_workqueue_config_path = f"/sys/devices/virtual/workqueue/dev{i}_sq_send_wq/cpumask" + bind_cann_core_cmd = f"echo \"{device_core_mask_str}\" > {cann_workqueue_config_path}" + execute_cmd(bind_cann_core_cmd) + +binding_cann_workqueue(8, 4, True) diff --git a/script/mindspore-deepseek/workspace/roles/prepare/files/lib/set_env.sh b/script/mindspore-deepseek/workspace/roles/prepare/files/lib/set_env.sh index e43b87789a534d843656588371a0ab4a1e2af644..85fca8b0e79ccd02cffcbb4315554ed6310e18bc 100644 --- a/script/mindspore-deepseek/workspace/roles/prepare/files/lib/set_env.sh +++ b/script/mindspore-deepseek/workspace/roles/prepare/files/lib/set_env.sh @@ -20,6 +20,7 @@ export ASCEND_RT_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 export ASCEND_TOTAL_MEMORY_GB=64 export HCCL_CONNECT_TIMEOUT=7200 export MS_COMPILER_CACHE_ENABLE=1 +export CPU_AFFINITY=0 ' NET_ENV=" @@ -29,17 +30,15 @@ export HCCL_SOCKET_IFNAME=$RAY_DEVICE " if [ $NODE_NUM -eq 2 ]; then - YAML_FILE='/root/Python-3.11/lib/python3.11/site-packages/research/deepseek3/deepseek_r1_671b/predict_deepseek_r1_671b_w8a8.yaml' + YAML_FILE='/usr/local/Python-3.11/lib/python3.11/site-packages/research/deepseek3/deepseek_r1_671b/predict_deepseek_r1_671b_w8a8.yaml' elif [ $NODE_NUM -eq 4 ]; then - YAML_FILE='/root/Python-3.11/lib/python3.11/site-packages/research/deepseek3/deepseek_r1_671b/predict_deepseek_r1_671b.yaml' + YAML_FILE='/usr/local/Python-3.11/lib/python3.11/site-packages/research/deepseek3/deepseek_r1_671b/predict_deepseek_r1_671b.yaml' fi # 修改权重类型 sed -e 's/^load_ckpt_format.*/load_ckpt_format: "'$MODEL_TYPE'"/' -i $YAML_FILE -if [ "$MODEL_TYPE" = "ckpt" ]; then - sed -e 's/^auto_trans_ckpt.*/auto_trans_ckpt: False/' -i $YAML_FILE -fi +sed -e 's/^auto_trans_ckpt.*/auto_trans_ckpt: False/' -i $YAML_FILE YAML_ENV="export MINDFORMERS_MODEL_CONFIG=$YAML_FILE" diff --git a/script/mindspore-deepseek/workspace/roles/prepare/files/lib/start_docker.sh b/script/mindspore-deepseek/workspace/roles/prepare/files/lib/start_docker.sh index 5e97f7fbc3a82cf65e15e594de91ceff8aa9faac..0c7c508c05069d2c356e28264579bd1cd5a7576d 100644 --- a/script/mindspore-deepseek/workspace/roles/prepare/files/lib/start_docker.sh +++ b/script/mindspore-deepseek/workspace/roles/prepare/files/lib/start_docker.sh @@ -25,7 +25,7 @@ if [ $IS_STOP_OTHER_CONTAINER -ne 0 ]; then fi # 如果存在名称相同的容器,则直接使用 -docker ps -a | grep $IMAGE_NAME:$IMAGE_TAG | grep $CONTAINER_NAME +docker ps -a | grep $IMAGE_NAME:$IMAGE_TAG | grep -w $CONTAINER_NAME if [ $? -eq 0 ]; then echo "发现容器 $CONTAINER_NAME 已存在,直接使用" docker start $CONTAINER_NAME @@ -33,7 +33,7 @@ if [ $? -eq 0 ]; then fi # 如果存在名称相同,但镜像不同容器,则报错 -docker ps -a | grep $CONTAINER_NAME +docker ps -a | grep -w $CONTAINER_NAME if [ $? -eq 0 ]; then echo "发现容器名称 $CONTAINER_NAME 已被使用,请排查" exit 1 diff --git a/script/mindspore-deepseek/workspace/roles/prepare/files/prepare.sh b/script/mindspore-deepseek/workspace/roles/prepare/files/prepare.sh index c000112f0f6e7f2ce8909a6679eec3d802479ace..2504c2b58231409fe196ee69c13dbacea147bebc 100644 --- a/script/mindspore-deepseek/workspace/roles/prepare/files/prepare.sh +++ b/script/mindspore-deepseek/workspace/roles/prepare/files/prepare.sh @@ -43,7 +43,7 @@ main() { fi # 检测需要部署的节点ip数量 - if [ [ $NODE_NUM -ne 2 ] && [ $NODE_NUM -ne 4 ] ]; then + if [ $NODE_NUM -ne 2 ] && [ $NODE_NUM -ne 4 ]; then echo "当前仅支持两/四节点部署,当前数量是$NODE_NUM" exit 1 fi @@ -59,6 +59,17 @@ main() { # 3. 设置容器内环境变量 docker exec -it $CONTAINER_NAME /workspace/lib/set_env.sh + # 4. 进行绑核 + pip list | grep psutil + if [ $? -ne 0 ]; then + pip install psutil + fi + python $current_path/lib/fine-grainded-bind.py + if [ $? -ne 0 ]; then + echo "细粒度线程绑核失败,请确保驱动版本>=24.1.0" + exit 1 + fi + } # 执行主函数