From e3efc79c5740266fad0fe28a3e670a9d2bdb0d73 Mon Sep 17 00:00:00 2001
From: guanguan
Date: Thu, 31 Jul 2025 17:03:19 +0800
Subject: [PATCH] moe test

---
 .../linear_parallel_moe_common.py             |  79 ++++--------
 ...near_parallel_test_moe_alltoallv_matmul.py | 117 ++++--------------
 ...near_parallel_test_moe_matmul_alltoallv.py |  94 ++++---------
 3 files changed, 65 insertions(+), 225 deletions(-)

diff --git a/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_moe_common.py b/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_moe_common.py
index 54a3177f..cbf113e6 100644
--- a/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_moe_common.py
+++ b/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_moe_common.py
@@ -14,6 +14,7 @@ import torch
 from collections import namedtuple
 import collections
 import numpy
+import numpy as np
 
 
 class CommType(Enum):
@@ -194,7 +195,7 @@ class QuantInfo:
 
 class MoeTestDate:
     def __init__(self, comm_type, rank_size, batch_size, m, k, n, trans_b, expert_per_rank,
-                 coc_dtype_desc, quant_info, EP, TP, maxOutputSize=-1):
+                 coc_dtype_desc, quant_info, EP, TP, maxOutputSize=-1, mode=0):
         activation_dtype, weight_dtype, l0c_dtype, output_dtype, l0c_dtype_low = supported_coc_data_type_dict[coc_dtype_desc]
         self.matrix_a_list = []
         self.matrix_b_list = []
@@ -219,6 +220,7 @@ class MoeTestDate:
         self.m = m
         self.k = k
         self.n = n
+        self.mode = mode
 
         self.input_splits, self.output_splits, self.num_local_tokens_per_expert = self.get_moe_input_output_splits(
             expert_per_rank, EP)
@@ -242,10 +244,18 @@ class MoeTestDate:
             self.matrix_b_list.append(self.matrix_b)
 
     def get_num_local_tokens_per_expert(self):
-        numpy.random.seed(0)
-        indices = numpy.random.randint(self.expert_num, size=self.sequence_length)
-        item_dict = collections.Counter(indices)
-        num_local_tokens_per_expert = [item_dict.get(i, 0) for i in range(self.expert_num)]
+        if self.mode == 1:
+            numpy.random.seed(0)
+            indices = numpy.random.randint(self.expert_num, size=self.sequence_length)
+            item_dict = collections.Counter(indices)
+            num_local_tokens_per_expert = [item_dict.get(i, 0) for i in range(self.expert_num)]
+        else:
+            p = np.zeros(self.expert_num)
+            p[0] = 0.9
+            p[1:] = 0.1 / (self.expert_num - 1)
+            indices = numpy.random.choice(self.expert_num, size = self.sequence_length, p = p)
+            item_dict = collections.Counter(indices)
+            num_local_tokens_per_expert = [item_dict.get(i, 0) for i in range(self.expert_num)]
         return num_local_tokens_per_expert, indices
 
     def get_moe_input_output_splits(self, expert_per_rank, EP):
@@ -419,26 +429,11 @@ class MoeTestDate:
         return pValue
 
     def generate_matrix_c_for_moe_309(self, coc_dtype_desc, rank_size, TP, EP, l0c_dtype, output_dtype, quant_info):
-        if l0c_dtype == torch.int32:
-            data_type_len = 2
-        else:
-            data_type_len = 2
-        pValue = int(self.get_pvalue(data_type_len))
-        print("pvalue!!!!!!!!!!!!!!", pValue)
         if coc_dtype_desc in [CoCDataTypeDesc.FP16FP16_FP32_FP16, CoCDataTypeDesc.BF16BF16_FP32_BF16]:
             for i in range(rank_size):
                 ep_idx = i // TP
-                matrix_c_low = torch.zeros((1,self.matrix_a_i_list[ep_idx].size(1),self.n)).to(output_dtype)
-                loop = math.ceil(self.k / (pValue * 256))
-                for j in range(loop):
-                    st = j * pValue * 256
-                    ed = min(self.k, (j + 1) * pValue * 256)
-                    matrix_c_j = torch.matmul(self.matrix_a_i_list[ep_idx][:,:,st:ed].to(torch.float32), self.matrix_b[:,st:ed,:].to(torch.float32))
-                    matrix_c_j_low = matrix_c_j.to(output_dtype)
-                    matrix_c_low = matrix_c_low + matrix_c_j_low
                 matrix_c = torch.matmul(self.matrix_a_i_list[ep_idx].to(l0c_dtype), self.matrix_b.to(l0c_dtype))
                 self.matrix_c_list.append(matrix_c)
-                self.matrix_c_low_list.append(matrix_c_low)
 
         elif coc_dtype_desc in [CoCDataTypeDesc.INT8INT8_INT32_FP16, CoCDataTypeDesc.INT8INT8_INT32_BF16]:
             assert quant_info.dequant_granularity in [QuantGranularity.PER_CHANNEL, QuantGranularity.PER_TENSOR,
@@ -465,33 +460,16 @@ class MoeTestDate:
                     quant_scale_alltoall[ep_idx] = quant_scale_alltoall[ep_idx][:self.maxOutputSize, :]
                 self.quant_scale_list.append(quant_scale)
-
             for i in range(rank_size):
                 ep_idx = i // TP
-                matrix_c_low = torch.zeros((1,self.matrix_a_i_list[ep_idx].size(1),self.n)).to(output_dtype)
                 broadcast_offset, broadcast_scale = quant_info.get_moe_dequant_tensor(self.matrix_a_i_list[ep_idx].size(1),
                                                                                       self.input_info[2], TP, l0c_dtype)
-                loop = math.ceil(self.k / (pValue * 256))
-                for j in range(loop):
-                    st = j * pValue * 256
-                    ed = min(self.k, (j + 1) * pValue * 256)
-                    matrix_c_j = torch.matmul(self.matrix_a_i_list[ep_idx][:,:,st:ed].to(torch.float32), self.matrix_b[:,st:ed,:].to(torch.float32))
-                    matrix_c_j = (matrix_c_j * broadcast_scale)
-                    matrix_c_j_low = matrix_c_j.to(output_dtype)
-                    matrix_c_low = matrix_c_low + matrix_c_j_low
-                matrix_c_low = matrix_c_low.to(torch.float32)
                 matrix_c = torch.matmul(self.matrix_a_i_list[ep_idx].to(torch.float32), self.matrix_b.to(torch.float32)).to(l0c_dtype)
                 matrix_c = ((matrix_c + broadcast_offset).to(torch.float32) * broadcast_scale)
-                # matrix_c_low = ((matrix_c_low + broadcast_offset).to(torch.float32) * broadcast_scale)
                 if quant_info.dequant_granularity is QuantGranularity.PER_TOKEN:
-                    # print("!"*30, quant_scale_alltoall[ep_idx].shape)
                     broadcast_quant_scale = quant_info.broadcast_quant_args(quant_scale_alltoall[ep_idx],
                                                                             [self.matrix_a_i_list[ep_idx].size(1), self.input_info[2]])
                     matrix_c = (matrix_c * broadcast_quant_scale)
-                    matrix_c_low = (matrix_c_low * broadcast_quant_scale)
-                # self.matrix_c = matrix_c
                 self.matrix_c_list.append(matrix_c)
-                self.matrix_c_low_list.append(matrix_c_low.to(output_dtype))
-                # self.matrix_c_low = matrix_c_low.to(output_dtype)
 
     def cal_trunc(self, EP):
         self.global_tokens_per_expert_matrix_temp = self.global_tokens_per_expert_matrix.clone()
@@ -506,7 +484,6 @@ class MoeTestDate:
                     sum_tokens = self.maxOutputSize
                 else:
                     sum_tokens += self.global_tokens_per_expert_matrix_temp[j * self.expert_num + expert_id]
-        # print("self.global_tokens_per_expert_matrix_temp", self.global_tokens_per_expert_matrix_temp)
 
 
     def generate_matrix_c_for_moe_310(self, coc_dtype_desc, rank_size, TP, EP, l0c_dtype, l0c_dtype_low, quant_info):
@@ -514,25 +491,18 @@ class MoeTestDate:
         self.cal_trunc(EP)
         if coc_dtype_desc in [CoCDataTypeDesc.FP16FP16_FP32_FP16, CoCDataTypeDesc.BF16BF16_FP32_BF16]:
             matrix_c_out = torch.matmul(self.matrix_a.to(l0c_dtype), self.matrix_b.to(l0c_dtype))
-            # if coc_dtype_desc == CoCDataTypeDesc.BF16BF16_FP32_BF16:
-            #     matrix_c_out_low = torch.matmul(self.matrix_a, self.matrix_b)
-            # else:
-            #     matrix_c_out_low = matrix_c_out.to(l0c_dtype_low)
-            tmp_offset = 0
             for rank in range(rank_size):
+                matrix_c = matrix_c_out.clone()
+                tmp_offset = 0
                 for ep_idx in range(EP):
-                    # tmp_offset = 0
                     for local_expert_id in range(self.expert_per_rank):
                         expert_id = local_expert_id + ep_idx * self.expert_per_rank
                         if self.global_tokens_per_expert_matrix_temp[rank * self.expert_num + expert_id] < self.num_local_tokens_per_expert[rank][expert_id]:
                             l = tmp_offset + self.global_tokens_per_expert_matrix_temp[rank * self.expert_num + expert_id]
                             r = tmp_offset + self.num_local_tokens_per_expert[rank][expert_id]
-                            matrix_c_out[:,l:r,:] = 0
-                            # matrix_c_out_low[:,l:r,:] = 0
+                            matrix_c[:,l:r,:] = 0
                         tmp_offset += self.num_local_tokens_per_expert[rank][expert_id]
-            self.matrix_c_list.append(matrix_c_out)
-            # self.matrix_c_low_list.append(matrix_c_out_low)
-            # print("self.matrix_c:", self.matrix_c)
+                self.matrix_c_list.append(matrix_c)
 
         elif coc_dtype_desc in [CoCDataTypeDesc.INT8INT8_INT32_FP16, CoCDataTypeDesc.INT8INT8_INT32_BF16]:
             assert quant_info.dequant_granularity in [QuantGranularity.PER_CHANNEL, QuantGranularity.PER_TENSOR,
@@ -561,20 +531,17 @@ class MoeTestDate:
 
             if quant_info.dequant_granularity is QuantGranularity.PER_TOKEN:
                 matrix_c_out = (matrix_c_out * broadcast_quant_scale).to(torch.float32)
-            # matrix_c_out_low = matrix_c_out.to(l0c_dtype_low)
-            tmp_offset = 0
             for rank in range(rank_size):
+                matrix_c = matrix_c_out.clone()
+                tmp_offset = 0
                 for ep_idx in range(EP):
-                    # tmp_offset = 0
                     for local_expert_id in range(self.expert_per_rank):
                         expert_id = local_expert_id + ep_idx * self.expert_per_rank
                         if self.global_tokens_per_expert_matrix_temp[rank * self.expert_num + expert_id] < self.num_local_tokens_per_expert[rank][expert_id]:
                             l = tmp_offset + self.global_tokens_per_expert_matrix_temp[rank * self.expert_num + expert_id]
                             r = tmp_offset + self.num_local_tokens_per_expert[rank][expert_id]
-                            matrix_c_out[:,l:r,:] = 0
-                            # matrix_c_out_low[:,l:r,:] = 0
+                            matrix_c[:,l:r,:] = 0
                         tmp_offset += self.num_local_tokens_per_expert[rank][expert_id]
-            self.matrix_c_list.append(matrix_c_out)
-            # self.matrix_c_low_list.append(matrix_c_out_low)
+                self.matrix_c_list.append(matrix_c)

diff --git a/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_test_moe_alltoallv_matmul.py b/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_test_moe_alltoallv_matmul.py
index bfbf422b..081a487c 100644
--- a/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_test_moe_alltoallv_matmul.py
+++ b/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_test_moe_alltoallv_matmul.py
@@ -9,6 +9,7 @@
 #
 
 import os
+import re
 import json
 import unittest
 import sys
@@ -69,12 +70,20 @@ def get_eb(golden:torch.Tensor, actual:torch.Tensor):
     return EB
 
 def one_golden_compare(tensor_a, tensor_b):
+    print("npu!!!!", tensor_a.shape)
+    print("cpu!!!!!!!!!!", tensor_b.shape)
+    tensor_a = tensor_a.to(torch.float32).reshape(-1)
+    tensor_b = tensor_b.to(torch.float32).reshape(-1)
     err = get_err_threshold_for_one_golden(tensor_a.dtype)
     if torch.isnan(tensor_a).any():
         print("********Warning: npu result contains NaN!*************")
         return 1
     tensor_a = tensor_a.to(torch.float32)
     tensor_b = tensor_b.to(torch.float32)
+
+    if tensor_a.size(0) == 0 and tensor_b.size(0) == 0:
+        print("result is same with expect")
+        return 1
     # 确定性计算要求2次npu计算结果完全一致
     if os.getenv('LCCL_DETERMINISTIC', '0') == "1":
         if torch.equal(tensor_a, tensor_b):
@@ -89,39 +98,14 @@ def one_golden_compare(tensor_a, tensor_b):
         max_relative_error_value_a = tensor_a[temp_id]
         max_relative_error_value_b = tensor_b[temp_id]
     result = (abs_error <= err * golden_nmax).all()
-    print("re!!!!!!!!!!!!!!npu,cpu, id", result, max_relative_error_value_a, max_relative_error_value_b, temp_id)
+    print("re!!!!!!!!!!!!!!npu,cpu, id", temp_tensor[temp_id], max_relative_error_value_a, max_relative_error_value_b, temp_id)
     if result:
-        return 0
-    else:
+        print("result is same with expect")
         return 1
+    else:
+        print("result is error")
+        return 0
 
-def read_binary_file(file_path, dtype=torch.float32):
-    try:
-        with open(file_path, "rb") as f:
-            binary_data = f.read()
-            writable_data = bytearray(binary_data)
-            if len(writable_data) == 0:
-                return None
-            tensor = torch.frombuffer(writable_data, dtype=dtype)
-            return tensor
-    except FileNotFoundError:
-        print(f"The file {file_path} does not exist!")
-        return None
-
-def write_to_bin(tensor, prefix):
-    file_path = f"{prefix}"
-    if tensor is None:
-        return
-    untyped_dict = {
-        torch.float16: torch.int16,
-        torch.bfloat16: torch.int16,
-        torch.int8: torch.int8,
-        torch.float32: torch.int32,
-        torch.int32: torch.int32,
-        torch.int64: torch.int64
-    }
-    print(tensor.shape, tensor.dtype, file_path)
-    tensor.view(untyped_dict[tensor.dtype]).numpy().tofile(file_path)
 
 def main_worker(rank, comm_type, world_size, batch, M, K, N, trans_b, local_expert_nums,
                 data_type, quant_info, EP, TP, quant_type, out_data_ype, matrix_a_list, matrix_b_list, dequant_scale_list,
@@ -175,69 +159,9 @@ def main_worker(rank, comm_type, world_size, batch, M, K, N, trans_b, local_expe
     torch.npu.synchronize()
 
     golden_out_tensor = matrix_c_list[rank]
-    golden_out_tensor_low = matrix_c_low_list[rank]
     out_tensor_compare = out_tensor[0].to(torch.device('cpu'))[:golden_out_tensor.shape[1], :]
-    assert check_precision_new(out_tensor_compare, golden_out_tensor, golden_out_tensor_low)
-
-
-def check_precision_new(tensor_a, tensor_b, tensor_c):
-    if torch.isnan(tensor_a).any():
-        print("********Warning: npu result contains NaN!*************")
-        return 1
-    epsilon = 1e-7
-    d_type = tensor_a.dtype
-    err_threshold = get_err_threshold_for_two_golden(d_type)
-    eb_threshold = get_eb_threshold(d_type)
-
-    tensor_a = tensor_a.to(torch.float32).reshape(-1)
-    tensor_b = tensor_b.to(torch.float32).reshape(-1)
-    tensor_c = tensor_c.to(torch.float32).reshape(-1)
-
-    relative_error_npu = torch.abs(tensor_a - tensor_b) / (torch.abs(tensor_b) + epsilon)
-    relative_error_cpu = torch.abs(tensor_c - tensor_b) / (torch.abs(tensor_b) + epsilon)
-    if relative_error_npu.size(0) == 0 and relative_error_cpu.size(0) == 0:
-        print("result is same with expect")
-        return 1
-    max_relative_error_npu = torch.max(relative_error_npu)
-    max_relative_error_cpu = torch.max(relative_error_cpu)
-    mean_relative_error_npu = torch.mean(relative_error_npu)
-    mean_relative_error_cpu = torch.mean(relative_error_cpu)
-    # 计算均方根误差
-    mse_npu = torch.mean((tensor_a - tensor_b) ** 2)
-    rmse_npu = torch.sqrt(mse_npu)
-    mse_cpu = torch.mean((tensor_c - tensor_b) ** 2)
-    rmse_cpu = torch.sqrt(mse_cpu)
-
-    EB = torch.abs(get_eb(tensor_b, tensor_a))
-
-    print("最大相对误差npu:", max_relative_error_npu)
-    print("最大相对误差cpu:", max_relative_error_cpu)
-    print("平均相对误差npu:", mean_relative_error_npu)
-    print("平均相对误差cpu:", mean_relative_error_cpu)
-    print("均方根误差npu:", rmse_npu)
-    print("均方根误差cpu:", rmse_cpu)
-    print("误差均衡性EB:", EB)
-
-    max_relative_error_idx = torch.argmax(relative_error_npu)
-    max_relative_error_value_a = tensor_a[max_relative_error_idx]
-    max_relative_error_value_b = tensor_b[max_relative_error_idx]
-    max_relative_error_value_c = tensor_c[max_relative_error_idx]
-
-    # 打印最大相对误差对应的tensor值
-    # print(f"Max Relative Error Value: npu, golden, cpu: {max_relative_error_value_a.item()}, {max_relative_error_value_b.item()}, {max_relative_error_value_c.item()}")
-
-    if max_relative_error_npu / max(max_relative_error_cpu, err_threshold) >= 10:
-        print(f"Max Relative Error Value: npu, golden, cpu, id: {max_relative_error_value_a.item()}, {max_relative_error_value_b.item()}, {max_relative_error_value_c.item()}, {max_relative_error_idx}")
{max_relative_error_idx}") - if one_golden_compare(tensor_a, tensor_b): - print("resule is error") - return 0 - - if mean_relative_error_npu / max(mean_relative_error_cpu, err_threshold) >= 2 or rmse_npu / max(rmse_cpu, err_threshold) >= 2 or EB >= eb_threshold: - print("result is error") - return 0 - print("result is same with expect") - return 1 + assert one_golden_compare(out_tensor_compare, golden_out_tensor) def find_nearest_multiple(n: int, k: int = 512) -> int: r = n % k @@ -270,10 +194,11 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest): dequant_granularity = 1 has_dequant_offset = -1 data_type = 2 + mode = 0 quant_info = QuantInfo(QuantGranularity(quant_granularity), quant_group_size, has_quant_offset, QuantGranularity(dequant_granularity), dequant_group_size, has_dequant_offset) moedata = MoeTestDate(CommType(comm_type), world_size, batch, M, K, N, trans_b, local_expert_nums, - CoCDataTypeDesc(data_type), quant_info, EP, TP, M*2) + CoCDataTypeDesc(data_type), quant_info, EP, TP, M*2, mode) matrix_a_list = moedata.matrix_a_list matrix_b_list = moedata.matrix_b_list dequant_scale_list = moedata.dequant_scale_list @@ -327,7 +252,7 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest): has_quant_offset = -1 dequant_group_size = -1 # local_expert_nums = random.randint(1, 16) # 1- 16 - local_expert_nums = random.randint(1, 6) + local_expert_nums = random.randint(1, 16) # EP = 8 # EP * TP = WORLDSIZE EP = world_size TP = world_size // EP @@ -344,6 +269,7 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest): data_type = 2 has_dequant_offset = -1 + mode = random.randint(0, 1) if data_type == 2: kalign = find_nearest_multiple(K, 512) else: @@ -354,11 +280,11 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest): continue i += 1 print( - f"--M:{M}--N:{N}--K:{K}--world_size:{world_size}--local_expert_nums:{local_expert_nums}--EP:{EP}--TP:{TP}--dequant_granularity:{dequant_granularity}--out_data_type:{out_data_type}--data_type:{data_type}--i:{i}") + f"--M:{M}--N:{N}--K:{K}--world_size:{world_size}--local_expert_nums:{local_expert_nums}--EP:{EP}--TP:{TP}--dequant_granularity:{dequant_granularity}--out_data_type:{out_data_type}--data_type:{data_type}--mode:{mode}--i:{i}") quant_info = QuantInfo(QuantGranularity(quant_granularity), quant_group_size, has_quant_offset, QuantGranularity(dequant_granularity), dequant_group_size, has_dequant_offset) moedata = MoeTestDate(CommType(comm_type), world_size, batch, M, K, N, trans_b, local_expert_nums, - CoCDataTypeDesc(data_type), quant_info, EP, TP, M*2) + CoCDataTypeDesc(data_type), quant_info, EP, TP, M*2, mode) matrix_a_list = moedata.matrix_a_list matrix_b_list = moedata.matrix_b_list dequant_scale_list = moedata.dequant_scale_list @@ -367,7 +293,6 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest): matrix_c_list = moedata.matrix_c_list matrix_c_low_list = moedata.matrix_c_low_list - mp.spawn(main_worker, nprocs=world_size, args=(comm_type, world_size, batch, M, K, N, trans_b, local_expert_nums, CoCDataTypeDesc(data_type), quant_info, EP, TP, dequant_granularity, out_data_type, diff --git a/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_test_moe_matmul_alltoallv.py b/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_test_moe_matmul_alltoallv.py index fab28204..5053f504 100644 --- a/tests/apitest/opstest/python/operations/linear_parallel/linear_parallel_test_moe_matmul_alltoallv.py +++ 
@@ -166,67 +166,9 @@ def main_worker(rank, comm_type, world_size, batch, M, K, N, trans_b, local_expe
     torch.npu.synchronize()
 
     golden_out_tensor = matrix_c_list[rank]
-    # golden_out_tensor_low = matrix_c_low_list[rank]
     out_tensor_compare = out_tensor[0].to(torch.device('cpu'))[:golden_out_tensor.shape[1], :]
-    # assert check_precision_new(out_tensor_compare, golden_out_tensor, golden_out_tensor_low)
     assert one_golden_compare(out_tensor_compare, golden_out_tensor)
 
-
-def check_precision_new(tensor_a, tensor_b, tensor_c):
-    if torch.isnan(tensor_a).any():
-        print("********Warning: npu result contains NaN!*************")
-        return 1
-    epsilon = 1e-7
-    d_type = tensor_a.dtype
-    err_threshold = get_err_threshold_for_two_golden(d_type)
-    eb_threshold = get_eb_threshold(d_type)
-
-    tensor_a = tensor_a.to(torch.float32).reshape(-1)
-    tensor_b = tensor_b.to(torch.float32).reshape(-1)
-    tensor_c = tensor_c.to(torch.float32).reshape(-1)
-
-    relative_error_npu = torch.abs(tensor_a - tensor_b) / (torch.abs(tensor_b) + epsilon)
-    relative_error_cpu = torch.abs(tensor_c - tensor_b) / (torch.abs(tensor_b) + epsilon)
-    if relative_error_npu.size(0) == 0 and relative_error_cpu.size(0) == 0:
-        print("result is same with expect")
-        return 1
-    max_relative_error_npu = torch.max(relative_error_npu)
-    max_relative_error_cpu = torch.max(relative_error_cpu)
-    mean_relative_error_npu = torch.mean(relative_error_npu)
-    mean_relative_error_cpu = torch.mean(relative_error_cpu)
-    # 计算均方根误差
-    mse_npu = torch.mean((tensor_a - tensor_b) ** 2)
-    rmse_npu = torch.sqrt(mse_npu)
-    mse_cpu = torch.mean((tensor_c - tensor_b) ** 2)
-    rmse_cpu = torch.sqrt(mse_cpu)
-
-    EB = torch.abs(get_eb(tensor_b, tensor_a))
-
-    print("最大相对误差npu:", max_relative_error_npu)
-    print("最大相对误差cpu:", max_relative_error_cpu)
-    print("平均相对误差npu:", mean_relative_error_npu)
-    print("平均相对误差cpu:", mean_relative_error_cpu)
-    print("均方根误差npu:", rmse_npu)
-    print("均方根误差cpu:", rmse_cpu)
-    print("误差均衡性EB:", EB)
-
-    max_relative_error_idx = torch.argmax(relative_error_npu)
-    max_relative_error_value_a = tensor_a[max_relative_error_idx]
-    max_relative_error_value_b = tensor_b[max_relative_error_idx]
-    max_relative_error_value_c = tensor_c[max_relative_error_idx]
-
-    if max_relative_error_npu / max(max_relative_error_cpu, err_threshold) >= 10:
-        print(f"Max Relative Error Value: npu, golden, cpu, id: {max_relative_error_value_a.item()}, {max_relative_error_value_b.item()}, {max_relative_error_value_c.item()}, {max_relative_error_idx}")
-        if one_golden_compare(tensor_a, tensor_b):
-            print("resule is error")
-            return 0
-
-    if mean_relative_error_npu / max(mean_relative_error_cpu, err_threshold) >= 2 or rmse_npu / max(rmse_cpu, err_threshold) >= 2 or EB >= eb_threshold:
-        print("result is error")
-        return 0
-    print("result is same with expect")
-    return 1
-
 def find_nearest_multiple(n: int, k: int = 512) -> int:
     if n % k == 0:
         return n
@@ -235,29 +177,31 @@ def find_nearest_multiple(n: int, k: int = 512) -> int:
 
 
 class LinearParallelCoverOperationTest(operation_test.OperationTest):
-    def test_linear_paraller_fp16_qunat(self):
+    def test_linear_paraller_fp16_qunat_3(self):
         if not operation_test.get_soc_version() == 'Ascend910B':
             return
         print(f"———————— LinearParallelCoverOp test start ————————")
         print("------------MATMUL REDUCESCATTER ALLTOALLVC Quantify scenarios-----------")
-        world_size = 8
+        world_size = 4
         comm_type = 310
         batch = 1
-        M = 25
-        K = 9957
-        N = 868
-        trans_b = 1
+        M = 5514
+        N = 3866
+        # K = 4096
+        K = 8
+        trans_b = 0
         quant_granularity = -1
         quant_group_size = -1
         has_quant_offset = -1
         dequant_group_size = -1
-        local_expert_nums = 15
-        EP = 8
+        local_expert_nums = 12
+        EP = 4
         TP = 1
         out_data_type = 1
-        dequant_granularity = 1
+        dequant_granularity = 3
         has_dequant_offset = -1
         data_type = 2
+        mode = 0
 
         quant_info = QuantInfo(QuantGranularity(quant_granularity), quant_group_size, has_quant_offset,
                                QuantGranularity(dequant_granularity), dequant_group_size, has_dequant_offset)
@@ -267,7 +211,7 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest):
 
         outpusize = M * 2
         moedata = MoeTestDate(CommType(comm_type), world_size, batch, M, K, N, trans_b, local_expert_nums,
-                              CoCDataTypeDesc(data_type), quant_info, EP, TP, outpusize)
+                              CoCDataTypeDesc(data_type), quant_info, EP, TP, outpusize, mode)
         matrix_a_list = moedata.matrix_a_i_list
         matrix_b_list = moedata.matrix_b_list
         dequant_scale_list = moedata.dequant_scale_list
@@ -275,7 +219,6 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest):
         global_tokens_per_expert_matrix = moedata.global_tokens_per_expert_matrix
         matrix_c_list = moedata.matrix_c_list
         matrix_c_low_list = moedata.matrix_c_low_list
-
         mp.spawn(main_worker, nprocs=world_size,
                  args=(comm_type, world_size, batch, M, K, N, trans_b, local_expert_nums,
@@ -305,7 +248,7 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest):
             return batch
 
         batch = 1
-        M = random.randint(1, 128)
+        M = random.randint(1, 129)
 
         K = generate_batch(32)
 
@@ -337,6 +280,11 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest):
 
            data_type = 2
            has_dequant_offset = -1
+            mode = random.randint(0, 1)
+            if M <= 128:
+                outpusize = M * 8
+            else:
+                outpusize = M * 2
            if data_type == 2:
                kalign = find_nearest_multiple(K, 512)
            else:
@@ -347,13 +295,13 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest):
                continue
            i += 1
            print(
-                f"--M:{M}--N:{N}--K:{K}--world_size:{world_size}--local_expert_nums:{local_expert_nums}--EP:{EP}--TP:{TP}--dequant_granularity:{dequant_granularity}--out_data_type:{out_data_type}--data_type:{data_type}--i:{i}")
+                f"--M:{M}--N:{N}--K:{K}--world_size:{world_size}--local_expert_nums:{local_expert_nums}--EP:{EP}--TP:{TP}--dequant_granularity:{dequant_granularity}--out_data_type:{out_data_type}--data_type:{data_type}--mode:{mode}--i:{i}")
 
            quant_info = QuantInfo(QuantGranularity(quant_granularity), quant_group_size, has_quant_offset,
                                   QuantGranularity(dequant_granularity), dequant_group_size, has_dequant_offset)
            moedata = MoeTestDate(CommType(comm_type), world_size, batch, M, K, N, trans_b, local_expert_nums,
-                                  CoCDataTypeDesc(data_type), quant_info, EP, TP, M*2)
+                                  CoCDataTypeDesc(data_type), quant_info, EP, TP, outpusize, mode)
            matrix_a_list = moedata.matrix_a_i_list
            matrix_b_list = moedata.matrix_b_list
            dequant_scale_list = moedata.dequant_scale_list
@@ -367,7 +315,7 @@ class LinearParallelCoverOperationTest(operation_test.OperationTest):
                     args=(comm_type, world_size, batch, M, K, N, trans_b, local_expert_nums,
                           CoCDataTypeDesc(data_type), quant_info, EP, TP, dequant_granularity, out_data_type,
                           matrix_a_list, matrix_b_list, dequant_scale_list, quant_scale_list,
-                           global_tokens_per_expert_matrix, matrix_c_list, matrix_c_low_list))
+                           global_tokens_per_expert_matrix, matrix_c_list, matrix_c_low_list, outpusize))
 
            if i >= 700:
                break
-- 
Gitee