diff --git a/vllm_dp/ep.patch b/vllm_dp/ep.patch new file mode 100644 index 0000000000000000000000000000000000000000..9b7be259855e9883d3f65f917d81055176f9cbdd --- /dev/null +++ b/vllm_dp/ep.patch @@ -0,0 +1,80 @@ +diff --git a/vllm/distributed/parallel_state.py b/vllm/distributed/parallel_state.py +index fa493fefb..1426354ec 100644 +--- a/vllm/distributed/parallel_state.py ++++ b/vllm/distributed/parallel_state.py +@@ -761,6 +761,14 @@ def get_dp_group() -> GroupCoordinator: + return _DP + + ++_EP: Optional[GroupCoordinator] = None ++ ++ ++def get_ep_group() -> GroupCoordinator: ++ assert _EP is not None, ("expert parallel group is not initialized") ++ return _EP ++ ++ + def get_pp_group() -> GroupCoordinator: + assert _PP is not None, ( + "pipeline model parallel group is not initialized") +@@ -954,10 +962,21 @@ def initialize_model_parallel( + backend, + group_name="dp") + ++ global _EP ++ assert _EP is None, ("expert parallel group is already initialized") ++ group_ranks = all_ranks.transpose(1, 2).reshape( ++ -1, data_parallel_size * tensor_model_parallel_size).unbind(0) ++ group_ranks = [x.tolist() for x in group_ranks] ++ _EP = init_model_parallel_group(group_ranks, ++ get_world_group().local_rank, ++ backend, ++ group_name="ep") ++ + logger.info( + "rank %s in world size %s is assigned as " +- "DP rank %s, PP rank %s, TP rank %s", rank, world_size, +- _DP.rank_in_group, _PP.rank_in_group, _TP.rank_in_group) ++ "DP rank %s, PP rank %s, TP rank %s, EP rank %s", rank, world_size, ++ _DP.rank_in_group, _PP.rank_in_group, _TP.rank_in_group, ++ _EP.rank_in_group) + + + def ensure_kv_transfer_initialized(vllm_config: "VllmConfig") -> None: +@@ -1068,6 +1087,10 @@ def destroy_model_parallel(): + _DP.destroy() + _DP = None + ++ global _EP ++ if _EP: ++ _EP.destroy() ++ _EP = None + + def destroy_distributed_environment(): + global _WORLD +diff --git a/vllm/envs.py b/vllm/envs.py +index 6067f5bdd..1becf9e1b 100644 +--- a/vllm/envs.py ++++ b/vllm/envs.py +@@ -106,6 +106,8 @@ if TYPE_CHECKING: + VLLM_TPU_DISABLE_TOPK_TOPP_OPTIMIZATION: bool = False + VLLM_TPU_BUCKET_PADDING_GAP: int = 0 + VLLM_USE_DEEP_GEMM: bool = False ++ VLLM_MOE_DP_CHUNK_SIZE: int = 256 ++ VLLM_ALL2ALL_BACKEND: str = "naive" + + + def get_default_cache_root(): +@@ -692,6 +694,12 @@ environment_variables: dict[str, Callable[[], Any]] = { + # Allow use of DeepGemm kernels for fused moe ops. 
+ "VLLM_USE_DEEP_GEMM": + lambda: bool(int(os.getenv("VLLM_USE_DEEP_GEMM", "0"))), ++ ++ "VLLM_ALL2ALL_BACKEND": ++ lambda: os.getenv("VLLM_ALL2ALL_BACKEND", "naive"), ++ ++ "VLLM_MOE_DP_CHUNK_SIZE": ++ lambda: int(os.getenv("VLLM_MOE_DP_CHUNK_SIZE", "256")), + } + + # end-env-vars-definition diff --git a/vllm_mindspore/config.py b/vllm_mindspore/config.py index 08fb6ab0b6dfe4963685852f7e723d1d465c6a19..521e3b2ff1039d2a88836d0ca6faecb852a5b9ae 100644 --- a/vllm_mindspore/config.py +++ b/vllm_mindspore/config.py @@ -23,7 +23,7 @@ import sys import threading import time from collections import Counter -from typing import Optional, Union +from typing import Optional, Union, TypeVar import torch import vllm.envs as envs @@ -425,3 +425,14 @@ def stateless_destroy_socket_process_group( dp_group.close() logger.info("Socket process group for rank %d destroyed.", dp_group.rank) + +T = TypeVar("T") + +def get_layers_from_vllm_config(vllm_config: VllmConfig, + layer_type: type[T]) -> dict[str, T]: + return { + layer_name: layer + for layer_name, layer in + vllm_config.compilation_config.static_forward_context.items() + if isinstance(layer, layer_type) + } diff --git a/vllm_mindspore/model_executor/layers/fused_moe/__init__.py b/vllm_mindspore/model_executor/layers/fused_moe/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..29502460a9ecc18e5da0d9797cf15e513427e76d --- /dev/null +++ b/vllm_mindspore/model_executor/layers/fused_moe/__init__.py @@ -0,0 +1 @@ +from .layer import FusedMoE \ No newline at end of file diff --git a/vllm_mindspore/model_executor/layers/fused_moe/fused_moe-bak.py b/vllm_mindspore/model_executor/layers/fused_moe/fused_moe-bak.py new file mode 100644 index 0000000000000000000000000000000000000000..a7b3bf7dadf03f1afef9c3c3dc4cc96bf01d35fe --- /dev/null +++ b/vllm_mindspore/model_executor/layers/fused_moe/fused_moe-bak.py @@ -0,0 +1,173 @@ +from typing import Optional + +from mindspore import Tensor, mint, ops +from mindspore.ops.auto_generate import (GroupedMatmulV4, + FusedAddTopKDiv, + MoeInitRoutingV2, + MoeTokenUnpermute) +import mindspore as ms +from vllm.distributed.parallel_state import get_ep_group, get_dp_group + +def fused_topk( + hidden_states: Tensor, + gating_output: Tensor, + topk: int, + renormalize: bool, + indices_type = None, +) -> tuple[Tensor, Tensor]: + score = mint.softmax(gating_output, dim=-1) + topk_weights, topk_ids = mint.topk( + score, + k=topk, + dim=-1 + ) + if renormalize: + topk_weights = topk_weights / topk_weights.sum(dim=-1, keepdim=True) + + if indices_type is not None: + topk_ids = topk_ids.to(indices_type) + return topk_weights, topk_ids + + +def grouped_topk( + hidden_states: Tensor, + gating_output: Tensor, + topk: int, + renormalize: bool, + num_expert_group: int = 0, + topk_group: int = 0, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[Tensor] = None +) -> tuple[Tensor, Tensor]: + fused_add_topk_div = FusedAddTopKDiv() + assert hidden_states.shape[0] == gating_output.shape[0], ( + "Number of tokens mismatch") + scoring_type = 0 # sigmoid + topk_in_group = 2 + topk_weights, topk_ids = fused_add_topk_div( + gating_output, + e_score_correction_bias, + num_expert_group, + topk_group, + topk, + topk_in_group, + scoring_type, + renormalize) + + return topk_weights, topk_ids + + +def fused_experts(hidden_states: Tensor, + w1: Tensor, + w2: Tensor, + topk_weights: Tensor, + topk_ids: Tensor, + activation: str = "silu", + global_num_experts: int = -1, + apply_router_weight_on_input: bool = False, 
+ expert_map: Optional[Tensor] = None, + tp_size: int = 1, + ep_size: int = 0) -> Tensor: + + if tp_size >= 1: + # no ep, pure tp + if ep_size == 1: + hidden_states = _run_tp_moe(hidden_states, w1, w2, topk_ids, topk_weights, + activation, global_num_experts, + apply_router_weight_on_input) + # ep_size > 1 : pure ep or tp + ep + else: + # pure ep + if tp_size == 1: + hidden_states = _run_ep_moe(hidden_states, w1, w2, topk_ids, topk_weights, + activation, global_num_experts, + apply_router_weight_on_input) + # tp_size > 1 : tp + ep + else: + hidden_states = _run_tp_ep_moe(hidden_states, w1, w2, topk_ids, topk_weights, + activation, global_num_experts, + apply_router_weight_on_input) + + return hidden_states + +def _gate_activation(gate, activation): + if activation == "silu": + return mint.nn.functional.silu(gate) + elif activation == "gelu": + return mint.nn.functional.gelu(gate) + else: + raise ValueError(f"Unsupported activation function: {activation}") + + +group_matmul_ops = GroupedMatmulV4() +moe_init_routing_op = MoeInitRoutingV2() +moe_token_unpermute = MoeTokenUnpermute() + +def _group_matmul(hidden_states, weight, group_list): + return group_matmul_ops([hidden_states], [weight], + None, None, None, None, None, None, + group_list, split_item=3, group_type=0, group_list_type=1)[0] + +def _run_ep_moe(hidden_states, + w1, + w2, + topk_ids, + topk_weights, + activation, + global_num_experts, + apply_router_weight_on_input): + hidden_states = _group_matmul(hidden_states, w1, topk_ids) + hidden_states = _gate_activation(hidden_states, activation) + hidden_states = _group_matmul(hidden_states, w2, topk_ids) + return hidden_states + + +def _run_tp_moe(hidden_states, + w1, + w2, + topk_ids, + topk_weights, + activation, + global_num_experts, + apply_router_weight_on_input): + topk_weights = topk_weights.astype(hidden_states.dtype) + topk_ids = topk_ids.astype(ms.int32) + + sorted_input_tensor, unsort_map, group_list, _ = \ + moe_init_routing_op( + hidden_states, + topk_ids, + active_num=0, + expert_capacity=0, + expert_num=global_num_experts, + drop_pad_mode=0, + expert_tokens_count_or_cumsum_flag=2, + expert_tokens_before_capacity_flag=True) + + group_list = group_list.astype(ms.int64) + + gate_hidden_out = _group_matmul(sorted_input_tensor, mint.transpose(w1, -1, -2), group_list) + gate, hidden = mint.split(gate_hidden_out, + (w1.shape[1] // 2, w1.shape[1] // 2), -1) + gate = _gate_activation(gate, activation) + hidden = mint.mul(hidden, gate) + expert_output = _group_matmul(hidden, mint.transpose(w2, -1, -2), group_list) + expert_output = mint.nan_to_num(expert_output, 0, 0, 0) + moe_output = moe_token_unpermute(permuted_tokens=expert_output, + sorted_indices=unsort_map, + probs=topk_weights, + padded_mode=False, + restore_shape=None) + return moe_output + + +def _run_tp_ep_moe(hidden_states, + w1, + w2, + group_list, + group_logits, + activation, + global_num_experts, + apply_router_weight_on_input): + raise NotImplementedError( + "TP + EP MoE is not implemented yet. 
Please use pure TP or pure EP MoE instead.") diff --git a/vllm_mindspore/model_executor/layers/fused_moe/fused_moe.py b/vllm_mindspore/model_executor/layers/fused_moe/fused_moe.py new file mode 100644 index 0000000000000000000000000000000000000000..8e7b65e8dd0d339fd72e9774c30f0fd6fe3d3366 --- /dev/null +++ b/vllm_mindspore/model_executor/layers/fused_moe/fused_moe.py @@ -0,0 +1,452 @@ +from typing import Optional + +import numpy as np +from mindspore import Tensor, mint, ops, nn +from mindspore.ops.auto_generate import (GroupedMatmulV4, + FusedAddTopKDiv, + MoeInitRoutingV2, + MoeTokenUnpermute, + MoeDistributeDispatch, + MoeDistributeCombine) +import mindspore as ms +from vllm.distributed.parallel_state import (get_ep_group, get_tp_group, get_tensor_model_parallel_rank, + get_tensor_model_parallel_world_size) +from vllm_mindspore.distributed.communication_op import ReduceFromModelParallelRegion +from vllm_mindspore.utils import is_910b + +def fused_topk( + hidden_states: Tensor, + gating_output: Tensor, + topk: int, + renormalize: bool, + indices_type = None, +) -> tuple[Tensor, Tensor]: + score = mint.softmax(gating_output, dim=-1) + topk_weights, topk_ids = mint.topk( + score, + k=topk, + dim=-1 + ) + if renormalize: + topk_weights = topk_weights / topk_weights.sum(dim=-1, keepdim=True) + + if indices_type is not None: + topk_ids = topk_ids.to(indices_type) + return topk_weights, topk_ids + + +def grouped_topk( + hidden_states: Tensor, + gating_output: Tensor, + topk: int, + renormalize: bool, + num_expert_group: int = 0, + topk_group: int = 0, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[Tensor] = None +) -> tuple[Tensor, Tensor]: + fused_add_topk_div = FusedAddTopKDiv() + assert hidden_states.shape[0] == gating_output.shape[0], ( + "Number of tokens mismatch") + scoring_type = 0 # sigmoid + topk_in_group = 2 + topk_weights, topk_ids = fused_add_topk_div( + gating_output, + e_score_correction_bias, + num_expert_group, + topk_group, + topk, + topk_in_group, + scoring_type, + renormalize) + + return topk_weights, topk_ids + + +class FusedExperts(nn.Cell): + def __init__(self, moe_config): + super().__init__() + self.group_matmul_ops = GroupedMatmulV4() + self.moe_init_routing_op = MoeInitRoutingV2() + self.moe_token_unpermute = MoeTokenUnpermute() + + self.pure_tp = False + self.pure_ep = False + + self.experts_num = moe_config.num_experts + self.local_expert_num = moe_config.num_local_experts + self.ep_size = moe_config.moe_parallel_config.ep_size + self.ep_rank = moe_config.moe_parallel_config.ep_rank + if self.ep_size > 1: + experts_num_map = [(self.experts_num // self.ep_size) + for _ in range(self.ep_size - 1)] + experts_num_map.append(self.experts_num - ((self.experts_num // self.ep_size) * (self.ep_size - 1))) + self.experts_num_map = experts_num_map + self.ep_group = get_ep_group().device_group._name + # self.ep_rank = get_ep_group().rank_in_group + + # pure ep mode + if moe_config.moe_parallel_config.ep_size > 1 and \ + moe_config.moe_parallel_config.tp_size == 1: + self.pure_ep = True + + # some configuration for tensor model parallel region + self.tp_rank = get_tensor_model_parallel_rank() + self.tp_world_size = get_tensor_model_parallel_world_size() + self.tp_group = get_tp_group().device_group._name + self.all_reduce_across_tp = ReduceFromModelParallelRegion() + self.broadcast_to_tensor_parallel_region = ops.Broadcast(0, group=self.tp_group) + + self.use_dispatch_kernels = moe_config.use_dispatch_kernels + + if self.use_dispatch_kernels: + # some 
configuration for dispatch and combine + self.dispatch = MoeDistributeDispatch() # only support in 910b and 910_A3 + self.combine = MoeDistributeCombine() # only support in 910b and 910_A3 + self.dispatch_tp_world_size = 0 if is_910b() else 1 # 910b:0, 910_A3:1 + self.dispatch_shared_expert_num = 0 if is_910b() else 1 # 910b:0, 910_A3:1 + self.max_bs = 256 if is_910b() else 512 # max b*s in single npu + self.max_bs *= self.ep_size + + else: + # some configuration for experts + self.hidden_size = moe_config.hidden_dim + + # some configuration for alltoall communication + experts_num_map_np = np.array(self.experts_num_map, dtype=np.int64) + experts_num_map_cu_np = np.cumsum(experts_num_map_np, dtype=np.int64) + self.experts_num_map_cu_index = ms.Tensor(experts_num_map_cu_np - 1, dtype=ms.int64) + + if self.tp_rank == 0: + self.send_experts_num_map = ms.Tensor(experts_num_map, dtype=ms.int64) + else: + self.send_experts_num_map = mint.zeros(self.ep_size, dtype=ms.int64) + + recv_num_map_list = [] + recv_list_index = [] + for i in range(self.ep_size): + if i % self.tp_world_size == 0: + recv_num_map_list.append(moe_config.num_local_experts) + recv_list_index.append(i) + else: + recv_num_map_list.append(0) + self.recv_experts_num_map = ms.Tensor(recv_num_map_list, dtype=ms.int64) + self.recv_list_index = ms.Tensor(recv_list_index, dtype=ms.int64) + + self.prepend_tensor = ms.Tensor([0], dtype=ms.int64) + + self.all_to_all_v_across_ep_with_block_size = ops.AlltoAllV(block_size=self.hidden_size, + group=self.ep_group) + self.all_to_all_v_across_ep = ops.AlltoAllV(group=self.ep_group) + self.even_list = [1 for _ in range(self.ep_size)] + + self.dummy_token = mint.zeros((1, self.hidden_size), dtype=moe_config.in_dtype) + + self.depend = ops.Depend() + + # pure tp mode + elif moe_config.moe_parallel_config.ep_size == 1 and \ + moe_config.moe_parallel_config.tp_size >= 1: + self.pure_tp = True + # tp + ep mode + else: + experts_num_map_np = np.array(self.experts_num_map, dtype=np.int32) + experts_num_map_cu_np = np.cumsum(experts_num_map_np, dtype=np.int32) + self.expert_start_index = 0 if self.ep_rank == 0 else int(experts_num_map_cu_np[self.ep_rank - 1]) + self.expert_end_index = int(experts_num_map_cu_np[self.ep_rank]) + # raise NotImplementedError("tp + ep mode not support yet.") + + def construct(self, + hidden_states: Tensor, + w1: Tensor, + w2: Tensor, + topk_weights: Tensor, + topk_ids: Tensor, + activation: str = "silu", + global_num_experts: int = -1, + apply_router_weight_on_input: bool = False, + expert_map: Optional[Tensor] = None) -> Tensor: + + if self.pure_tp: + hidden_states = self.run_tp_moe(hidden_states, w1, w2, topk_ids, topk_weights, + activation, global_num_experts, + apply_router_weight_on_input) + # ep_size > 1 : pure ep or tp + ep + elif self.pure_ep: + # pure ep + hidden_states = self.run_ep_moe(hidden_states, w1, w2, topk_ids, topk_weights, + activation, global_num_experts, + apply_router_weight_on_input) + # tp_size > 1 : tp + ep + else: + hidden_states = self.run_tp_ep_moe(hidden_states, w1, w2, topk_ids, topk_weights, + activation, global_num_experts, + apply_router_weight_on_input) + + return hidden_states + + + def _gate_activation(self, gate, activation): + if activation == "silu": + return mint.nn.functional.silu(gate) + elif activation == "gelu": + return mint.nn.functional.gelu(gate) + else: + raise ValueError(f"Unsupported activation function: {activation}") + + def _group_matmul(self, hidden_states, weight, group_list): + return 
self.group_matmul_ops([hidden_states], [weight], + None, None, None, None, None, None, + group_list, split_item=3, group_type=0, group_list_type=1)[0] + + def _ffn(self, hidden_state, w1, w2, group_list, activation): + gate_hidden_out = self._group_matmul(hidden_state, mint.transpose(w1, -1, -2), group_list) + gate, hidden = mint.split(gate_hidden_out, + (w1.shape[1] // 2, w1.shape[1] // 2), -1) + gate = self._gate_activation(gate, activation) + hidden = mint.mul(hidden, gate) + expert_output = self._group_matmul(hidden, mint.transpose(w2, -1, -2), group_list) + expert_output = mint.nan_to_num(expert_output, 0, 0, 0) + return expert_output + + def run_tp_moe(self, + hidden_states, + w1, + w2, + topk_ids, + topk_weights, + activation, + global_num_experts, + apply_router_weight_on_input): + topk_weights = topk_weights.astype(hidden_states.dtype) + topk_ids = topk_ids.astype(ms.int32) + + sorted_input_tensor, unsort_map, group_list, _ = \ + self.moe_init_routing_op( + hidden_states, + topk_ids, + active_num=0, + expert_capacity=0, + expert_num=global_num_experts, + drop_pad_mode=0, + expert_tokens_count_or_cumsum_flag=2, + expert_tokens_before_capacity_flag=True) + + group_list = group_list.astype(ms.int64) + + expert_output = self._ffn(sorted_input_tensor, w1, w2, group_list, activation) + + moe_output = self.moe_token_unpermute(permuted_tokens=expert_output, + sorted_indices=unsort_map, + probs=topk_weights, + padded_mode=False, + restore_shape=None) + return moe_output + + + def run_tp_ep_moe(self, + hidden_states, + w1, + w2, + topk_ids, + topk_weights, + activation, + global_num_experts, + apply_router_weight_on_input): + topk_weights = topk_weights.astype(hidden_states.dtype) + topk_ids = topk_ids.astype(ms.int32) + + topk_mask = topk_ids < self.expert_start_index + local_topk_ids = topk_ids - self.expert_start_index + local_topk_ids = local_topk_ids.astype(ms.int32) + # trick: if tp + ep moe, means ep_size > 1, and expert will be + # distributed across ep_size, so max(local_topk_ids) < self.experts_num - 1. + # It will allow ffn not compute the expert output which are not assigned to this ep rank. 
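+        # Illustrative example (assumed values): with experts_num=8, ep_size=2 and
+        # ep_rank=1, this rank owns global experts 4..7, so expert_start_index=4 and
+        # local_expert_num=4. A token routed to global expert 2 gets local id 2-4=-2
+        # and is remapped below to the dummy local id experts_num-1=7, which lies
+        # outside 0..local_expert_num-1; its routing weight is then zeroed by
+        # weight_mask, so the foreign expert contributes nothing on this rank.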
+        local_topk_ids = ops.masked_fill(local_topk_ids, topk_mask, self.experts_num - 1)
+
+        weight_mask = local_topk_ids >= self.local_expert_num
+        topk_weights = ops.masked_fill(topk_weights, weight_mask, 0)
+
+        sorted_input_tensor, unsort_map, group_list, _ = \
+            self.moe_init_routing_op(
+                hidden_states,
+                local_topk_ids,
+                active_num=0,
+                expert_capacity=0,
+                expert_num=global_num_experts,
+                drop_pad_mode=0,
+                expert_tokens_count_or_cumsum_flag=2,
+                expert_tokens_before_capacity_flag=True)
+
+        group_list = group_list[:self.local_expert_num]
+        group_list = group_list.astype(ms.int64)
+        expert_output = self._ffn(sorted_input_tensor, w1, w2, group_list, activation)
+        expert_output = mint.nan_to_num(expert_output, 0, 0, 0)
+        moe_output = self.moe_token_unpermute(permuted_tokens=expert_output,
+                                              sorted_indices=unsort_map,
+                                              probs=topk_weights,
+                                              padded_mode=False,
+                                              restore_shape=None)
+        return moe_output
+
+
+    def run_ep_moe(self,
+                   hidden_states,
+                   w1,
+                   w2,
+                   topk_ids,
+                   topk_weights,
+                   activation,
+                   global_num_experts,
+                   apply_router_weight_on_input):
+        topk_weights = topk_weights.astype(hidden_states.dtype)
+        topk_ids = topk_ids.astype(ms.int32)
+
+        if self.use_dispatch_kernels:
+            return self._ep_with_dispatch_combine(
+                hidden_states,
+                w1,
+                w2,
+                topk_ids,
+                topk_weights,
+                activation,
+                global_num_experts,
+                apply_router_weight_on_input
+            )
+
+        return self._ep_with_all_to_all_v(
+            hidden_states,
+            w1,
+            w2,
+            topk_ids,
+            topk_weights,
+            activation,
+            global_num_experts,
+            apply_router_weight_on_input)
+
+    def _ep_with_all_to_all_v(self,
+                              hidden_states,
+                              w1,
+                              w2,
+                              topk_ids,
+                              topk_weights,
+                              activation,
+                              global_num_experts,
+                              apply_router_weight_on_input):
+
+        sorted_input_tensor, unsort_map, group_list, _ = \
+            self.moe_init_routing_op(
+                hidden_states,
+                topk_ids,
+                active_num=0,
+                expert_capacity=0,
+                expert_num=global_num_experts,
+                drop_pad_mode=0,
+                expert_tokens_count_or_cumsum_flag=2,
+                expert_tokens_before_capacity_flag=True)
+
+        # group_list = group_list.reshape(1, -1)
+        group_list = group_list.astype(ms.int64)
+
+        local_group_list = self.all_to_all_v_across_ep(group_list,
+                                                       self.send_experts_num_map,
+                                                       self.recv_experts_num_map)
+
+        local_group_list = local_group_list.reshape(-1, self.local_expert_num)
+        recv_list_value = local_group_list.sum(dim=-1)
+        recv_list = mint.zeros(self.ep_size, dtype=ms.int64)
+        recv_list[self.recv_list_index] = recv_list_value
+
+        if self.tp_rank == 0:
+            group_list_cumsum = mint.cumsum(group_list, 0, dtype=ms.int64)
+            # last expert index owned by each EP rank, e.g. [3, 7, 11, 15] (shape: (self.ep_size,))
+            # Work out how much tensor data this rank sends to every other rank.
+            send_list = group_list_cumsum[self.experts_num_map_cu_index]  # [20, 30, 40, 50]
+            send_list = mint.diff(send_list, prepend=self.prepend_tensor)
+        else:
+            send_list = mint.zeros(self.ep_size, dtype=ms.int64)  # [0, 0, 0, 0]
+
+        # recv_list = self.all_to_all_v_across_ep(send_list, self.even_list, self.even_list)
+        # recv_list [20, 40, 60, 70]
+        recv_list = self.depend(recv_list, local_group_list)
+        local_input_tensor = self.all_to_all_v_across_ep_with_block_size(sorted_input_tensor.reshape(-1),
+                                                                         send_list,
+                                                                         recv_list)
+
+        topk_ids_1d, _ = mint.sort(topk_ids.reshape(-1))
+        topk_ids_1d = self.depend(topk_ids_1d, local_input_tensor)
+        topk_ids_local = self.all_to_all_v_across_ep(topk_ids_1d, send_list, recv_list)
+
+        local_group_list = local_group_list.sum(dim=0)
+        recv_tokens = recv_list.sum()
+        if recv_tokens > 0:
+            _, resort_index = mint.sort(topk_ids_local)
+            _, unresort_index = mint.sort(resort_index)
+
+            local_input_tensor =
local_input_tensor.reshape(-1, self.hidden_size) + local_input_tensor = mint.index_select(local_input_tensor, 0, resort_index) + + expert_output = self._ffn(local_input_tensor, w1, w2, local_group_list, activation) + + expert_output = mint.index_select(expert_output, 0, unresort_index) + else: + expert_output = self.dummy_token + expert_output = self.depend(expert_output, topk_ids_local) + expert_output = self.all_to_all_v_across_ep_with_block_size(expert_output.reshape(-1), + recv_list, + send_list) + if self.tp_rank == 0: + expert_output = expert_output.reshape(-1, self.hidden_size) + moe_output = self.moe_token_unpermute(permuted_tokens=expert_output, + sorted_indices=unsort_map, + probs=topk_weights, + padded_mode=False, + restore_shape=None) + else: + # hidden_states = self.depend(hidden_states, expert_output) + # moe_output = hidden_states + moe_output = mint.zeros_like(hidden_states) + moe_output = self.depend(moe_output, expert_output) + + # if self.tp_world_size > 0: + # moe_output = self.broadcast_to_tensor_parallel_region((moe_output,))[0] + moe_output = self.all_reduce_across_tp(moe_output) + return moe_output + + def _ep_with_dispatch_combine(self, hidden_states, w1, w2, topk_ids, topk_weights, + activation, global_num_experts, + apply_router_weight_on_input): + """fused ops, moe feed forward with dispatch and combine.""" + # Dispatch + expand_x, _, expand_idx, expert_token_nums, ep_recv_counts, tp_recv_counts, _ = self.dispatch( + x=hidden_states, + expert_ids=topk_ids, + ep_world_size=self.ep_size, + ep_rank_id=self.ep_rank, + moe_expert_num=global_num_experts, + group_ep=self.ep_group, + tp_world_size=self.dispatch_tp_world_size, + shared_expert_num=self.dispatch_shared_expert_num, + global_bs=self.max_bs, + expert_token_nums_type=1) + + # GroupMamtul + ffn_res = self._ffn(expand_x, w1, w2, expert_token_nums, activation) + + # Combine + moe_output = self.combine( + expand_x=ffn_res, + expert_ids=topk_ids, + expand_idx=expand_idx, + ep_send_counts=ep_recv_counts, + expert_scales=topk_weights.astype(ms.float32), + ep_world_size=self.ep_size, + ep_rank_id=self.ep_rank, + moe_expert_num=global_num_experts, + tp_send_counts=tp_recv_counts, + group_ep=self.ep_group, + tp_world_size=self.dispatch_tp_world_size, + shared_expert_num=self.dispatch_shared_expert_num, + global_bs=self.max_bs) + + return moe_output diff --git a/vllm_mindspore/model_executor/layers/fused_moe/layer.py b/vllm_mindspore/model_executor/layers/fused_moe/layer.py new file mode 100644 index 0000000000000000000000000000000000000000..1f5454677165e3a2f37912d0bcde5686ff82cd15 --- /dev/null +++ b/vllm_mindspore/model_executor/layers/fused_moe/layer.py @@ -0,0 +1,1013 @@ +# SPDX-License-Identifier: Apache-2.0 + +import importlib +from abc import abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import Callable, Optional + +import vllm.envs as envs +from vllm.config import ParallelConfig, get_current_vllm_config +from vllm.distributed import (get_dp_group, get_ep_group, + get_tensor_model_parallel_rank, + get_tensor_model_parallel_world_size) +from vllm.logger import init_logger +from vllm.model_executor.layers.quantization.base_config import QuantizationConfig +from vllm.model_executor.utils import set_weight_attrs +from vllm.forward_context import get_forward_context +# from vllm.model_executor.layers.fused_moe.layer import FusedMoEParallelConfig + +from vllm.model_executor.layers.fused_moe.layer import (determine_expert_map, + FusedMoeWeightScaleSupported, + FusedMoEMethodBase, + 
) +from vllm.config import get_current_vllm_config + +# from vllm_mindspore.model_executor.layers.fused_moe.fused_moe import (fused_topk, +# grouped_topk, +# fused_experts) +from vllm_mindspore.model_executor.layers.fused_moe.fused_moe import (fused_topk, + grouped_topk, + FusedExperts) +from vllm_mindspore.model_executor.layers.quantization.base_config import QuantizeMethodBase +from vllm_mindspore.distributed.communication_op import ReduceFromModelParallelRegion +from vllm_mindspore.model_executor.model_loader.weight_utils import ( + split_loaded_weight) + +from mindspore import nn, Tensor, Parameter, mint, ops, from_numpy +import mindspore as ms +from mindspore.ops import ReduceOp + +logger = init_logger(__name__) + + +@dataclass +class FusedMoEParallelConfig: + tp_size: int + dp_size: int + ep_size: int + tp_rank: int + dp_rank: int + ep_rank: int + + use_ep: bool # whether to use EP or not + + @property + def use_all2all_kernels(self): + return self.dp_size > 1 and self.use_ep and self.tp_size == 1 + + @property + def use_dispatch_kernels(self): + return (self.use_all2all_kernels + and envs.VLLM_ALL2ALL_BACKEND == 'dispatch') + + @staticmethod + def make(tp_size_: int, dp_size_: int, + vllm_parallel_config: ParallelConfig) -> "FusedMoEParallelConfig": + """ + Determine MoE parallel configuration. Based on the input tp_size_, + dp_size_, ep_size_ and vllm's parallel config, determine what + level's of parallelism to use in the fused moe layer. + + Args: + tp_size_ (int): tp_size passed into the FusedMoE constructor. + dp_size_ (int): dp_size passed into the FusedMoE constructor. + ep_size_ (int): ep_size passed into the FusedMoE constructor. + vllm_parallel_config (ParallelConfig): vllm's parallel config + object. + + Examples: + When there is no parallelism requested, i.e. tp_size_ = dp_size_ = 1, + we simply return the sizes unaltered and the ranks set to 0. + + Expert Parallelism is considered only when either dp_size_ or tp_size_ + is non trivial. + + When TP = 2, DP = 1 and EP = False, the configuration on different + devices, + - device 0 : TP = {2, 0} DP = {1, 0} EP = {1, 0} // + legend : {size, rank} + - device 1 : TP = {2, 1} DP = {1, 0} EP = {1, 0} + - Comment : Tensors are sharded across 2 devices. + + When TP = 1, DP = 2 and EP = False, the configuration on different + devices, + - device 0 : TP = {2, 0} DP = {2, 0} EP = {1, 0} + - device 1 : TP = {2, 1} DP = {2, 1} EP = {1, 0} + - Comment: There are 2 engine instances and the tensors are sharded + across 2 decvices. + + When TP = 2, DP = 2 and EP = False, the configuration on different + devices, + - device 0: TP = {4, 0} DP = {2, 0} EP = {1, 0} + - device 1: TP = {4, 1} DP = {2, 0} EP = {1, 0} + - device 2: TP = {4, 2} DP = {2, 1} EP = {1, 0} + - device 3: TP = {4, 3} DP = {2, 1} EP = {1, 0} + - Comment: There are 2 engine instances and the tensors are sharded + across 4 devices. + + When, TP = 2, DP = 1 and EP = True, the configuration on different + devices, + - device 0: TP = {1, 0} DP = {1, 0} EP = {2, 0} + - device 1: TP = {1, 0} DP = {1, 0} EP = {2, 1} + - Comment: The experts are split between the 2 devices. + + When, TP = 1, DP = 2 and EP = True, the configuration on different + devices, + - device 0: TP = {1, 0} DP = {2, 0} EP = {2, 0} + - device 1: TP = {1, 0} DP = {2, 1} EP = {2, 1} + - Comment: There are 2 engine instances and the experts are split + between the 2 devices. 
+ + When TP = 2, DP = 2 and EP = True, the configuration on different + devices, + - device 0: TP = {1, 0} DP = {2, 0} EP = {4, 0} + - device 1: TP = {1, 0} DP = {2, 0} EP = {4, 1} + - device 2: TP = {1, 0} DP = {2, 1} EP = {4, 2} + - device 3: TP = {1, 0} DP = {2, 1} EP = {4, 3} + - Comment: There are 2 engine instances and the experts are split + between the 4 devices. + """ + + def flatten_tp_across_dp(dp_rank: int): + tp_rank = 0 if tp_size_ == 1 else get_tensor_model_parallel_rank() + # There are actually dp_size_ * tp_size_ devices. Update tp_size + # and tp_rank so we shard across all devices. + tp_size = dp_size_ * tp_size_ + tp_rank = dp_rank * tp_size_ + tp_rank + return tp_size, tp_rank + + use_ep = (dp_size_ * tp_size_ > 1 + and vllm_parallel_config.enable_expert_parallel) + + dp_size = dp_size_ + dp_rank = get_dp_group().rank_in_group if dp_size > 1 else 0 + tp_size, tp_rank = flatten_tp_across_dp(dp_rank) + + if not use_ep: + return FusedMoEParallelConfig(tp_size=tp_size, + tp_rank=tp_rank, + dp_size=dp_size, + dp_rank=dp_rank, + ep_size=1, + ep_rank=0, + use_ep=False) + # DP + EP / TP + EP / DP + TP + EP + assert use_ep + + vllm_config = get_current_vllm_config() + if vllm_config.additional_config is not None and vllm_config.additional_config.get("ep_size", None) is not None: + custom_ep_size = int(vllm_config.additional_config.get("ep_size", None)) + ep_size = custom_ep_size + tp_size = tp_size // custom_ep_size + tp_rank = tp_rank % tp_size + ep_rank = get_ep_group().rank_in_group // tp_size + return FusedMoEParallelConfig(tp_size=tp_size, + tp_rank=tp_rank, + dp_size=dp_size, + dp_rank=dp_rank, + ep_size=ep_size, + ep_rank=ep_rank, + use_ep=True) + else: + # In EP, each device owns a set of experts fully. There is no tensor + # parallel update tp_size, tp_rank, ep_size and ep_rank to reflect that. + ep_size = tp_size + ep_rank = tp_rank + return FusedMoEParallelConfig(tp_size=1, + tp_rank=0, + dp_size=dp_size, + dp_rank=dp_rank, + ep_size=ep_size, + ep_rank=ep_rank, + use_ep=True) + + +@dataclass +class MoEConfig: + num_experts: int + experts_per_token: int + hidden_dim: int + + num_local_experts: int + moe_parallel_config: FusedMoEParallelConfig + + in_dtype: ms.dtype.Type # The activation type. + quant_dtype: ms.dtype.Type = None + + # TODO: add more quantization params, blocked, per-token, etc. 
+ block_size: int = 128 + + max_num_tokens: int = envs.VLLM_MOE_DP_CHUNK_SIZE + + def __post_init__(self): + if self.dp_size > 1: + logger.debug("Using MOEConfig::max_num_tokens=%d", + self.max_num_tokens) + + @property + def tp_size(self): + return self.moe_parallel_config.tp_size + + @property + def dp_size(self): + return self.moe_parallel_config.dp_size + + @property + def ep_size(self): + return self.moe_parallel_config.ep_size + + @property + def tp_rank(self): + return self.moe_parallel_config.tp_rank + + @property + def dp_rank(self): + return self.moe_parallel_config.dp_rank + + @property + def ep_rank(self): + return self.moe_parallel_config.ep_rank + + @property + def use_ep(self): + return self.moe_parallel_config.use_ep + + @property + def use_dispatch_kernels(self): + return self.moe_parallel_config.use_dispatch_kernels + +class FusedMoEMethodBase(QuantizeMethodBase): + + @abstractmethod + def create_weights(self, layer: nn.Cell, num_experts: int, + hidden_size: int, intermediate_size_per_partition: int, + params_dtype, **extra_weight_attrs): + raise NotImplementedError + + @abstractmethod + def apply( + self, + layer: nn.Cell, + x: Tensor, + router_logits: Tensor, + top_k: int, + renormalize: bool, + use_grouped_topk: bool = False, + topk_group: Optional[int] = None, + num_expert_group: Optional[int] = None, + global_num_experts: int = -1, + expert_map: Optional[Tensor] = None, + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[Tensor] = None, + apply_router_weight_on_input: bool = False, + activation: str = "silu", + ) -> Tensor: + raise NotImplementedError + +class UnquantizedFusedMoEMethod(FusedMoEMethodBase, nn.Cell): + """MoE method without quantization.""" + + def __init__(self, moe: MoEConfig): + super().__init__() + # self.fused_experts = fused_experts # type: ignore + self.fused_experts = FusedExperts(moe) + self.moe = moe + + def create_weights(self, layer: nn.Cell, num_experts: int, + hidden_size: int, intermediate_size_per_partition: int, + params_dtype, **extra_weight_attrs): + # Fused gate_up_proj (column parallel) + w13_weight = Parameter(mint.empty( + num_experts, + 2 * intermediate_size_per_partition, + hidden_size, + dtype=params_dtype), + requires_grad=False) + layer.insert_param_to_cell("w13_weight", w13_weight) + set_weight_attrs(w13_weight, extra_weight_attrs) + + # down_proj (row parallel) + w2_weight = Parameter(mint.empty( + num_experts, + hidden_size, + intermediate_size_per_partition, + dtype=params_dtype), + requires_grad=False) + layer.insert_param_to_cell("w2_weight", w2_weight) + set_weight_attrs(w2_weight, extra_weight_attrs) + + def apply( + self, + layer: nn.Cell, + x: Tensor, + router_logits: Tensor, + top_k: int, + renormalize: bool, + use_grouped_topk: bool = False, + topk_group: Optional[int] = None, + num_expert_group: Optional[int] = None, + global_num_experts: int = -1, + expert_map: Optional[Tensor] = None, + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[Tensor] = None, + apply_router_weight_on_input: bool = False, + activation: str = "silu", + ) -> Tensor: + return self.forward_npu( + x=x, + layer=layer, + router_logits=router_logits, + top_k=top_k, + renormalize=renormalize, + use_grouped_topk=use_grouped_topk, + topk_group=topk_group, + num_expert_group=num_expert_group, + global_num_experts=global_num_experts, + expert_map=expert_map, + custom_routing_function=custom_routing_function, 
+ scoring_func=scoring_func, + e_score_correction_bias=e_score_correction_bias, + activation=activation, + apply_router_weight_on_input=apply_router_weight_on_input) + + def forward_npu( + self, + layer: nn.Cell, + x: Tensor, + use_grouped_topk: bool, + top_k: int, + router_logits: Tensor, + renormalize: bool, + topk_group: Optional[int] = None, + num_expert_group: Optional[int] = None, + global_num_experts: int = -1, + expert_map: Optional[Tensor] = None, + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[Tensor] = None, + apply_router_weight_on_input: bool = False, + activation: str = "silu", + ) -> Tensor: + topk_weights, topk_ids = FusedMoE.select_experts( + hidden_states=x, + router_logits=router_logits, + use_grouped_topk=use_grouped_topk, + top_k=top_k, + renormalize=renormalize, + topk_group=topk_group, + num_expert_group=num_expert_group, + custom_routing_function=custom_routing_function, + scoring_func=scoring_func, + e_score_correction_bias=e_score_correction_bias, + indices_type=None) + + return self.fused_experts( + hidden_states=x, + w1=layer.w13_weight, + w2=layer.w2_weight, + topk_weights=topk_weights, + topk_ids=topk_ids, + activation=activation, + global_num_experts=global_num_experts, + apply_router_weight_on_input=apply_router_weight_on_input, + expert_map=expert_map, + ) + + + +class FusedMoE(nn.Cell): + """FusedMoE layer for MoE models. + + This layer contains both MergedColumnParallel weights (gate_up_proj / + w13) and RowParallelLinear weights (down_proj/ w2). + + Note: Mixtral uses w1, w2, and w3 for gate, up, and down_proj. We + copy that naming convention here and handle any remapping in the + load_weights function in each model implementation. + + Args: + num_experts: Number of experts in the model + top_k: Number of experts selected for each token + hidden_size: Input hidden state size of the transformer + intermediate_size: Intermediate size of the experts + params_dtype: Data type for the parameters. + reduce_results: Whether to all all_reduce on the output of the layer + renomalize: Whether to renormalize the logits in the fused_moe kernel + quant_config: Quantization configure. 
+ """ + + def __init__( + self, + num_experts: int, # Global number of experts + top_k: int, + hidden_size: int, + intermediate_size: int, + params_dtype = None, + reduce_results: bool = False, + renormalize: bool = True, + use_grouped_topk: bool = False, + num_expert_group: Optional[int] = None, + topk_group: Optional[int] = None, + quant_config: Optional[QuantizationConfig] = None, + tp_size: Optional[int] = None, + ep_size: Optional[int] = None, + dp_size: Optional[int] = None, + prefix: str = "", + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[Tensor] = None, + apply_router_weight_on_input: bool = False, + activation: str = "silu", + ): + super().__init__() + + if params_dtype is None: + params_dtype = get_current_vllm_config().model_config.dtype + self.params_dtype = params_dtype + + vllm_config = get_current_vllm_config() + self.moe_parallel_config: FusedMoEParallelConfig = ( + FusedMoEParallelConfig.make( + tp_size_=(tp_size if tp_size is not None else + get_tensor_model_parallel_world_size()), + dp_size_=(dp_size if dp_size is not None else + get_dp_group().world_size), + vllm_parallel_config=vllm_config.parallel_config)) + + self.global_num_experts = num_experts + + # For smuggling this layer into the fused moe custom op + self.use_direct_call = self.dp_size == 1 + if not self.use_direct_call: + compilation_config = vllm_config.compilation_config + if prefix in compilation_config.static_forward_context: + raise ValueError("Duplicate layer name: {}".format(prefix)) + compilation_config.static_forward_context[prefix] = self + self.layer_name = prefix + + # Determine expert maps + if self.use_ep: + self.local_num_experts, self.expert_map = determine_expert_map( + ep_size=self.ep_size, + ep_rank=self.ep_rank, + global_num_experts=self.global_num_experts) + else: + self.local_num_experts, self.expert_map = (self.global_num_experts, + None) + + self.top_k = top_k + + assert intermediate_size % self.tp_size == 0 + self.hidden_size = hidden_size + self.intermediate_size_per_partition = intermediate_size // self.tp_size + self.reduce_results = reduce_results + self.renormalize = renormalize + self.use_grouped_topk = use_grouped_topk + if self.use_grouped_topk: + assert num_expert_group is not None and topk_group is not None + self.num_expert_group = num_expert_group + self.topk_group = topk_group + self.custom_routing_function = custom_routing_function + self.scoring_func = scoring_func + self.e_score_correction_bias = e_score_correction_bias + self.apply_router_weight_on_input = apply_router_weight_on_input + self.activation = activation + + if self.scoring_func != "softmax" and not self.use_grouped_topk: + raise ValueError("Only softmax scoring function is supported for " + "non-grouped topk.") + + moe = MoEConfig( + num_experts=self.global_num_experts, + experts_per_token=top_k, + hidden_dim=hidden_size, + num_local_experts=self.local_num_experts, + moe_parallel_config=self.moe_parallel_config, + # TODO (bnell): this needs to be fixed for quantized types. + in_dtype=params_dtype, + max_num_tokens=envs.VLLM_MOE_DP_CHUNK_SIZE, + ) + self.moe_config = moe + self.quant_config = quant_config + + # Note: get_quant_method will look at the layer's local_num_experts + # for heuristic purposes, so it must be initialized first. 
+ quant_method: Optional[QuantizeMethodBase] = None + + if quant_config is None: + quant_method = UnquantizedFusedMoEMethod(moe) + else: + quant_method = quant_config.get_quant_method(self, prefix) + + assert quant_method is not None + assert isinstance(quant_method, FusedMoEMethodBase) + self.quant_method = quant_method + + moe_quant_params = { + "num_experts": self.local_num_experts, + "hidden_size": hidden_size, + "intermediate_size_per_partition": + self.intermediate_size_per_partition, + "params_dtype": params_dtype, + "weight_loader": self.weight_loader, + } + # need full intermediate size pre-sharding for WNA16 act order + if (self.quant_method.__class__.__name__ + in ("GPTQMarlinMoEMethod", + "CompressedTensorsWNA16MarlinMoEMethod", + "CompressedTensorsWNA16MoEMethod")): + moe_quant_params["intermediate_size_full"] = intermediate_size + + self.quant_method.create_weights(layer=self, **moe_quant_params) + + self.dp_group = get_dp_group().device_group._name + self.ep_group = get_ep_group().device_group._name + + self.tp_world_size = get_tensor_model_parallel_world_size() + + self.all_reduce_from_tp_group = ReduceFromModelParallelRegion() + + # pure_tp means using tensor parallelism only, no expert parallelism. + self.pure_tp = False + self.tp_ep = False + self.pure_ep = False + + # self.ep_size == 1, means use tensor parallelism to compute moe. + if self.ep_size == 1: + self.pure_tp = True + # self.ep_size > 1, means use expert parallelism or expert parallelism mix tensor parallelism. + else: + if self.tp_size == 1: + self.pure_ep = True + self.all_reduce_max_across_dp = ops.AllReduce(op=ReduceOp.MAX, group=self.dp_group) + else: + self.tp_ep = True + # self.moe_tp_group = get_moe_tp_group() + # self.moe_dp_group = get_moe_dp_group() + # self.all_reduce_from_moe_tp_group = ops.AllReduce(group=self.moe_tp_group) + # self.reduce_scatter_from_dp_group = ops.ReduceScatter(group=self.moe_dp_group) + + if (self.pure_tp or self.tp_ep) and self.dp_size > 1: + self.all_gather_from_dp_group = ops.AllGather(group=self.dp_group) + self.all_reduce_from_dp_group = ops.AllReduce(group=self.dp_group) + self.reduce_scatter_from_ep_group = ops.ReduceScatter(group=self.ep_group) + self.reduce_scatter_from_dp_group = ops.ReduceScatter(group=self.dp_group) + + @property + def tp_size(self): + return self.moe_parallel_config.tp_size + + @property + def dp_size(self): + return self.moe_parallel_config.dp_size + + @property + def ep_size(self): + return self.moe_parallel_config.ep_size + + @property + def tp_rank(self): + return self.moe_parallel_config.tp_rank + + @property + def dp_rank(self): + return self.moe_parallel_config.dp_rank + + @property + def ep_rank(self): + return self.moe_parallel_config.ep_rank + + @property + def use_ep(self): + return self.moe_parallel_config.use_ep + + @property + def use_dispatch_kernels(self): + return self.moe_parallel_config.use_dispatch_kernels + + def _load_w13(self, param: Parameter, shard_dim: int, + shard_id: str, loaded_weight: Tensor, expert_id: int, + tp_rank: int, load_full: bool = False): + + # Index the loaded weight for tp sharding. + # gate_up_proj: "MergedColumnParallel", so tp sharding on output_dim + shard_size = param.shape[shard_dim + 1] // 2 + # loaded_weight = loaded_weight.narrow(shard_dim, shard_size * tp_rank, + # shard_size) + loaded_weight = split_loaded_weight(loaded_weight, shard_dim, + shard_size * tp_rank, shard_size) + loaded_weight = from_numpy(loaded_weight) + # Narrow parameter and load. 
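+        # w13 packs gate_proj and up_proj along the output dim, so each logical
+        # half is shard_size (= param.shape[shard_dim + 1] // 2) wide; shard_id
+        # selects which half of the fused parameter receives this checkpoint shard.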
+ # w1, gate_proj: Load into first logical weight of w13. + if not load_full: + if shard_id == "w1": + if shard_dim == 1: + param[expert_id, :, 0:shard_size] = loaded_weight + else: + assert shard_dim == 0 + param[expert_id, 0:shard_size, :] = loaded_weight + # w3, up_proj: Load into second logical weight of w13. + else: + assert shard_id == "w3" + if shard_dim == 1: + param[expert_id, :, shard_size:shard_size*2] = loaded_weight + else: + assert shard_dim == 0 + param[expert_id, shard_size:shard_size*2, :] = loaded_weight + else: + if shard_id == "w1": + if shard_dim == 2: + param[:, :, 0:shard_size] = loaded_weight + else: + assert shard_dim == 1 + param[:, 0:shard_size, :] = loaded_weight + # w3, up_proj: Load into second logical weight of w13. + else: + assert shard_id == "w3" + if shard_dim == 2: + param[:, :, shard_size:shard_size*2] = loaded_weight + else: + assert shard_dim == 1 + param[:, shard_size:shard_size*2, :] = loaded_weight + + def _load_w2(self, + param: Parameter, + shard_dim: int, + loaded_weight: Tensor, + tp_rank: int, + expert_id: int, + load_full: bool = False): + + # Index the loaded weight for tp sharding. + # down_proj: "RowParallel" so tp sharding on input_dim + # Narrow parameter and load. + if not load_full: + shard_size = param.shape[shard_dim + 1] + # loaded_weight = loaded_weight.narrow(shard_dim, + # shard_size * tp_rank, + # shard_size) + loaded_weight = split_loaded_weight(loaded_weight, shard_dim, + shard_size * tp_rank, + shard_size) + param[expert_id] = from_numpy(loaded_weight) + # w2, down_proj: Load into only logical weight of w2. + else: + param.set_data(from_numpy(loaded_weight)) + + def _load_single_value(self, param: Parameter, + loaded_weight: Tensor, expert_id: int): + loaded_weight = loaded_weight[:] + param[expert_id] = from_numpy(loaded_weight) + + def _load_g_idx(self, shard_id: str, param: Parameter, + shard_dim: int, loaded_weight: Tensor, tp_rank: int, + expert_id: int): + + if shard_id == "w2": + self._load_w2(shard_dim=shard_dim, + loaded_weight=loaded_weight, + param=param, + expert_id=expert_id, + tp_rank=tp_rank) + else: + assert shard_id in ("w1", "w3") + loaded_weight = loaded_weight[:] + param[expert_id] = from_numpy(loaded_weight) + + def _map_global_expert_id_to_local_expert_id(self, expert_id: int) -> int: + if self.expert_map is None: + return expert_id + return self.expert_map[expert_id].item() + + def _load_model_weight_or_group_weight_scale(self, + shard_dim: int, + param: Parameter, + shard_id: str, + loaded_weight: Tensor, + tp_rank: int, + expert_id: int, + load_full_w2: bool = False, + load_full_w3: bool = False): + """ + Load grouped weight scales for group quantization or model weights + :param shard_dim: dimension to shard + :param expert_data: parameter for a particular expert + :param shard_id: either w1, w2, or w3 + :param loaded_weight: checkpoint weight to load into the param + :param tp_rank: tensor parallel rank + :param load_full_w2: whether or not the w2 loaded should be sharded. 
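+        :param load_full_w3: whether or not the w1/w3 loaded should be sharded.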
+ """ + if shard_id == "w2": + # In the case where we have actorder/g_idx, we do not partition the + # w2 scales, as indicated by `load_full` argument, for all tp cases + self._load_w2(shard_dim=shard_dim, + loaded_weight=loaded_weight, + param=param, + tp_rank=tp_rank, + expert_id=expert_id, + load_full=load_full_w2) + elif shard_id in ("w1", "w3"): + self._load_w13(shard_id=shard_id, + shard_dim=shard_dim, + loaded_weight=loaded_weight, + param=param, + expert_id=expert_id, + tp_rank=tp_rank, + load_full=load_full_w3) + + def weight_loader(self, param: Parameter, + loaded_weight: Tensor, weight_name: str, + shard_id: str, expert_id: int) -> None: + + expert_id = self._map_global_expert_id_to_local_expert_id(expert_id) + if expert_id == -1: + return + + if shard_id not in ("w1", "w2", "w3"): + raise ValueError(f"shard_id must be ['w1','w2','w3'] but " + f"got {shard_id}.") + + WEIGHT_SCALE_SUPPORTED = [ + e.value for e in FusedMoeWeightScaleSupported + ] + # Fetch the dim to shard the parameter/loaded weight + # based on the shard id. This will be whatever + # dimension intermediate_size_per_partition is used. + SHARD_ID_TO_SHARDED_DIM = {"w1": 0, "w2": 1, "w3": 0} + + # is_transposed: if the dim to shard the weight + # should be flipped. Required by GPTQ, compressed-tensors + # should be whatever dimension intermediate_size_per_partition is + is_transposed = getattr(param, "is_transposed", False) + shard_dim = SHARD_ID_TO_SHARDED_DIM[shard_id] + if is_transposed: + shard_dim = int(not shard_dim) + + loaded_weight = loaded_weight[:] + full_load = len(loaded_weight.shape) == 3 + if full_load: + shard_dim += 1 + + # Case g_idx + if "g_idx" in weight_name: + self._load_g_idx(shard_dim=0, + shard_id=shard_id, + loaded_weight=loaded_weight, + param=param, + tp_rank=self.tp_rank, + expert_id=expert_id) + return + + # Case weight_shape + if "weight_shape" in weight_name: + # only required by compressed-tensors + self._load_single_value(param=param, + loaded_weight=loaded_weight, + expert_id=expert_id) + return + + # Case model weights + if "weight" in weight_name: + self._load_model_weight_or_group_weight_scale( + shard_id=shard_id, + shard_dim=shard_dim, + loaded_weight=loaded_weight, + param=param, + expert_id=expert_id, + tp_rank=self.tp_rank) + return + + @staticmethod + def select_experts(hidden_states: Tensor, + router_logits: Tensor, + top_k: int, + use_grouped_topk: bool, + renormalize: bool, + topk_group: Optional[int] = None, + num_expert_group: Optional[int] = None, + custom_routing_function: Optional[Callable] = None, + scoring_func: str = "softmax", + e_score_correction_bias: Optional[Tensor] = None, + indices_type=None): + + # DeekSeekv2 uses grouped_top_k + if use_grouped_topk: + assert topk_group is not None + assert num_expert_group is not None + topk_weights, topk_ids = grouped_topk( + hidden_states=hidden_states, + gating_output=router_logits, + topk=top_k, + renormalize=renormalize, + num_expert_group=num_expert_group, + topk_group=topk_group, + scoring_func=scoring_func, + e_score_correction_bias=e_score_correction_bias) + if indices_type is not None: + topk_ids = topk_ids.to(dtype=indices_type) + elif custom_routing_function is None: + topk_weights, topk_ids = fused_topk( + hidden_states=hidden_states, + gating_output=router_logits, + topk=top_k, + renormalize=renormalize, + indices_type=indices_type, + ) + else: + topk_weights, topk_ids = custom_routing_function( + hidden_states=hidden_states, + gating_output=router_logits, + topk=top_k, + renormalize=renormalize) + if 
indices_type is not None: + topk_ids = topk_ids.to(dtype=indices_type) + + return topk_weights, topk_ids + + def must_reduce_shared_expert_outputs(self) -> bool: + # If dp_size == 1, means routed expert use the same tensor parallel group as shared expert. + # And meanwhile if ep_size == 1, it means using tensor parallel to compute routed expert. + # So we can delay the shared expert outputs reduce after the routed expert and + # the shared expert are added. + return not self.pure_tp + + def maybe_all_reduce_tensor_model_parallel( + self, final_hidden_states: Tensor): + """ + To all_reduce after routed expert and shared expert are added. + """ + # Do delay allreduce If "must_reduce_shared_expert_outputs" return True + if self.pure_tp or self.tp_ep: + return self.all_reduce_from_tp_group(final_hidden_states) + return final_hidden_states + + def construct(self, hidden_states: Tensor, + router_logits: Tensor, + dp_pad_index, + dp_unpad_index, + dp_pad_index_with_offset, + dp_unpad_index_total_with_offset): + if self.use_dispatch_kernels: + return self.forward_impl_chunked(hidden_states, router_logits) + + return self.forward_impl(hidden_states, router_logits, dp_pad_index, + dp_unpad_index, dp_pad_index_with_offset, + dp_unpad_index_total_with_offset) + + def forward_impl(self, hidden_states: Tensor, + router_logits: Tensor, dp_pad_index, dp_unpad_index, + dp_pad_index_total_with_offset, + dp_unpad_index_total_with_offset): + """ + If dp_world_size == 4, dp_rank == 1, tokens_num across dp is [1, 3, 4, 2], then + dp_pad_index = [0, 1, 2, 0] + dp_unpad_index = [0, 1, 2] + dp_pad_index_total_with_offset = [0, 0, 0, 0, 1, 2, 3, 0, 4, 5, 6, 0, 7, 8, 0, 0] + dp_unpad_index_total_with_offset = [0, 4, 5, 6, 8, 9, 10, 11, 12, 13] + """ + if (self.pure_tp or self.tp_ep) and self.dp_size > 1: + # ops.AllGather is not supported for uneven size tensor, so need to pad to same size. + hidden_buffer = mint.index_select(hidden_states, 0, dp_pad_index) + hidden_buffer = self.all_gather_from_dp_group(hidden_buffer) + + logit_buffer = mint.index_select(router_logits, 0, dp_pad_index) + logit_buffer = self.all_gather_from_dp_group(logit_buffer) + + hidden_states = mint.index_select(hidden_buffer, 0, dp_unpad_index_total_with_offset) + router_logits = mint.index_select(logit_buffer, 0, dp_unpad_index_total_with_offset) + + # Matrix multiply. 
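+        # quant_method.apply() performs expert selection (select_experts) and the
+        # fused expert computation (FusedExperts) on the gathered tokens; the
+        # reductions below then return each DP rank its own token slice, or
+        # all-reduce across TP when reduce_results is set.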
+ final_hidden_states = self.quant_method.apply( + layer=self, + x=hidden_states, + router_logits=router_logits, + top_k=self.top_k, + renormalize=self.renormalize, + use_grouped_topk=self.use_grouped_topk, + global_num_experts=self.global_num_experts, + expert_map=self.expert_map, + topk_group=self.topk_group, + num_expert_group=self.num_expert_group, + custom_routing_function=self.custom_routing_function, + scoring_func=self.scoring_func, + e_score_correction_bias=self.e_score_correction_bias, + activation=self.activation, + apply_router_weight_on_input=self.apply_router_weight_on_input, + ) + + if (self.pure_tp or self.tp_ep) and self.dp_size > 1: + final_hidden_states = mint.index_select(final_hidden_states, 0, dp_pad_index_total_with_offset) + final_hidden_states = final_hidden_states.reshape(self.dp_size, -1, final_hidden_states.shape[-1]) + if self.reduce_results: + final_hidden_states = mint.repeat_interleave(final_hidden_states, self.tp_world_size, dim=0) + final_hidden_states = final_hidden_states.reshape(-1, final_hidden_states.shape[-1]) + final_hidden_states = self.reduce_scatter_from_ep_group(final_hidden_states) + final_hidden_states = mint.index_select(final_hidden_states, 0, dp_unpad_index) + else: + final_hidden_states = final_hidden_states.reshape(-1, final_hidden_states.shape[-1]) + final_hidden_states = self.reduce_scatter_from_dp_group(final_hidden_states) + final_hidden_states = mint.index_select(final_hidden_states, 0, dp_unpad_index) + return final_hidden_states + + if self.reduce_results and (self.tp_size > 1 or self.ep_size > 1): + # Default set to False. (May have to add shared expert outputs.) + final_hidden_states = self.maybe_all_reduce_tensor_model_parallel( + final_hidden_states) + + return final_hidden_states + + def forward_impl_chunked(self, full_hidden_states: Tensor, + full_router_logits: Tensor): + + full_final_hidden_states = mint.empty_like(full_hidden_states) + + def process_chunk(chunk_start, chunk_end, skip_result_store=False): + chunk_size = chunk_end - chunk_start + hidden_states = full_hidden_states[chunk_start:chunk_end, :] + router_logits = full_router_logits[chunk_start:chunk_end, :] + + # Matrix multiply. 
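+            # Each DP rank feeds at most VLLM_MOE_DP_CHUNK_SIZE tokens
+            # (moe_config.max_num_tokens) through the experts per iteration, so
+            # the dispatch/combine path works on bounded-size chunks even when
+            # ranks hold different token counts.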
+ final_hidden_states = self.quant_method.apply( + layer=self, + x=hidden_states, + router_logits=router_logits, + top_k=self.top_k, + renormalize=self.renormalize, + use_grouped_topk=self.use_grouped_topk, + global_num_experts=self.global_num_experts, + expert_map=self.expert_map, + topk_group=self.topk_group, + num_expert_group=self.num_expert_group, + custom_routing_function=self.custom_routing_function, + scoring_func=self.scoring_func, + e_score_correction_bias=self.e_score_correction_bias, + activation=self.activation, + apply_router_weight_on_input=self.apply_router_weight_on_input, + ) + + if not skip_result_store: + full_final_hidden_states[chunk_start:chunk_end, :] = final_hidden_states + + # ctx = get_forward_context() + # max_tokens_across_dp = ctx.dp_metadata.max_tokens_across_dp_cpu + # moe_dp_chunk_size_per_rank = self.moe_config.max_num_tokens + num_tokens = ops.shape(full_hidden_states)[0] + max_tokens_across_dp = self.all_reduce_max_across_dp(ops.scalar_to_tensor(num_tokens, dtype=ms.int32)) + max_tokens_across_dp = max_tokens_across_dp.item() + moe_dp_chunk_size_per_rank = self.moe_config.max_num_tokens + + for chunk_start_ in range(0, max_tokens_across_dp, + moe_dp_chunk_size_per_rank): + chunk_start = chunk_start_ + chunk_end = min(chunk_start + moe_dp_chunk_size_per_rank, + max_tokens_across_dp) + # clamp start and end + chunk_start = min(chunk_start, num_tokens - 1) + chunk_end = min(chunk_end, num_tokens) + + process_chunk(chunk_start, + chunk_end, + skip_result_store=chunk_start_ >= num_tokens) + + return full_final_hidden_states + + @classmethod + def make_expert_params_mapping( + cls, ckpt_gate_proj_name: str, ckpt_down_proj_name: str, + ckpt_up_proj_name: str, + num_experts: int) -> list[tuple[str, str, int, str]]: + + return [ + # (param_name, weight_name, expert_id, shard_id) + ("experts.w13_" if weight_name + in [ckpt_gate_proj_name, ckpt_up_proj_name] else "experts.w2_", + f"experts.{expert_id}.{weight_name}.", expert_id, shard_id) + for expert_id in range(num_experts) for shard_id, weight_name in [ + ("w1", ckpt_gate_proj_name), + ("w2", ckpt_down_proj_name), + ("w3", ckpt_up_proj_name), + ] + ] + + def extra_repr(self) -> str: + + s = ( + f"global_num_experts={self.global_num_experts}, " + f"local_num_experts={self.local_num_experts}, " + f"top_k={self.top_k}, " + f"intermediate_size_per_partition={self.intermediate_size_per_partition}, " # noqa: E501 + f"tp_size={self.tp_size},\n" + f"ep_size={self.ep_size}, " + f"reduce_results={self.reduce_results}, " + f"renormalize={self.renormalize}, " + f"use_grouped_topk={self.use_grouped_topk}") + + if self.use_grouped_topk: + s += f", num_expert_group={self.num_expert_group}, topk_group={self.topk_group}" # noqa: E501 + + s += f", scoring_func='{self.scoring_func}', activation='{self.activation}'" # noqa: E501 + + return s diff --git a/vllm_mindspore/model_executor/layers/linear.py b/vllm_mindspore/model_executor/layers/linear.py index adfbc0861f9a5e0bc2d5b261954734ba387f0336..07f1f90f25f53f9dfc67dab51ec2f8212cb146c8 100644 --- a/vllm_mindspore/model_executor/layers/linear.py +++ b/vllm_mindspore/model_executor/layers/linear.py @@ -22,7 +22,7 @@ from abc import abstractmethod from typing import Optional, Union -from mindspore import Parameter, Tensor, mint, nn, ops +from mindspore import Parameter, Tensor, mint, nn, ops, from_numpy from mindspore._c_expression.typing import Type as MSDtype from vllm.config import get_current_vllm_config from vllm.distributed import (divide, get_tensor_model_parallel_rank, @@ 
-35,6 +35,8 @@ from vllm_mindspore.distributed.communication_op import ( from vllm_mindspore.model_executor.layers.quantization.base_config import ( QuantizationConfig, QuantizeMethodBase) from vllm_mindspore.model_executor.utils import set_weight_attrs +from vllm_mindspore.model_executor.model_loader.weight_utils import ( + split_loaded_weight) WEIGHT_LOADER_V2_SUPPORTED = [ "CompressedTensorsLinearMethod", "AWQMarlinLinearMethod", @@ -159,6 +161,88 @@ class LinearBase(nn.Cell): raise NotImplementedError +class ReplicatedLinear(LinearBase): + """Replicated linear layer. + + Args: + input_size: input dimension of the linear layer. + output_size: output dimension of the linear layer. + bias: If true, add bias. + skip_bias_add: If true, skip adding bias but instead return it. + params_dtype: Data type for the parameters. + quant_config: Quantization configure. + prefix: The name of the layer in the state dict, including all parents + (e.g. model.layers.0.qkv_proj) + return_bias: If true, return bias together with outputs in forward pass. + """ + + def __init__( + self, + input_size: int, + output_size: int, + bias: bool = True, + skip_bias_add: bool = False, + params_dtype = None, + quant_config: Optional[QuantizationConfig] = None, + prefix: str = "", + *, + return_bias: bool = True, + ): + super().__init__(input_size, + output_size, + skip_bias_add, + params_dtype, + quant_config, + prefix=prefix, + return_bias=return_bias) + + # All the linear layer supports quant method. + assert self.quant_method is not None + self.quant_method.create_weights(self, + self.input_size, [self.output_size], + self.input_size, + self.output_size, + self.params_dtype, + weight_loader=self.weight_loader) + + if bias: + self.bias = Parameter( + mint.empty(self.output_size, dtype=self.params_dtype)) + set_weight_attrs(self.bias, { + "output_dim": 0, + "weight_loader": self.weight_loader, + }) + else: + self.bias = None + + def weight_loader(self, param: Parameter, loaded_weight: Tensor): + loaded_weight = loaded_weight[:] + if len(loaded_weight.shape) == 0: + loaded_weight = loaded_weight.reshape(1) + + assert param.shape == loaded_weight.shape, ( + f"Tried to load weights of size {loaded_weight.size()}" + f"to a parameter of size {param.size()}") + param.set_data(from_numpy(loaded_weight)) + + def construct( + self, x: Tensor + ) -> Union[Tensor, tuple[Tensor, Optional[Parameter]]]: + bias = self.bias if not self.skip_bias_add else None + assert self.quant_method is not None + output = self.quant_method.apply(self, x, bias) + output_bias = self.bias if self.skip_bias_add else None + if not self.return_bias: + return output + return output, output_bias + + def extra_repr(self) -> str: + s = f"in_features={self.input_size}" + s += f", output_features={self.output_size}" + s += f", bias={self.bias is not None}" + return s + + class ColumnParallelLinear(LinearBase): """Linear layer with column parallelism. 
@@ -270,14 +354,16 @@ class ColumnParallelLinear(LinearBase): if output_dim is not None: shard_size = param.shape[output_dim] start_idx = tp_rank * shard_size - loaded_weight = loaded_weight.narrow(output_dim, start_idx, - shard_size).contiguous() + # loaded_weight = loaded_weight.narrow(output_dim, start_idx, + # shard_size).contiguous() + loaded_weight = split_loaded_weight(loaded_weight, output_dim, + start_idx, shard_size) if len(loaded_weight.shape) == 0: loaded_weight = loaded_weight.reshape(1) assert param.shape == loaded_weight.shape - param.set_data(loaded_weight) + param.set_data(from_numpy(loaded_weight)) class MergedColumnParallelLinear(ColumnParallelLinear): @@ -342,14 +428,16 @@ class MergedColumnParallelLinear(ColumnParallelLinear): assert loaded_shard_id < len(self.output_sizes) shard_offset = sum(self.output_sizes[:loaded_shard_id]) // tp_size shard_size = self.output_sizes[loaded_shard_id] // tp_size - param_data = param.data - param_data = param_data.narrow(output_dim, shard_offset, - shard_size) + # param_data = param.data + # param_data = param_data.narrow(output_dim, shard_offset, + # shard_size) start_idx = tp_rank * shard_size - loaded_weight = loaded_weight.narrow(output_dim, start_idx, - shard_size).contiguous() - assert param_data.shape == loaded_weight.shape - param[shard_offset:shard_offset + shard_size, :] = loaded_weight + # loaded_weight = loaded_weight.narrow(output_dim, start_idx, + # shard_size).contiguous() + # assert param_data.shape == loaded_weight.shape + loaded_weight = split_loaded_weight(loaded_weight, output_dim, + start_idx, shard_size) + param[shard_offset:shard_offset + shard_size, :] = from_numpy(loaded_weight) class QKVParallelLinear(ColumnParallelLinear): @@ -439,7 +527,7 @@ class QKVParallelLinear(ColumnParallelLinear): assert loaded_shard_id in ["q", "k", "v"] # If output dim is defined, use the default loading process. 
# if output_dim is not None: - param_data = param.data + # param_data = param.data if loaded_shard_id == "q": shard_offset = 0 shard_size = self.num_heads * self.head_size @@ -451,16 +539,19 @@ class QKVParallelLinear(ColumnParallelLinear): self.num_kv_heads) * self.head_size shard_size = self.num_kv_heads * self.head_size - param_data = param_data.narrow(output_dim, shard_offset, shard_size) + # param_data = param_data.narrow(output_dim, shard_offset, shard_size) if loaded_shard_id == "q": shard_id = tp_rank else: shard_id = tp_rank // self.num_kv_head_replicas start_idx = shard_id * shard_size - loaded_weight = loaded_weight.narrow(output_dim, start_idx, - shard_size).contiguous() - assert param_data.shape == loaded_weight.shape + # loaded_weight = loaded_weight.narrow(output_dim, start_idx, + # shard_size).contiguous() + loaded_weight = split_loaded_weight(loaded_weight, output_dim, + start_idx, shard_size) + loaded_weight = from_numpy(loaded_weight) + # assert param_data.shape == loaded_weight.shape if param.name.endswith("weight"): self.weight[shard_offset:shard_offset + shard_size, :] = loaded_weight @@ -592,9 +683,11 @@ class RowParallelLinear(LinearBase): if input_dim is not None and not is_sharded_weight: shard_size = param.shape[input_dim] start_idx = tp_rank * shard_size - loaded_weight = loaded_weight.narrow(input_dim, start_idx, - shard_size).contiguous() + # loaded_weight = loaded_weight.narrow(input_dim, start_idx, + # shard_size).contiguous() + loaded_weight = split_loaded_weight(loaded_weight, input_dim, + start_idx, shard_size) if len(loaded_weight.shape) == 0: loaded_weight = loaded_weight.reshape(1) assert param.shape == loaded_weight.shape - param.set_data(loaded_weight.contiguous()) + param.set_data(from_numpy(loaded_weight)) diff --git a/vllm_mindspore/model_executor/layers/vocab_parallel_embedding.py b/vllm_mindspore/model_executor/layers/vocab_parallel_embedding.py index 18530805915eac370f7040206e99cba2aaf1433e..b56952bba1d098ed6ebee99d126afe1526741582 100644 --- a/vllm_mindspore/model_executor/layers/vocab_parallel_embedding.py +++ b/vllm_mindspore/model_executor/layers/vocab_parallel_embedding.py @@ -22,7 +22,7 @@ from collections.abc import Sequence from dataclasses import dataclass from typing import Optional -from mindspore import Parameter, Tensor, mint, nn, ops +from mindspore import Parameter, Tensor, mint, nn, ops, from_numpy from mindspore.common.dtype import typing from vllm.config import get_current_vllm_config from vllm.distributed import (divide, get_tensor_model_parallel_rank, @@ -35,6 +35,8 @@ from vllm_mindspore.distributed.communication_op import ( from vllm_mindspore.model_executor.layers.quantization.base_config import ( QuantizeMethodBase, method_has_implemented_embedding) from vllm_mindspore.model_executor.utils import set_weight_attrs +from vllm_mindspore.model_executor.model_loader.weight_utils import ( + split_loaded_weight) DEFAULT_VOCAB_PADDING_SIZE = 64 @@ -343,28 +345,32 @@ class VocabParallelEmbedding(nn.Cell): # If parameter does not have output dim, then it should # be copied onto all gpus (e.g. g_idx for act_order gptq). 
if output_dim is None: + loaded_weight = loaded_weight[:] assert param.data.shape == loaded_weight.shape if param.data.shape != loaded_weight.shape: raise ValueError( f"'param.data.shape' should be equal " f"to 'loaded_weight.shape'," f" but got {param.data.shape} and {loaded_weight.shape}") - param.set_data(loaded_weight) + param.set_data(from_numpy(loaded_weight)) return # Shard indexes for loading the weight start_idx = self.shard_indices.org_vocab_start_index shard_size = self.shard_indices.org_vocab_end_index - start_idx - if loaded_weight.shape[output_dim] != self.org_vocab_size: - raise ValueError(f"'loaded_weight.shape[output_dim]' should " - f"be equal to 'org_vocab_size'," - f" but got {loaded_weight.shape[output_dim]} " - f"and {self.org_vocab_size}") + loaded_weight = split_loaded_weight(loaded_weight, output_dim, + start_idx, shard_size) + + # if loaded_weight.shape[output_dim] != self.org_vocab_size: + # raise ValueError(f"'loaded_weight.shape[output_dim]' should " + # f"be equal to 'org_vocab_size'," + # f" but got {loaded_weight.shape[output_dim]} " + # f"and {self.org_vocab_size}") # Copy the data. - loaded_weight = loaded_weight.narrow(output_dim, start_idx, - shard_size).contiguous() - param[:loaded_weight.shape[0]] = loaded_weight + # loaded_weight = loaded_weight.narrow(output_dim, start_idx, + # shard_size).contiguous() + param[:loaded_weight.shape[0]] = from_numpy(loaded_weight) param[loaded_weight.shape[0]:] = 0 diff --git a/vllm_mindspore/model_executor/model_loader/weight_utils.py b/vllm_mindspore/model_executor/model_loader/weight_utils.py index 4ec3a2ded040013c01f480eab7fae8eb078072de..8b12f771e5b533320d2189825870011d69483cf1 100644 --- a/vllm_mindspore/model_executor/model_loader/weight_utils.py +++ b/vllm_mindspore/model_executor/model_loader/weight_utils.py @@ -26,6 +26,31 @@ from mindspore import Parameter, Tensor from tqdm.auto import tqdm +def split_loaded_weight(loaded_weight, shard_dim, start_idx, shard_size): + + """ + Read numpy slice data based on axis and slice range. 
+    :loaded_weight: PySafeSlice object
+    :shard_dim: axis of weight slice
+    :start_idx: start slice index
+    :shard_size: size of the slice (number of elements along shard_dim)
+    """
+    if shard_dim is None:
+        loaded_weight = loaded_weight[:]
+        return loaded_weight
+
+    end_idx = start_idx + shard_size
+    if shard_dim == 0:
+        loaded_weight = loaded_weight[start_idx:end_idx]
+    elif shard_dim == 1:
+        loaded_weight = loaded_weight[:, start_idx:end_idx]
+    elif shard_dim == 2:
+        loaded_weight = loaded_weight[:, :, start_idx:end_idx]
+    else:
+        raise ValueError("shard_dim:{} is not supported.".format(shard_dim))
+    return loaded_weight
+
+
 def safetensors_weights_iterator(
     hf_weights_files: list[str],
     use_tqdm_on_load: bool,
@@ -43,10 +68,11 @@ def safetensors_weights_iterator(
     ):
         with safe_open(st_file, framework="np") as f:
             for name in f.keys():  # noqa: SIM118
-                param = f.get_tensor(name)
-                yield name, ms.tensor(param)
+                param = f.get_slice(name)
+                yield name, param


 def default_weight_loader(param: Parameter, loaded_weight: Tensor) -> None:
     """Default weight loader."""
-    param.set_data(loaded_weight)
+    loaded_weight = loaded_weight[:]
+    param.set_data(ms.Tensor(loaded_weight, dtype=param.dtype))
diff --git a/vllm_mindspore/model_executor/models/qwen3_moe.py b/vllm_mindspore/model_executor/models/qwen3_moe.py
new file mode 100644
index 0000000000000000000000000000000000000000..da8ec9cfe13a7a3d9dd52aaf6c997058845e9c74
--- /dev/null
+++ b/vllm_mindspore/model_executor/models/qwen3_moe.py
@@ -0,0 +1,769 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: Apache-2.0
+
+# Adapted from
+# https://github.com/vllm-project/vllm/blob/main/vllm/model_executor/models/qwen3_moe.py
+#
+# Copyright 2025 Huawei Technologies Co., Ltd
+# Copyright 2025 The vLLM team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================ +"""Inference-only Qwen3MoE model compatible with HuggingFace weights.""" +from collections.abc import Iterable +from typing import Any, Optional, Union, Dict, Tuple, List + +import numpy as np +import mindspore as ms +from mindspore import Tensor, nn, Parameter, mint +from mindspore import Tensor, nn, mutable +from mindspore.common import dtype as mstype + +from transformers import PretrainedConfig +from vllm.config import CacheConfig, VllmConfig +from vllm.distributed import (get_pp_group, get_tensor_model_parallel_world_size, + get_dp_group) +from vllm.logger import init_logger +from vllm.model_executor.layers.quantization import QuantizationConfig +from vllm.model_executor.models.interfaces import SupportsPP +from vllm.model_executor.sampling_metadata import SamplingMetadata +from vllm.sequence import IntermediateTensors +from vllm.forward_context import get_forward_context + +from vllm_mindspore.attention import Attention +from vllm_mindspore.model_executor.layers.activation import SiluAndMul +from vllm_mindspore.model_executor.layers.fused_moe import FusedMoE +from vllm_mindspore.model_executor.layers.layernorm import RMSNorm +from vllm_mindspore.model_executor.layers.linear import ( + MergedColumnParallelLinear, QKVParallelLinear, ReplicatedLinear, + RowParallelLinear) +from vllm_mindspore.model_executor.layers.logits_processor import ( + LogitsProcessor) +from vllm_mindspore.model_executor.layers.rotary_embedding import get_rope +from vllm_mindspore.model_executor.layers.vocab_parallel_embedding import ( + ParallelLMHead, VocabParallelEmbedding) +from vllm_mindspore.model_executor.model_loader.weight_utils import default_weight_loader + +from vllm_mindspore.model_executor.models.utils import ( + extract_layer_index, is_pp_missing_parameter, + make_empty_intermediate_tensors_factory, make_layers, maybe_prefix) +from vllm_mindspore.model_executor.models.model_base import NativeModel +from vllm_mindspore.model_executor.layers.sampler import (SamplerOutput, + get_sampler) +from vllm_mindspore.utils import STR_DTYPE_TO_MS_DTYPE + +logger = init_logger(__name__) + + +class Qwen3MoeMLP(nn.Cell): + + def __init__( + self, + hidden_size: int, + intermediate_size: int, + hidden_act: str, + quant_config: Optional[QuantizationConfig] = None, + reduce_results: bool = True, + prefix: str = "", + ) -> None: + super().__init__() + self.gate_up_proj = MergedColumnParallelLinear( + hidden_size, [intermediate_size] * 2, + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.gate_up_proj") + self.down_proj = RowParallelLinear(intermediate_size, + hidden_size, + bias=False, + quant_config=quant_config, + reduce_results=reduce_results, + prefix=f"{prefix}.down_proj") + if hidden_act != "silu": + raise ValueError(f"Unsupported activation: {hidden_act}. 
" + "Only silu is supported for now.") + self.act_fn = SiluAndMul() + + def construct(self, x, dp_pad_index, dp_unpad_index, dp_unpad_index_total_with_offset): + gate_up, _ = self.gate_up_proj(x) + x = self.act_fn(gate_up) + x, _ = self.down_proj(x) + return x + + +class Qwen3MoeSparseMoeBlock(nn.Cell): + + def __init__( + self, + config: PretrainedConfig, + quant_config: Optional[QuantizationConfig] = None, + prefix: str = "", + ): + super().__init__() + self.tp_size = get_tensor_model_parallel_world_size() + + if self.tp_size > config.num_experts: + raise ValueError( + f"Tensor parallel size {self.tp_size} is greater than " + f"the number of experts {config.num_experts}.") + + self.experts = FusedMoE(num_experts=config.num_experts, + top_k=config.num_experts_per_tok, + hidden_size=config.hidden_size, + intermediate_size=config.moe_intermediate_size, + reduce_results=True, + renormalize=config.norm_topk_prob, + quant_config=quant_config, + prefix=f"{prefix}.experts") + + self.gate = ReplicatedLinear(config.hidden_size, + config.num_experts, + bias=False, + quant_config=None, + prefix=f"{prefix}.gate") + + def construct(self, hidden_states: Tensor, dp_pad_index, dp_unpad_index, + dp_pad_index_with_offset, dp_unpad_index_total_with_offset) -> Tensor: + # NOTE: hidden_states can have either 1D or 2D shape. + orig_shape = hidden_states.shape + hidden_dim = hidden_states.shape[-1] + hidden_states = hidden_states.view(-1, hidden_dim) + + # router_logits: (num_tokens, n_experts) + router_logits, _ = self.gate(hidden_states) + final_hidden_states = self.experts(hidden_states=hidden_states, + router_logits=router_logits, + dp_pad_index=dp_pad_index, + dp_unpad_index=dp_unpad_index, + dp_pad_index_with_offset=dp_pad_index_with_offset, + dp_unpad_index_total_with_offset=dp_unpad_index_total_with_offset) + + return final_hidden_states.view(orig_shape) + + +class Qwen3MoeAttention(nn.Cell): + + def __init__( + self, + hidden_size: int, + num_heads: int, + num_kv_heads: int, + rope_theta: float = 10000, + rope_scaling: Optional[dict[str, Any]] = None, + max_position_embeddings: int = 8192, + head_dim: Optional[int] = None, + rms_norm_eps: float = 1e-06, + qkv_bias: bool = False, + cache_config: Optional[CacheConfig] = None, + quant_config: Optional[QuantizationConfig] = None, + prefix: str = "", + ) -> None: + super().__init__() + self.hidden_size = hidden_size + tp_size = get_tensor_model_parallel_world_size() + self.total_num_heads = num_heads + assert self.total_num_heads % tp_size == 0 + self.num_heads = self.total_num_heads // tp_size + self.total_num_kv_heads = num_kv_heads + if self.total_num_kv_heads >= tp_size: + # Number of KV heads is greater than TP size, so we partition + # the KV heads across multiple tensor parallel GPUs. + assert self.total_num_kv_heads % tp_size == 0 + else: + # Number of KV heads is less than TP size, so we replicate + # the KV heads across multiple tensor parallel GPUs. 
+ assert tp_size % self.total_num_kv_heads == 0 + self.num_kv_heads = max(1, self.total_num_kv_heads // tp_size) + self.head_dim = head_dim or (hidden_size // self.total_num_heads) + self.q_size = self.num_heads * self.head_dim + self.kv_size = self.num_kv_heads * self.head_dim + self.scaling = self.head_dim**-0.5 + self.rope_theta = rope_theta + self.max_position_embeddings = max_position_embeddings + + self.qkv_proj = QKVParallelLinear(hidden_size, + self.head_dim, + self.total_num_heads, + self.total_num_kv_heads, + bias=qkv_bias, + quant_config=quant_config, + prefix=f"{prefix}.qkv_proj") + + self.o_proj = RowParallelLinear(self.total_num_heads * self.head_dim, + hidden_size, + bias=False, + quant_config=quant_config, + prefix=f"{prefix}.o_proj") + + self.rotary_emb = get_rope( + self.head_dim, + rotary_dim=self.head_dim, + max_position=max_position_embeddings, + base=rope_theta, + rope_scaling=rope_scaling, + ) + self.attn = Attention(self.num_heads, + self.head_dim, + self.scaling, + num_kv_heads=self.num_kv_heads, + cache_config=cache_config, + quant_config=quant_config, + prefix=f"{prefix}.attn") + + self.q_norm = RMSNorm(self.head_dim, eps=rms_norm_eps) + self.k_norm = RMSNorm(self.head_dim, eps=rms_norm_eps) + + def construct( + self, + positions: Tensor, + hidden_states: Tensor, + key_cache: Tensor, + value_cache: Tensor, + is_prefill: bool, + slot_mapping: Tensor, + attn_mask: Tensor, + batch_valid_length: Tensor, + q_seq_lens: Tensor, + block_tables: Tensor, + ) -> Tensor: + qkv, _ = self.qkv_proj(hidden_states) + q, k, v = qkv.split([self.q_size, self.kv_size, self.kv_size], dim=-1) + # Add qk-norm + q_by_head = q.view(*q.shape[:-1], q.shape[-1] // self.head_dim, + self.head_dim) + q_by_head = self.q_norm(q_by_head) + q = q_by_head.view(q.shape) + + k_by_head = k.view(*k.shape[:-1], k.shape[-1] // self.head_dim, + self.head_dim) + k_by_head = self.k_norm(k_by_head) + k = k_by_head.view(k.shape) + q, k = self.rotary_emb(positions, q, k, batch_valid_length, is_prefill) + attn_output = self.attn(q, k, v, key_cache, value_cache, is_prefill, + slot_mapping, attn_mask, batch_valid_length, + q_seq_lens, block_tables) + output, _ = self.o_proj(attn_output) + return output + + +class Qwen3MoeDecoderLayer(nn.Cell): + + def __init__( + self, + config: PretrainedConfig, + cache_config: Optional[CacheConfig] = None, + quant_config: Optional[QuantizationConfig] = None, + prefix: str = "", + ) -> None: + super().__init__() + self.hidden_size = config.hidden_size + rope_theta = getattr(config, "rope_theta", 10000) + rope_scaling = getattr(config, "rope_scaling", None) + max_position_embeddings = getattr(config, "max_position_embeddings", + 8192) + self.self_attn = Qwen3MoeAttention( + hidden_size=self.hidden_size, + num_heads=config.num_attention_heads, + num_kv_heads=config.num_key_value_heads, + rope_theta=rope_theta, + rope_scaling=rope_scaling, + max_position_embeddings=max_position_embeddings, + rms_norm_eps=config.rms_norm_eps, + qkv_bias=getattr(config, 'attention_bias', False), + head_dim=getattr(config, 'head_dim', None), + cache_config=cache_config, + quant_config=quant_config, + prefix=f"{prefix}.self_attn", + ) + + # `mlp_only_layers` in the config. 
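+        # Layers listed in `mlp_only_layers`, or layers that do not fall on a
+        # `decoder_sparse_step` boundary, use the dense Qwen3MoeMLP below;
+        # every other layer uses the sparse Qwen3MoeSparseMoeBlock.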
+ layer_idx = extract_layer_index(prefix) + mlp_only_layers = ([] if not hasattr(config, "mlp_only_layers") else + config.mlp_only_layers) + if (layer_idx not in mlp_only_layers) and ( + config.num_experts > 0 and + (layer_idx + 1) % config.decoder_sparse_step == 0): + self.mlp = Qwen3MoeSparseMoeBlock(config=config, + quant_config=quant_config, + prefix=f"{prefix}.mlp") + else: + self.mlp = Qwen3MoeMLP(hidden_size=config.hidden_size, + intermediate_size=config.intermediate_size, + hidden_act=config.hidden_act, + quant_config=quant_config, + prefix=f"{prefix}.mlp") + self.input_layernorm = RMSNorm(config.hidden_size, + eps=config.rms_norm_eps) + self.post_attention_layernorm = RMSNorm(config.hidden_size, + eps=config.rms_norm_eps) + + def construct( + self, + positions: Tensor, + hidden_states: Tensor, + key_cache: Tensor, + value_cache: Tensor, + is_prefill: bool, + slot_mapping: Tensor, + attn_mask: Tensor, + batch_valid_length: Tensor, + q_seq_lens: Tensor, + block_tables: Tensor, + residual: Optional[Tensor], + dp_pad_index: Optional[bool] = None, + dp_unpad_index: Optional[Tensor] = None, + dp_pad_index_with_offset: Optional[Tensor] = None, + dp_unpad_index_total_with_offset: Optional[Tensor] = None, + ) -> Tensor: + # Self Attention + if residual is None: + residual = hidden_states + hidden_states = self.input_layernorm(hidden_states) + else: + hidden_states, residual = self.input_layernorm( + hidden_states, residual) + hidden_states = self.self_attn(positions, hidden_states, key_cache, + value_cache, is_prefill, slot_mapping, + attn_mask, batch_valid_length, + q_seq_lens, block_tables) + # Fully Connected + hidden_states, residual = self.post_attention_layernorm( + hidden_states, residual) + hidden_states = self.mlp(hidden_states, dp_pad_index, dp_unpad_index, + dp_pad_index_with_offset, dp_unpad_index_total_with_offset) + return hidden_states, residual + + +class Qwen3MoeModel(nn.Cell): + + def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): + super().__init__() + + config = vllm_config.model_config.hf_config + cache_config = vllm_config.cache_config + quant_config = vllm_config.quant_config + + self.padding_idx = config.pad_token_id + self.vocab_size = config.vocab_size + self.config = config + self.embed_tokens = VocabParallelEmbedding( + config.vocab_size, + config.hidden_size, + prefix=f"{prefix}.embed_tokens") + self.start_layer, self.end_layer, self.layers = make_layers( + config.num_hidden_layers, + lambda prefix: Qwen3MoeDecoderLayer(config=config, + cache_config=cache_config, + quant_config=quant_config, + prefix=prefix), + prefix=f"{prefix}.layers", + ) + self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) + self.make_empty_intermediate_tensors = ( + make_empty_intermediate_tensors_factory( + ["hidden_states", "residual"], config.hidden_size)) + + def get_input_embeddings(self, input_ids: Tensor) -> Tensor: + return self.embed_tokens(input_ids) + + def construct( + self, + input_ids: Tensor, + positions: Tensor, + key_caches: List[Tensor], + value_caches: List[Tensor], + is_prefill: bool, + slot_mapping: Tensor, + attn_mask: Tensor, + batch_valid_length: Tensor, + q_seq_lens: Tensor, + block_tables: Tensor, + intermediate_tensors: Optional[IntermediateTensors] = None, + inputs_embeds: Optional[Tensor] = None, + dp_pad_index = None, + dp_unpad_index: Optional[Tensor] = None, + dp_pad_index_total_with_offset: Optional[Tensor] = None, + dp_unpad_index_total_with_offset: Optional[Tensor] = None, + + ) -> Union[Tensor, IntermediateTensors]: + if 
get_pp_group().is_first_rank: + if inputs_embeds is not None: + hidden_states = inputs_embeds + else: + hidden_states = self.get_input_embeddings(input_ids) + residual = None + else: + assert intermediate_tensors is not None + hidden_states = intermediate_tensors["hidden_states"] + residual = intermediate_tensors["residual"] + for i in range(self.start_layer, self.end_layer): + layer = self.layers[i] + hidden_states, residual = layer(positions, hidden_states, + key_caches[i - self.start_layer], + value_caches[i - self.start_layer], + is_prefill, slot_mapping, + attn_mask, batch_valid_length, + q_seq_lens, block_tables, residual, + dp_pad_index, dp_unpad_index, + dp_pad_index_total_with_offset, + dp_unpad_index_total_with_offset) + if not get_pp_group().is_last_rank: + return IntermediateTensors({ + "hidden_states": hidden_states, + "residual": residual + }) + hidden_states, _ = self.norm(hidden_states, residual) + return hidden_states + + def load_weights(self, weights: Iterable[Tuple[str, Tensor]], + params_dict: Dict[str, Parameter]): + stacked_params_mapping = [ + # (param_name, shard_name, shard_id) + ("qkv_proj", "q_proj", "q"), + ("qkv_proj", "k_proj", "k"), + ("qkv_proj", "v_proj", "v"), + ("gate_up_proj", "gate_proj", 0), + ("gate_up_proj", "up_proj", 1), + ] + + # Params for weights, fp8 weight scales, fp8 activation scales + # (param_name, weight_name, expert_id, shard_id) + expert_params_mapping = FusedMoE.make_expert_params_mapping( + ckpt_gate_proj_name="gate_proj", + ckpt_down_proj_name="down_proj", + ckpt_up_proj_name="up_proj", + num_experts=self.config.num_experts) + + loaded_params: set[str] = set() + for name, loaded_weight in weights: + for (param_name, weight_name, shard_id) in stacked_params_mapping: + # Skip non-stacked layers and experts (experts handled below). + if weight_name not in name: + continue + # We have mlp.experts[0].gate_proj in the checkpoint. + # Since we handle the experts below in expert_params_mapping, + # we need to skip here BEFORE we update the name, otherwise + # name will be updated to mlp.experts[0].gate_up_proj, which + # will then be updated below in expert_params_mapping + # for mlp.experts[0].gate_gate_up_proj, which breaks load. + if "mlp.experts" in name: + continue + name = name.replace(weight_name, param_name) + # Skip loading extra bias for GPTQ models. + if ((name.endswith(".bias") or name.endswith("_bias")) + and name not in params_dict): + continue + # Skip layers on other devices. + if is_pp_missing_parameter(name, self): + continue + if name not in params_dict: + continue + + param = params_dict[name] + weight_loader = param.weight_loader + weight_loader(param, loaded_weight, shard_id) + break + else: + for mapping in expert_params_mapping: + param_name, weight_name, expert_id, shard_id = mapping + if weight_name not in name: + continue + name = name.replace(weight_name, param_name) + # Skip layers on other devices. + if is_pp_missing_parameter(name, self): + continue + # Skip loading extra bias for GPTQ models. + if ((name.endswith(".bias") or name.endswith("_bias")) + and name not in params_dict): + continue + param = params_dict[name] + weight_loader = param.weight_loader + weight_loader(param, + loaded_weight, + name, + shard_id=shard_id, + expert_id=expert_id) + break + else: + # Skip loading extra bias for GPTQ models. + if ((name.endswith(".bias") or name.endswith("_bias")) + and name not in params_dict): + continue + # Skip layers on other devices. 
+ if is_pp_missing_parameter(name, self): + continue + # Remapping the name of FP8 kv-scale. + if name.endswith("kv_scale"): + remapped_kv_scale_name = name.replace( + ".kv_scale", ".attn.kv_scale") + if remapped_kv_scale_name not in params_dict: + logger.warning_once( + "Found kv scale in the checkpoint (e.g. %s), but not found the expected name in the model (e.g. %s). kv-scale is not loaded.", # noqa: E501 + name, + remapped_kv_scale_name, + ) + continue + else: + name = remapped_kv_scale_name + param = params_dict[name] + weight_loader = getattr(param, "weight_loader", + default_weight_loader) + weight_loader(param, loaded_weight) + loaded_params.add(name) + return loaded_params + + +class Qwen3MoeForCausalLM(NativeModel, SupportsPP): + packed_modules_mapping = { + "qkv_proj": [ + "q_proj", + "k_proj", + "v_proj", + ], + "gate_up_proj": [ + "gate_proj", + "up_proj", + ], + } + + fall_back_to_pt_during_load = False + + def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""): + super().__init__(vllm_config=vllm_config, prefix=prefix) + config = vllm_config.model_config.hf_config + quant_config = vllm_config.quant_config + self.config = config + self.quant_config = quant_config + self.model = Qwen3MoeModel(vllm_config=vllm_config, + prefix=maybe_prefix(prefix, "model")) + self.lm_head = ParallelLMHead(config.vocab_size, + config.hidden_size, + quant_config=quant_config) + if self.config.tie_word_embeddings: + self.lm_head.weight = self.model.embed_tokens.weight + self.logits_processor = LogitsProcessor(config.vocab_size) + self.make_empty_intermediate_tensors = ( + self.model.make_empty_intermediate_tensors) + + self.sampler = get_sampler() + + self.common_preprocess(vllm_config, prefix) + + self.dp_pad_input = False + custom_ep_size = False + if vllm_config.additional_config is not None: + custom_ep_size = vllm_config.additional_config.get("ep_size", None) + if get_dp_group().world_size > 1 and ( + not self.parallel_config.enable_expert_parallel or custom_ep_size + ): + self.dp_pad_input = True + self.dp_group = get_dp_group().device_group._name + self.dp_world_size = get_dp_group().world_size + self.dp_rank = get_dp_group().rank_in_group + + def get_input_embeddings(self, input_ids: Tensor) -> Tensor: + return self.model.get_input_embeddings(input_ids) + + def forward( + self, + input_ids: Tensor, + positions: Tensor, + intermediate_tensors: Optional[IntermediateTensors] = None, + inputs_embeds: Optional[Tensor] = None, + **kwargs + ) -> Union[Tensor, IntermediateTensors]: + hidden_states = self.exec_model(input_ids, positions, intermediate_tensors, + inputs_embeds) + return hidden_states + + def sample(self, logits: Tensor, + sampling_metadata: SamplingMetadata) -> Optional[SamplerOutput]: + next_tokens = self.sampler(logits, sampling_metadata) + return next_tokens + + def compute_logits( + self, + hidden_states: Tensor, + sampling_metadata: SamplingMetadata, + ) -> Optional[Tensor]: + logits = self.logits_processor(self.lm_head, hidden_states, + sampling_metadata) + return logits + + def load_weights(self, weights: Iterable[tuple[str, + Tensor]]) -> set[str]: + params_dict = self.get_params_dict() + return self.model.load_weights(weights, params_dict) + + def exec_model(self, + input_ids: Tensor, + positions: Tensor, + intermediate_tensors: IntermediateTensors = None, + inputs_embeds: Tensor = None, + **kwargs): + model_inputs, is_prefill = self.prepare_inputs(input_ids, positions, + intermediate_tensors, + inputs_embeds) + + if self.prev_prefill != is_prefill and 
self.is_graph_mode: + self.set_model_inputs(input_ids, positions, intermediate_tensors, + inputs_embeds, is_prefill) + self.prev_prefill = is_prefill + + # for dummy_attention_metadata + if is_prefill and not self.set_flags: + self.set_flags = True + + if self.run_model is None: + self.run_model = ms.jit( + function=self.model, # type: ignore[attr-defined] + jit_level='O0' + ) if self.is_graph_mode else self.model # type: ignore[attr-defined] + + if self.dp_pad_input: + # if dp and not ep, should pad input to gather. + token_num_total = mint.empty((self.dp_world_size, 1), dtype=ms.int32) + send_tensor = ms.Tensor([[input_ids.shape[0]]], dtype=ms.int32) + mint.distributed.all_gather_into_tensor(token_num_total, send_tensor, + group=self.dp_group) + token_num_total = token_num_total.reshape(-1) + # tokens_cumulative = mint.cumsum(token_num_total, dim=0) + # start = 0 if self.dp_rank == 0 else tokens_cumulative[self.dp_rank - 1].item() + # end = tokens_cumulative[self.dp_rank].item() + # end2 = tokens_cumulative[-1].item() - end + # dp_pad_index = ms.Tensor([0, 0, start, end2], dtype=ms.int32) + token_num_total = token_num_total.asnumpy() + token_num_total_cumsum = np.cumsum(token_num_total) + max_token_num = token_num_total.max() + total_pad_num = max_token_num - token_num_total + this_pad_num = total_pad_num[self.dp_rank] + + dp_unpad_index = ms.Tensor(np.arange(token_num_total[self.dp_rank]), dtype=ms.int32) + dp_pad_index = ms.Tensor(np.pad(dp_unpad_index, (0, this_pad_num)), dtype=ms.int32) + + # dp_pad_index_total_with_offset = [np.pad(np.arange(token_num_total[rank]), (0, total_pad_num[rank])) + # for rank in range(self.dp_world_size)] + dp_pad_index_total_with_offset = [np.pad(np.arange(0 if rank == 0 else token_num_total_cumsum[rank - 1], + token_num_total_cumsum[rank]), (0, total_pad_num[rank])) + for rank in range(self.dp_world_size)] + + dp_pad_index_total_with_offset = np.concatenate(dp_pad_index_total_with_offset, axis=0) + dp_pad_index_total_with_offset = ms.Tensor(dp_pad_index_total_with_offset, dtype=mstype.int32) + + + dp_unpad_index_total_with_offset = [np.arange(token_num_total[rank]) + rank * max_token_num + for rank in range(self.dp_world_size)] + dp_unpad_index_total_with_offset = np.concatenate(dp_unpad_index_total_with_offset, axis=0) + dp_unpad_index_total_with_offset = ms.Tensor(dp_unpad_index_total_with_offset, dtype=mstype.int32) + + + model_output = self.run_model( # type: ignore[misc] + input_ids=model_inputs["input_ids"], + positions=model_inputs["position_ids"], + key_caches=model_inputs["key_cache"], + value_caches=model_inputs["value_cache"], + is_prefill=is_prefill, + slot_mapping=model_inputs["slot_mapping"], + attn_mask=model_inputs["attention_mask"], + batch_valid_length=model_inputs["batch_valid_length"], + q_seq_lens=model_inputs["q_seq_lens"], + block_tables=model_inputs["block_tables"], + intermediate_tensors=model_inputs["intermediate_tensors"], + inputs_embeds=model_inputs["inputs_embeds"], + dp_pad_index=dp_pad_index if self.dp_pad_input else None, + dp_unpad_index=dp_unpad_index if self.dp_pad_input else None, + dp_pad_index_total_with_offset=dp_pad_index_total_with_offset if self.dp_pad_input else None, + dp_unpad_index_total_with_offset=dp_unpad_index_total_with_offset if self.dp_pad_input else None + ) + + return model_output + + + def set_model_inputs(self, input_ids, position_ids, intermediate_tensors, + inputs_embeds, is_prefill): + if input_ids is None: + dyn_input_ids = None + else: + dyn_input_ids = ms.Tensor(shape=[None] * 
input_ids.ndim, + dtype=mstype.int32) + + if position_ids is None: + dyn_position_ids = None + else: + dyn_position_ids = ms.Tensor(shape=[None] * position_ids.ndim, + dtype=mstype.int32) + + if inputs_embeds is None: + dyn_inputs_embeds = None + else: + dyn_inputs_embeds = ms.Tensor(shape=[None] * inputs_embeds.ndim, + dtype=inputs_embeds.dtype) + + if intermediate_tensors is None: + dyn_intermediate_tensors = None + else: + dyn_intermediate_tensors = ms.Tensor( + shape=[None] * intermediate_tensors.ndim, + dtype=intermediate_tensors.dtype) + + block_size = self.cache_config.block_size + num_kv_heads = self.model_config.get_num_kv_heads(self.parallel_config) + head_size = self.model_config.get_head_size() + kv_cache_shape = (None, block_size, num_kv_heads, head_size) + + kv_cache_dtype = self.model_config.dtype if self.cache_config.cache_dtype == "auto" \ + else self.cache_config.cache_dtype + if kv_cache_dtype in STR_DTYPE_TO_MS_DTYPE: + kv_cache_dtype = STR_DTYPE_TO_MS_DTYPE[kv_cache_dtype] + + num_layers = self.model_config.get_num_layers(self.parallel_config) + + dyn_key_cache = Tensor(shape=kv_cache_shape, dtype=kv_cache_dtype) + dyn_value_cache = Tensor(shape=kv_cache_shape, dtype=kv_cache_dtype) + dyn_key_caches = mutable([dyn_key_cache for _ in range(num_layers)]) + dyn_value_caches = mutable( + [dyn_value_cache for _ in range(num_layers)]) + + dyn_slot_mapping = Tensor(shape=[None], dtype=mstype.int32) + dynamic_attention_mask = Tensor(shape=[None, None], + dtype=self.model_config.dtype) + dyn_batch_valid_length = Tensor(shape=[None], dtype=mstype.int32) + dyn_q_seq_lens = Tensor(shape=[None], dtype=mstype.int32) + dyn_block_tables = Tensor(shape=[None, None], dtype=mstype.int32) + dyn_dp_pad_index = Tensor(shape=[None], dtype=mstype.int32) if self.dp_pad_input else None + dyn_dp_unpad_index = Tensor(shape=[None], dtype=mstype.int32) if self.dp_pad_input else None + dyn_dp_pad_index_with_offset = Tensor(shape=[None], dtype=mstype.int32) if self.dp_pad_input else None + dp_unpad_index_total_with_offset = Tensor(shape=[None], dtype=mstype.int32) if self.dp_pad_input else None + + + self.model.set_inputs( + dyn_input_ids, + dyn_position_ids, + dyn_key_caches, # type: ignore[attr-defined] + dyn_value_caches, + is_prefill, + dyn_slot_mapping, + dynamic_attention_mask, + dyn_batch_valid_length, + dyn_q_seq_lens, + dyn_block_tables, + dyn_intermediate_tensors, + dyn_inputs_embeds, + dyn_dp_pad_index, + dyn_dp_unpad_index, + dyn_dp_pad_index_with_offset, + dp_unpad_index_total_with_offset) + + dynamic_hidden_states = Tensor(shape=[None, None], + dtype=self.model_config.dtype) + self.lm_head.set_inputs( + dynamic_hidden_states) # type: ignore[attr-defined] diff --git a/vllm_mindspore/model_executor/models/registry.py b/vllm_mindspore/model_executor/models/registry.py index 13da5f4b25cef1ae461be4194a6d0fbb756c5fda..35b79132f5e42f52b3a2194817a78fa620609b76 100644 --- a/vllm_mindspore/model_executor/models/registry.py +++ b/vllm_mindspore/model_executor/models/registry.py @@ -27,6 +27,9 @@ from vllm_mindspore.utils import (is_mindformers_model_backend, _NATIVE_MODELS = { "LlamaForCausalLM": ("llama", "LlamaForCausalLM"), "Qwen2ForCausalLM": ("qwen2", "Qwen2ForCausalLM"), + "Qwen2_5_VLForConditionalGeneration": + ("qwen2_5_vl", "Qwen2_5_VLForConditionalGeneration"), + "Qwen3MoeForCausalLM": ("qwen3_moe", "Qwen3MoeForCausalLM"), } _MINDFORMERS_MODELS = { diff --git a/vllm_mindspore/model_executor/models/utils.py b/vllm_mindspore/model_executor/models/utils.py index 
66792cc040397d2310428e73ee08e21a44a41dc4..dba16ce9311dceccaa7f15fdf98d702181707126 100644 --- a/vllm_mindspore/model_executor/models/utils.py +++ b/vllm_mindspore/model_executor/models/utils.py @@ -23,7 +23,7 @@ from dataclasses import dataclass, field from typing import Optional, Union import mindspore as ms -from mindspore import mint, ops +from mindspore import mint, ops, nn from vllm.sequence import IntermediateTensors from vllm_mindspore.multimodal.inputs import NestedTensors @@ -265,3 +265,34 @@ def merge_multimodal_embeddings( (input_ids == placeholder_token_id), multimodal_embeddings, ) + + +_model_to_pp_missing_layer_names: dict[int, list[str]] = {} + + +def get_pp_missing_layer_names(model: nn.Cell) -> list[str]: + """Get the names of the missing layers in a pipeline parallel model.""" + model_id = id(model) + if model_id in _model_to_pp_missing_layer_names: + return _model_to_pp_missing_layer_names[model_id] + + missing_layer_names = [] + for name, cell in model.cells_and_names(): + if isinstance(cell, PPMissingLayer): + # NOTE: the trailing dot is used to match the prefix of the layer. + # without the dot, we could match a layer that is not missing, + # e.g., 'encoder.layer.1' would match 'encoder.layer.11' + missing_layer_names.append(name + '.') + _model_to_pp_missing_layer_names[model_id] = missing_layer_names + + return missing_layer_names + + +def is_pp_missing_parameter(name: str, model: nn.Cell) -> bool: + """Check if a parameter is missing in a pipeline parallel model.""" + if isinstance(model, PPMissingLayer): + return True + + return any( + name.startswith(missing_layer_name) + for missing_layer_name in get_pp_missing_layer_names(model)) diff --git a/vllm_mindspore/utils.py b/vllm_mindspore/utils.py index f46fd19f60483bdd0454592404e644801e5f327a..08706bc7fd4d916a2c75d424a45643f495b1ee47 100644 --- a/vllm_mindspore/utils.py +++ b/vllm_mindspore/utils.py @@ -301,3 +301,24 @@ def ms_memory_profiling( result.non_torch_increase = diff_from_create.non_torch_memory result.profile_time = diff_profile.timestamp result.non_kv_cache_memory = result.non_torch_increase + result.torch_peak_increase + result.weights_memory # noqa + + +def get_ascend_soc_version(): + """Get ascend soc version.""" + if is_version_ge(ms.__version__, "2.2.0"): + from mindspore._c_expression import MSContext + return MSContext.get_instance().get_ascend_soc_version() + ascend_chip_type = os.getenv("ASCEND_CHIP_TYPE", "UNSET") + if ascend_chip_type not in ["910a", "910b", "UNSET"]: + raise EnvironmentError(f"ASCEND_CHIP_TYPE should be in ['910a', '910b'],but get {ascend_chip_type}") + if ascend_chip_type == "UNSET": + logger.info("Environment variables need to be set manually to obtain the chip type," + "which can be set as follows: \n" + "For Atlas 800, run 'export ASCEND_CHIP_TYPE=910a' before the program runs.\n" + "For Atlas 800T A2, run 'export ASCEND_CHIP_TYPE=910b' before the program runs.\n" + "If you need to get chip information automatically, MindSpore 2.2 and above is recommended") + return ascend_chip_type + +def is_910b(): + device = get_ascend_soc_version() + return device in ['910b', 'ascend910b'] diff --git a/vllm_mindspore/v1/worker/gpu_model_runner.py b/vllm_mindspore/v1/worker/gpu_model_runner.py index 42228ea78667f0d715896f4d89590ca40cf642ea..7b6bc2ef3847bcd65477f22023e76942df529775 100644 --- a/vllm_mindspore/v1/worker/gpu_model_runner.py +++ b/vllm_mindspore/v1/worker/gpu_model_runner.py @@ -37,6 +37,8 @@ from vllm.v1.worker.gpu_input_batch import CachedRequestState from 
vllm_mindspore.utils import get_valid_dtype
 from vllm_mindspore.v1.attention.backends.ms_attn import MsAttentionMetadata
+from vllm_mindspore.config import get_layers_from_vllm_config
+from vllm_mindspore.model_executor.models.model_base import AttentionWrapper

 logger = init_logger(__name__)

@@ -443,7 +445,8 @@ def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]:
     block_size = self.vllm_config.cache_config.block_size
     use_mla = self.vllm_config.model_config.use_mla
     kv_cache_spec: dict[str, KVCacheSpec] = {}
-    for layer_name, attn_module in forward_ctx.items():
+    attn_layers = get_layers_from_vllm_config(self.vllm_config, AttentionWrapper)
+    for layer_name, attn_module in attn_layers.items():
         # vllm-mindspore AttentionWrapper is not an Attention isinstance
         # assert isinstance(attn_module, Attention)
         if attn_module.attn_type == AttentionType.DECODER:
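
The weight-loading change above (safetensors get_slice plus split_loaded_weight) can be exercised in isolation. The snippet below is only an illustrative sketch: the checkpoint path, tensor name and TP settings are placeholders, not values defined by this patch. It shows how a per-rank shard is sliced out of the lazy safetensors handle as a numpy array before the weight loaders wrap it with from_numpy / ms.Tensor.

import numpy as np
from safetensors import safe_open

tp_rank, tp_size = 1, 4
with safe_open("model.safetensors", framework="np") as f:       # placeholder file name
    weight_slice = f.get_slice("model.embed_tokens.weight")     # lazy handle, no data read yet
    rows, hidden = weight_slice.get_shape()
    shard_size = rows // tp_size
    start_idx = tp_rank * shard_size
    # Same result as split_loaded_weight(weight_slice, 0, start_idx, shard_size):
    local_shard = weight_slice[start_idx:start_idx + shard_size]
    assert isinstance(local_shard, np.ndarray)                   # only this shard was materialized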
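
The padding/unpadding indices built in Qwen3MoeForCausalLM.exec_model when dp_pad_input is set are easiest to follow on a toy case. The standalone sketch below only mirrors that arithmetic, assuming two DP ranks holding 3 and 5 tokens; the index_select calls in FusedMoE.construct then use these tensors to pad each rank to a common length before the collective ops and to drop the padding afterwards.

import numpy as np

dp_world_size, dp_rank = 2, 0
token_num_total = np.array([3, 5])                        # tokens per DP rank (all-gathered)
token_num_total_cumsum = np.cumsum(token_num_total)       # [3, 8]
max_token_num = token_num_total.max()                     # 5
total_pad_num = max_token_num - token_num_total           # [2, 0]
this_pad_num = total_pad_num[dp_rank]                     # 2 on rank 0

# Local token indices, padded to the common length by repeating index 0.
dp_unpad_index = np.arange(token_num_total[dp_rank])      # [0, 1, 2]
dp_pad_index = np.pad(dp_unpad_index, (0, this_pad_num))  # [0, 1, 2, 0, 0]

# Global (unpadded) token ids laid out as dp_world_size blocks of
# max_token_num entries, with padding slots reusing id 0.
dp_pad_index_total_with_offset = np.concatenate([
    np.pad(np.arange(0 if rank == 0 else token_num_total_cumsum[rank - 1],
                     token_num_total_cumsum[rank]),
           (0, total_pad_num[rank]))
    for rank in range(dp_world_size)
])                                                        # [0 1 2 0 0 3 4 5 6 7]

# Positions of the real tokens inside that padded layout.
dp_unpad_index_total_with_offset = np.concatenate([
    np.arange(token_num_total[rank]) + rank * max_token_num
    for rank in range(dp_world_size)
])                                                        # [0 1 2 5 6 7 8 9]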