diff --git a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_alibi_bf16.py b/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_alibi_bf16.py deleted file mode 100644 index b1b1adea3cbf0e0f6428937f8fa7a03051b62e3d..0000000000000000000000000000000000000000 --- a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_alibi_bf16.py +++ /dev/null @@ -1,57 +0,0 @@ -# -# Copyright (c) 2024 Huawei Technologies Co., Ltd. -# This file is a part of the CANN Open Software. -# Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). -# Please refer to the License for details. You may not use this file except in compliance with the License. -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, -# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. -# See LICENSE in the root of the software repository for the full text of the License. -# -import json -import math -import os -import sys -import unittest -import random -import numpy as np -import torch -import torch_npu - -np.random.seed(0) - -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -import operation_test # NOQA: E402 -from self_attention.self_attention_test_data_generator import SelfAttentionTestDataGenerator - -data_generator = SelfAttentionTestDataGenerator() - -data = data_generator.test_flash_attention_case_fa_encoder_nocache_bf16_alibi() -param_seqlen = data[4] -data[4] = torch.from_numpy(np.array(data[4]).astype(np.int32)) - -in_tensors = [tensor.npu().contiguous() for tensor in data] - -OP_NAME = "SelfAttentionOperation" -PARAM = json.dumps({"headNum": 12, "qkScale": 1, "kvHeadNum": 1, - "calcType": 3, "maskType": 2, "isTriuMask": 1, "kernelType": 0}) -RUN_PARAM = json.dumps({"seqLen": param_seqlen}) - -class TestFlashAttentionEncoderOperationAlibiBf16(operation_test.OperationTest): - def golden_calc(self, input_tensors): - return [in_tensors[5]] - - def golden_compare(self, out_tensor, golden_out_tensor): - ratios = [0.001, 0.001, 0.005, 0.005] - return data_generator.compare_output_data(out_tensor.cpu(), golden_out_tensor.cpu(), ratios) - - def test(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - self.execute_with_param(OP_NAME, PARAM, RUN_PARAM, [ - in_tensors[0], in_tensors[1], in_tensors[2], in_tensors[3], in_tensors[4] - ]) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_compress_mask_swa_cycle_cache.py b/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_compress_mask_swa_cycle_cache.py deleted file mode 100644 index cb5d40a5f762837d69cfc782fec02bd37312581a..0000000000000000000000000000000000000000 --- a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_compress_mask_swa_cycle_cache.py +++ /dev/null @@ -1,275 +0,0 @@ -# -# Copyright (c) 2024 Huawei Technologies Co., Ltd. -# This file is a part of the CANN Open Software. -# Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). -# Please refer to the License for details. You may not use this file except in compliance with the License. 
-# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, -# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. -# See LICENSE in the root of the software repository for the full text of the License. -# -import json -import math -import os -import sys -import unittest -import random -import numpy as np -import torch -import torch_npu - -np.random.seed(0) - -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -import operation_test # NOQA: E402 - - -def gen_seq_len(batch, max_seq, variate_seq=False): - if variate_seq: - num = max_seq // 16 - seqlen_aligned_arange = np.arange(1, num) * 16 - if batch > num: - seqlen_aligned_remain = np.random.randint(1, max_seq, size=(batch - num)) - seqlen_aligned_remain[:] = ((seqlen_aligned_remain[:] + 15) // 16) * 16 - seqlen_aligned = np.concatenate((seqlen_aligned_arange, seqlen_aligned_remain), 0) - else: - seqlen_aligned = seqlen_aligned_arange - sp_list = np.random.randint(0, 15, size=(num - 1)) - seqlen = seqlen_aligned - sp_list - seqlen = seqlen[-batch:] - seqlen_aligned = seqlen_aligned[-batch:] - print(seqlen) - else: - max_seq_aligned = (max_seq + 15) // 16 * 16 - sp_list = np.ones((batch,)) * (max_seq_aligned - max_seq) - sp_list = sp_list.astype(np.int32) - seqlen = np.ones((batch,)) * max_seq - seqlen = seqlen.astype(np.int32) - print(seqlen) - seqlen_aligned = np.ones((batch,)) * max_seq_aligned - seqlen_aligned = seqlen_aligned.astype(np.int32) - - ntokens = seqlen.sum() - print("ntokens:", ntokens) - return seqlen, seqlen_aligned, ntokens - - -def group_matmul(heads, group_num, A, B): - group_head = heads // group_num - score = None - for i in range(group_num): - group_score = np.matmul(A[i * group_head: (i + 1) * group_head, :, :].astype(np.float32), - B[i:(i + 1), :, :].astype(np.float32)).astype(np.float16) - if score is None: - score = group_score - else: - score = np.concatenate((score, group_score), 0) - print(score.shape) - return score - -def gen_swa_cmp(window_size, embeddim): - swa_mask = np.ones(shape=(1, 512, 512)) * -10000.0 - pp_n = 128 if embeddim <= 128 else 64 - # pp_n = 128 - if window_size <= pp_n * 3: - true_size = window_size - else: - if window_size % pp_n == 0: - true_size = pp_n * 3 - else: - true_size = pp_n * 2 + window_size % pp_n - triu_mask = np.triu(swa_mask, 1) - tril_mask = np.tril(swa_mask, -true_size) - swa_mask = triu_mask + tril_mask - # swa_mask = torch.from_numpy(swa_mask).to(torch.float16) - swa_mask = swa_mask.reshape(512,512) - return swa_mask - -def calc_expect_func(batch, seqlen, heads, embed, window_size, mask_type, group_num=32): - is_mask = True - variate_seq = False - is_decoder = False - max_seq = 2048 - src_type = 'float16' - fp32 = True - print(f"group_num: {group_num}") - print("q_seq is:") - if is_decoder: - q_seqlen, q_seqlen_aligned, q_ntokens = gen_seq_len(batch, 1, variate_seq) - kv_seqlen, kv_seqlen_aligned, kv_ntokens = gen_seq_len(batch, seqlen, variate_seq) - else: - q_seqlen, q_seqlen_aligned, q_ntokens = gen_seq_len(batch, seqlen, variate_seq) - kv_seqlen, kv_seqlen_aligned, kv_ntokens = q_seqlen, q_seqlen_aligned, q_ntokens # crossattention时,q_seqlen != k_seqlen - - max_s = np.max(q_seqlen) - ntokens2 = (q_seqlen * kv_seqlen).sum() - embed_v = embed - - q = np.random.uniform(-1.0, 1.0, size=(q_ntokens, heads * embed)).astype(np.float16) - k = np.random.uniform(-1.0, 1.0, size=(kv_ntokens, group_num * embed)).astype(np.float16) - v = np.random.uniform(-1.0, 1.0, 
size=(kv_ntokens, group_num * embed_v)).astype(np.float16) - - mask = np.ones(shape=(1, max_s, max_s)).astype(np.float16) # 使用当前最大seqlen生成mask - mask_u = np.triu(mask, 1) - mask_l = np.tril(mask, -window_size) - mask = mask_u + mask_l - mask *= -10000.0 - - # print(mask) - - q_offset = 0 - k_offset = 0 - v_offset = 0 - - s = None - _p = None - out = None - - for idx in range(batch): - q_s = q_seqlen[idx] - kv_s = kv_seqlen[idx] - q_slice = q[q_offset:q_offset + q_s][:] - q_slice = q_slice.reshape(q_s, heads, embed) - q_slice = np.transpose(q_slice, (1, 0, 2)) # (heads, q_seq, embed) - k_slice = k[k_offset:k_offset + kv_s][:] - k_slice = k_slice.reshape(kv_s, group_num, embed) - k_slice = np.transpose(k_slice, (1, 0, 2)) - k_slice_t = np.transpose(k_slice, (0, 2, 1)) # get K^T (kv_heads, embed, k_seq) - v_slice = v[v_offset:v_offset + kv_s][:] - v_slice = v_slice.reshape(kv_s, group_num, embed_v) - v_slice = np.transpose(v_slice, (1, 0, 2)) - score = group_matmul(heads, group_num, q_slice, k_slice_t) - if s is None: - s = score.reshape([-1, ]) - else: - s = np.concatenate((s, score.reshape([-1, ])), 0) - - tor = np.float16(1.0 / math.sqrt(1.0 * embed)) - score = score * tor - if is_mask: - score = score + mask[:, :q_s, :kv_s] - score_max = np.max(score, axis=-1) - score = score - score_max.reshape((heads, q_s, 1)) - score_exp = np.exp(score.astype(np.float32)) - if not fp32: - score_sum = np.sum(score_exp.astype(np.float16), axis=-1) - if _p is None: - _p = score_exp.astype(np.float16).reshape([-1, ]) - else: - _p = np.concatenate((_p, score_exp.astype(np.float16).reshape([-1, ])), 0) - p = score_exp.astype(np.float16) / score_sum.reshape((heads, q_s, 1)).astype(np.float16) - out_sub = group_matmul(heads, group_num, p, v_slice) - else: - score_sum = np.sum(score_exp, axis=-1) - if _p is None: - _p = score_exp.astype(np.float16).reshape([-1, ]) - else: - _p = np.concatenate((_p, score_exp.astype(np.float16).reshape([-1, ])), 0) - p = score_exp.astype(np.float16) - out_sub = group_matmul(heads, group_num, p, v_slice) - out_sub = out_sub / score_sum.reshape((heads, q_s, 1)).astype(np.float16) - - out_sub = out_sub.reshape(heads, q_s, embed_v) - out_sub = np.transpose(out_sub, (1, 0, 2)) - out_sub = np.ascontiguousarray(out_sub) - if out is None: - out = out_sub - else: - out = np.concatenate((out, out_sub), 0) - - q_offset += q_s - k_offset += kv_s - v_offset += kv_s - - print("==> data generate finished!") - - q = q.astype(src_type).reshape(-1, heads, embed) - k = k.astype(src_type).reshape(-1, group_num, embed) - v = v.astype(src_type).reshape(-1, group_num, embed_v) - # mask = mask.astype(src_type).reshape(max_s, max_s) - mask = gen_swa_cmp(window_size, embed).astype(src_type) - q_len = q_seqlen.astype(np.int32) - out = out.astype(src_type).reshape(-1, heads, embed_v) - ret_data = q, k, v, mask, q_len, out - return ret_data - -if operation_test.get_soc_version() == 'Ascend910B': - kv_head = 2 - window_size = 32 - mask_type = 8 - data = calc_expect_func(2, 1024, 2, 128, window_size, mask_type, group_num=kv_head) - param_seqlen = data[4].tolist() - in_tensors = [torch.from_numpy(tensor) for tensor in data] - in_tensors = [tensor.npu() for tensor in in_tensors] - a = [print(tensor.dtype, tensor.device) for tensor in in_tensors] - - OP_NAME = "SelfAttentionOperation" - PARAM = json.dumps({"headNum": kv_head, "qkScale": (1 / float(math.sqrt(128))), "kvHeadNum": kv_head, \ - "maskType": mask_type, "calcType": 3, "windowSize": 32, "cacheType": 1}) - RUN_PARAM = json.dumps({"seqLen": param_seqlen}) 
- print(PARAM, RUN_PARAM) - - -class TestFlashAttentionEncoderOperation(operation_test.OperationTest): - def golden_calc(self, input_tensors): - return [in_tensors[5]] - - def golden_compare(self, out_tensor, golden_out_tensor): - # print(out_tensor.cpu()) - # return torch.allclose(out_tensor.cpu(), golden_out_tensor.cpu(), rtol=0.001, atol=0.001) - out = out_tensor - golden = golden_out_tensor - ratios = [0.001, 0.001, 0.003, 0.003, 0.005, 0.005] - embeddim = 128 - max_seq = 1024 - error_count = 0 - strict_error_count = 0 - alibi_error_count = 0 - fp16_min_normal = 1.0 / (1 << 14) - out = out.flatten() - golden = golden.flatten() - out_len = out.shape[0] - diff = torch.abs(golden - out) - # max_diff = diff.max().item() - # print("maxDiff: " , max_diff) - golden = golden.to(torch.float32) - out = out.to(torch.float32) - - - limit_error = torch.maximum( - torch.abs(golden * ratios[0]), torch.tensor(ratios[1])) - strict_limit_error = torch.maximum( - torch.abs(golden * ratios[2]), torch.tensor(ratios[3])) - error_count = torch.gt(diff, limit_error).sum().item() - strict_error_count = torch.gt( - diff, strict_limit_error).sum().item() - print("1/1000 Accuracy is: ", - 1 - float(error_count) / out_len) - print("3/1000 Accuracy is ", 1 - - float(strict_error_count) / out_len) - print("accuracy is correct: ", (float(strict_error_count) / out_len) <= ratios[0]) - # 新精度标准fp16 参考精度标准v0.3浮点计算单标杆 - # 计算次数 两个matmul + 一个softmax - calc_times = embeddim * max_seq + 4 - # import pdb;pdb.set_trace() - if calc_times < 2048: - error = 2**(-8) - else : - error = 2**(-7) - error_threshold = torch.clamp(torch.abs(golden), min = 1) * error - return (diff <= error_threshold).all() - - - def test(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - - - self.execute_with_param(OP_NAME, PARAM, RUN_PARAM, [ - in_tensors[0], in_tensors[1], in_tensors[2], in_tensors[3], in_tensors[4] - ]) - - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_logn.py b/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_logn.py deleted file mode 100644 index 9c69032f664affc5fa9936f2ff7539a4e6f0526e..0000000000000000000000000000000000000000 --- a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_logn.py +++ /dev/null @@ -1,226 +0,0 @@ -# -# Copyright (c) 2024 Huawei Technologies Co., Ltd. -# This file is a part of the CANN Open Software. -# Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). -# Please refer to the License for details. You may not use this file except in compliance with the License. -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, -# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. -# See LICENSE in the root of the software repository for the full text of the License. 
-# -import json -import math -import os -import sys -import unittest -import random -import numpy as np -import torch -import torch_npu - -np.random.seed(0) - -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -import operation_test # NOQA: E402 -scaleType = 1 - -def gen_seq_len(batch, max_seq, variate_seq=False): - if variate_seq: - num = max_seq // 16 - seqlen_aligned_arange = np.arange(1, num) * 16 - if batch > num: - seqlen_aligned_remain = np.random.randint(1, max_seq, size=(batch - num)) - seqlen_aligned_remain[:] = ((seqlen_aligned_remain[:] + 15) // 16) * 16 - seqlen_aligned = np.concatenate((seqlen_aligned_arange, seqlen_aligned_remain), 0) - else: - seqlen_aligned = seqlen_aligned_arange - sp_list = np.random.randint(0, 15, size=(num - 1)) - seqlen = seqlen_aligned - sp_list - seqlen = seqlen[-batch:] - seqlen_aligned = seqlen_aligned[-batch:] - print(seqlen) - else: - max_seq_aligned = (max_seq + 15) // 16 * 16 - sp_list = np.ones((batch,)) * (max_seq_aligned - max_seq) - sp_list = sp_list.astype(np.int32) - seqlen = np.ones((batch,)) * max_seq - seqlen = seqlen.astype(np.int32) - print(seqlen) - seqlen_aligned = np.ones((batch,)) * max_seq_aligned - seqlen_aligned = seqlen_aligned.astype(np.int32) - - ntokens = seqlen.sum() - print("ntokens:", ntokens) - return seqlen, seqlen_aligned, ntokens - - -def group_matmul(heads, group_num, A, B): - group_head = heads // group_num - score = None - for i in range(group_num): - group_score = np.matmul(A[i * group_head: (i + 1) * group_head, :, :].astype(np.float32), - B[i:(i + 1), :, :].astype(np.float32)).astype(np.float16) - if score is None: - score = group_score - else: - score = np.concatenate((score, group_score), 0) - print(score.shape) - return score - - -def calc_expect_func(batch, seqlen, heads, embed, group_num=32): - is_mask = False - variate_seq = False - is_decoder = False - max_seq = 2048 - src_type = 'float16' - fp32 = True - print(f"group_num: {group_num}") - print("q_seq is:") - if is_decoder: - q_seqlen, q_seqlen_aligned, q_ntokens = gen_seq_len(batch, 1, variate_seq) - kv_seqlen, kv_seqlen_aligned, kv_ntokens = gen_seq_len(batch, seqlen, variate_seq) - else: - q_seqlen, q_seqlen_aligned, q_ntokens = gen_seq_len(batch, seqlen, variate_seq) - kv_seqlen, kv_seqlen_aligned, kv_ntokens = q_seqlen, q_seqlen_aligned, q_ntokens # crossattention时,q_seqlen != k_seqlen - - max_s = np.max(q_seqlen) - ntokens2 = (q_seqlen * kv_seqlen).sum() - embed_v = np.random.randint(1,embed) - - q = np.random.uniform(-1.0, 1.0, size=(q_ntokens, heads * embed)).astype(np.float16) - k = np.random.uniform(-1.0, 1.0, size=(kv_ntokens, group_num * embed)).astype(np.float16) - v = np.random.uniform(-1.0, 1.0, size=(kv_ntokens, group_num * embed)).astype(np.float16) - mask = np.ones(shape=(1, max_s, max_s)).astype(np.float16) # 使用当前最大seqlen生成mask - mask = np.triu(mask, 1) - mask *= -10000.0 - # print(mask) - - q_offset = 0 - k_offset = 0 - v_offset = 0 - # logn功能开关 - if scaleType: - m = 8192 # seq_length in qwen config - base = 2 * m - logn_arr = np.array([ - math.log(i, m) if i > m else 1 - for i in range(base, base + seqlen)# max_seq_length in qwen config is (1,32768) - ]).astype(np.float32) - else: - logn_arr=[] - s = None - _p = None - out = None - - for idx in range(batch): - q_s = q_seqlen[idx] - kv_s = kv_seqlen[idx] - q_slice = q[q_offset:q_offset + q_s][:] - q_slice = q_slice.reshape(q_s, heads, embed) - q_slice = np.transpose(q_slice, (1, 0, 2)) # (heads, q_seq, embed) - k_slice = k[k_offset:k_offset + kv_s][:] - k_slice = 
k_slice.reshape(kv_s, group_num, embed) - k_slice = np.transpose(k_slice, (1, 0, 2)) - k_slice_t = np.transpose(k_slice, (0, 2, 1)) # get K^T (kv_heads, embed, k_seq) - v_slice = v[v_offset:v_offset + kv_s][:] - v_slice = v_slice.reshape(kv_s, group_num, embed) - v_slice = np.transpose(v_slice, (1, 0, 2)) - score = group_matmul(heads, group_num, q_slice, k_slice_t) - if s is None: - s = score.reshape([-1, ]) - else: - s = np.concatenate((s, score.reshape([-1, ])), 0) - - tor = np.float16(1.0 / math.sqrt(1.0 * embed)) - score = score * tor - if scaleType: - score = score * logn_arr[None, :seqlen, None] - if is_mask: - score = score + mask[:, :q_s, :kv_s] - score_max = np.max(score, axis=-1) - score = score - score_max.reshape((heads, q_s, 1)) - score_exp = np.exp(score.astype(np.float32)) - if not fp32: - score_sum = np.sum(score_exp.astype(np.float16), axis=-1) - if _p is None: - _p = score_exp.astype(np.float16).reshape([-1, ]) - else: - _p = np.concatenate((_p, score_exp.astype(np.float16).reshape([-1, ])), 0) - p = score_exp.astype(np.float16) / score_sum.reshape((heads, q_s, 1)).astype(np.float16) - out_sub = group_matmul(heads, group_num, p, v_slice) - else: - score_sum = np.sum(score_exp, axis=-1) - if _p is None: - _p = score_exp.astype(np.float16).reshape([-1, ]) - else: - _p = np.concatenate((_p, score_exp.astype(np.float16).reshape([-1, ])), 0) - p = score_exp.astype(np.float16) - out_sub = group_matmul(heads, group_num, p, v_slice) - out_sub = out_sub / score_sum.reshape((heads, q_s, 1)).astype(np.float16) - - out_sub = out_sub.reshape(heads, q_s, embed) - out_sub = np.transpose(out_sub, (1, 0, 2)) - out_sub = np.ascontiguousarray(out_sub) - if out is None: - out = out_sub - else: - out = np.concatenate((out, out_sub), 0) - - q_offset += q_s - k_offset += kv_s - v_offset += kv_s - - print("==> data generate finished!") - - q = q.astype(src_type).reshape(-1, heads, embed) - k = k.astype(src_type).reshape(-1, group_num, embed) - v = v.astype(src_type).reshape(-1, group_num, embed) - mask = mask.astype(src_type).reshape(max_s, max_s) - q_len = q_seqlen.astype(np.int32) - out = out.astype(src_type).reshape(-1, heads, embed) - if scaleType: - ret_data = q, k, v, mask, q_len, out, logn_arr - else: - ret_data = q, k, v, mask, q_len, out - return ret_data - - -kv_head = 32 -data = calc_expect_func(16, 128, 32, 128, group_num=kv_head) -param_seqlen = data[4].tolist() -in_tensors = [torch.from_numpy(tensor) for tensor in data] -in_tensors = [tensor.npu() for tensor in in_tensors] -if scaleType: - in_tensors[6] = torch.tensor(in_tensors[6], dtype=torch.float32) -a = [print(tensor.dtype, tensor.device) for tensor in in_tensors] - -OP_NAME = "SelfAttentionOperation" -PARAM = json.dumps({"headNum": 32, "qkScale": (1 / float(math.sqrt(128))), "kvHeadNum": kv_head, \ - "calcType": 3,"kernelType": 1,"scaleType": scaleType}) -RUN_PARAM = json.dumps({"seqLen": param_seqlen, "scaleType": scaleType}) -print(PARAM, RUN_PARAM) - - -class TestFlashAttentionEncoderOperation(operation_test.OperationTest): - def golden_calc(self, input_tensors): - return [in_tensors[5]] - - def golden_compare(self, out_tensor, golden_out_tensor): - return torch.allclose(out_tensor, golden_out_tensor, rtol=0.001, atol=0.001) - - def test(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - if scaleType: - self.execute_with_param(OP_NAME, PARAM, RUN_PARAM, [ - in_tensors[0], in_tensors[1], in_tensors[2], in_tensors[4], in_tensors[6] - ]) - else: - 
self.execute_with_param(OP_NAME, PARAM, RUN_PARAM, [ - in_tensors[0], in_tensors[1], in_tensors[2], in_tensors[4] - ]) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_mask_free_fp16.py b/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_mask_free_fp16.py deleted file mode 100644 index 834da0e42fce62334e1810a1301558413f58314b..0000000000000000000000000000000000000000 --- a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_encoder_operation_mask_free_fp16.py +++ /dev/null @@ -1,57 +0,0 @@ -# -# Copyright (c) 2024 Huawei Technologies Co., Ltd. -# This file is a part of the CANN Open Software. -# Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). -# Please refer to the License for details. You may not use this file except in compliance with the License. -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, -# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. -# See LICENSE in the root of the software repository for the full text of the License. -# -import json -import math -import os -import sys -import unittest -import random -import numpy as np -import torch -import torch_npu - -np.random.seed(0) - -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -import operation_test # NOQA: E402 -from self_attention.self_attention_test_data_generator import SelfAttentionTestDataGenerator - -data_generator = SelfAttentionTestDataGenerator() - -data = data_generator.test_flash_attention_case_fa_encoder_nocache_bf16_alibi_compress() -param_seqlen = data[4] -data[4] = torch.from_numpy(np.array(data[4]).astype(np.int32)) - -in_tensors = [tensor.npu().contiguous() for tensor in data] - -OP_NAME = "SelfAttentionOperation" -PARAM = json.dumps({"headNum": 12, "qkScale": 1, "kvHeadNum": 1, - "calcType": 3, "maskType": 4, "isTriuMask": 1, "kernelType": 1}) -RUN_PARAM = json.dumps({"seqLen": param_seqlen}) - -class TestFlashAttentionEncoderOperationMaskFreeFp16(operation_test.OperationTest): - def golden_calc(self, input_tensors): - return [in_tensors[6]] - - def golden_compare(self, out_tensor, golden_out_tensor): - ratios = [0.001, 0.001, 0.005, 0.005] - return data_generator.compare_output_data(out_tensor.cpu(), golden_out_tensor.cpu(), ratios) - - def test(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - self.execute_with_param(OP_NAME, PARAM, RUN_PARAM, [ - in_tensors[0], in_tensors[1], in_tensors[2], in_tensors[3], in_tensors[4], in_tensors[5] - ]) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_bypass_swa.py b/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_bypass_swa.py deleted file mode 100644 index b503f2372726995c9b5ca0028317f4aa937715a2..0000000000000000000000000000000000000000 --- a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_bypass_swa.py +++ /dev/null @@ -1,624 +0,0 @@ -# -# Copyright (c) 2024 Huawei Technologies Co., Ltd. -# This file is a part of the CANN Open Software. -# Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). -# Please refer to the License for details. 
You may not use this file except in compliance with the License. -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, -# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. -# See LICENSE in the root of the software repository for the full text of the License. -# -import sys -import os -import unittest -import json -import math -import torch -import torch_npu -import numpy as np -import logging -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -import operation_test # NOQA: E402 -import pdb - -OP_NAME = "SelfAttentionOperation" - -MASK_TYPE_NO_HEAD_DECODER = 5 -class TestUnpadSelfAttentionOperation(operation_test.OperationTest): - def test_swa_decoder(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.float16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = self.embeddim - self.max_seq = 256 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [114] * batch - qSeqLen = [1] * batch - self.window_size = 16 - self.cacheType = 0 - self.is_clamp = 0 - self.clamp_min = 0 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, [1] * batch) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 7, "kvcacheCfg":1,"calcType":2, - "windowSize":self.window_size}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "maskType": 7}) - #pdb.set_trace() - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - - def gen_seq_len(self, batch, seq_len): - ntokens = sum(seq_len) - return seq_len, ntokens - - def gen_mask(self, batch, heads, data_type, mask_type): - import random - q_max_seq = self.max_seq - kv_max_seq = self.max_seq - mask_type_dict = { - # 三维的alibi mask - #MASK_TYPE_NO_HEAD : ((batch, q_max_seq, kv_max_seq), (lambda mask, idx, q_s, kv_s: (mask[idx, :q_s, :kv_s]))), - MASK_TYPE_NO_HEAD_DECODER : ((batch, 1, kv_max_seq), (lambda mask, idx, q_s, kv_s: (mask[idx, :q_s, :kv_s]))), - - } - # 
kernel中mask的系数 - if data_type == torch.float16: - post_mask_coff = 1 - pre_mask_coff = -10000.0 - elif data_type == torch.bfloat16 and self.is_alibi: - post_mask_coff = 1 - pre_mask_coff = -float("inf") - elif data_type == torch.float32 and self.is_alibi: - post_mask_coff = 1 - pre_mask_coff = 1 - else: - post_mask_coff = -3e38 - pre_mask_coff = 1 - if data_type == torch.float16: - if self.is_alibi or self.long_seq: - select_zero = False - else: - select_zero = True - elif data_type == torch.bfloat16: - if self.is_alibi: - select_zero = False - elif self.dynamic_batch or self.is_decoder: - select_zero = True - else: - select_zero = False - else: - if self.is_alibi or self.is_decoder: - select_zero = True - else: - select_zero = False - if self.is_triu_mask: - select_zero = False - - self.mask_info = mask_type_dict[mask_type] - # print("-------------------",self.mask_info[0]) - mask = np.ones(shape=self.mask_info[0]) * pre_mask_coff - mask = np.triu(mask, 1) - zero_indice = random.choices(range(self.max_seq), k = 300) - if self.is_alibi: - self.alibi_bias = self.get_alibi_bias(heads, self.max_seq) - mask += self.alibi_bias.numpy() - if select_zero: - mask.flat[zero_indice] = 0 - self.mask = torch.from_numpy(mask).to(torch.float32) - #self.mask[0]=self.mask[1] - #self.mask = torch.zeros(self.mask.shape) - self.post_mask_coff = post_mask_coff - self.pre_mask_coff = pre_mask_coff - - def group_mm_torch(self, heads, group_num, A, B): - group_head = heads // group_num - score = None - for i in range(group_num): - group_score = torch.matmul(A[i * group_head: (i + 1) * group_head, :, :].to(torch.float32), B[i:(i + 1), :, :].to(torch.float32)) - if score is None: - score = group_score - else: - score = torch.cat((score, group_score), 0) - return score - - def compare_output_data(self, out, golden, ratios): - error_count = 0 - strict_error_count = 0 - alibi_error_count = 0 - fp16_min_normal = 1.0 / (1 << 14) - len = out.shape[0] * out.shape[1] - diff = torch.abs(golden - out) - max_diff = diff.max().item() - print("maxDiff:", max_diff) - - - limit_error = torch.maximum(torch.abs(golden * ratios[0]), torch.tensor(ratios[1])) - strict_limit_error = torch.maximum(torch.abs(golden * ratios[2]), torch.tensor(ratios[3])) - error_count = torch.gt(diff, limit_error).sum().item() - strict_error_count = torch.gt(diff, strict_limit_error).sum().item() - print("1/1000 Accuracy is ", 1 - float(error_count) / len) - print("3/1000 Accuracy is ", 1 - float(strict_error_count) / len) - if self.data_type == torch.bfloat16 or not self.is_decoder: - return (float(strict_error_count) / len) <= ratios[2] - else: - return (float(error_count) / len) <= ratios[0] - - def golden_calc(self, in_tensors): - q_offset = 0 - k_offset = 0 - v_offset = 0 - isdecoder = 1 - batch = self.batch - heads = self.heads - embed = self.embeddim - embed_v = self.embeddim_v - max_seq = self.max_seq - q_seqlen = self.q_seqlen - kv_seqlen = self.kv_seqlen - kv_head = self.kv_head - - is_mask = True - q = self.q - k = self.k - v = self.v - q_ntokens = self.q_ntokens - kv_ntokens = self.kv_ntokens - layer_id = self.layer_id[0] - self.is_multi_layer = True - s = None - _p = None - out = None - - for idx in range(batch): - q_s = q_seqlen[idx] - kv_s = kv_seqlen[idx] - q_slice = q[q_offset:q_offset + q_s][:] - q_slice = q_slice.view(q_s, heads, embed) - q_slice = torch.permute(q_slice, (1, 0, 2)) - k_slice = k[layer_id][idx][:kv_s][:] - k_slice = k_slice.view(kv_s, kv_head, embed) - k_slice_t = torch.permute(k_slice, (1, 2, 0)) # get K^T - v_slice = 
v[layer_id][idx][:kv_s][:] - v_slice = v_slice.view(kv_s, kv_head, embed_v) - v_slice = torch.permute(v_slice, (1, 0, 2)) - - score = self.group_mm_torch(heads, kv_head, q_slice, k_slice_t) - - if s is None: - s = score.view([-1, ]) - else: - s = torch.cat((s, score.view([-1, ])), 0) - - scale = 1 - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - if not self.is_multi_layer: - # 当前scale和tor保持一致,模型侧可能传入scale = np.float32(layer_id + 1) - scale = np.float32(layer_id + 1) - score = score * tor - - if self.is_clamp == 1: - clamp_min_brc = np.ones((score.shape)) * self.clamp_min - clamp_max_brc = np.ones((score.shape)) * self.clamp_max - score = np.float16(np.maximum(score, clamp_min_brc)) - score = torch.from_numpy(np.float16(np.minimum(score, clamp_max_brc))) - if len(in_tensors) == 6: - attention_mask = np.ones(shape=(1, kv_s)).astype(np.float16) * -10000.0 # 使用当前最大seqlen生成mask - # attention_mask[:, :self.window_size] = 0 - if self.cacheType == 0: - attention_mask[:, kv_s - self.window_size: kv_s] = 0 - else: - attention_mask[:, :self.window_size] = 0 - attention_mask = torch.from_numpy(attention_mask) - else: - attention_mask = in_tensors[3].cpu() - if attention_mask.shape[0] == 512 and attention_mask.shape[1] == 512: - mask = np.ones(shape=(kv_s, kv_s)).astype(np.float16) # 使用当前最大seqlen生成mask - mask_u = np.triu(mask, 1) - mask_l = np.tril(mask, -self.window_size) - mask = mask_u + mask_l - if attention_mask.dtype == torch.float16: - mask *= -10000.0 - else: - mask *= -3e38 - attention_mask = torch.from_numpy(mask) - - else: - if attention_mask.dtype == torch.bfloat16: - attention_mask *= -3e38 - - score = score + attention_mask[:q_s, :kv_s] - score = score.numpy().astype(np.float32) - score_max = np.max(score, axis=-1) - score = score - score_max.reshape((heads, q_s, 1)) - score_exp = np.exp(score) - score_sum = np.sum(score_exp, axis=-1) - - if _p is None: - _p = score_exp.astype(np.float32).reshape([-1, ]) - else: - _p = np.concatenate( - (_p, score_exp.astype(np.float32).reshape([-1, ])), 0) - - p = (score_exp / score_sum.reshape((heads, q_s, 1))) - p = torch.from_numpy(p).to(torch.bfloat16) - o = self.group_mm_torch(heads, kv_head, p, v_slice) - o = o.view(heads, q_s, embed_v) - o = torch.permute(o, (1, 0, 2)).contiguous() - if out is None: - out = o - else: - out = torch.cat((out, o), 0) - - q_offset += q_s - k_offset += max_seq - v_offset += max_seq - - # golden data - out = out.view(q_ntokens, heads * embed_v) - self.golden_out = out.to(self.data_type) - return [self.golden_out] - - def golden_compare(self, out_tensor, golden_out_tensor): - # print("out_tensor", out_tensor.cpu()) - # print("golden_out_tensor", golden_out_tensor.cpu()) - return self.compare_output_data(out_tensor.cpu(), golden_out_tensor, [0.001, 0.001, 0.003, 0.003, 0.005, 0.005]) - #return torch.allclose(out_tensor.cpu(), golden_out_tensor, rtol=0.001, atol=0.001) - - def test_swa_decoder_cache(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.bfloat16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = self.embeddim - self.max_seq = 1024 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [32, 1024] * 4 - qSeqLen = [1] * batch - self.window_size = 64 - self.is_clamp = 
0 - self.clamp_min = 0 - self.cacheType = 1 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, [1] * batch) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - self.gen_mask(self.batch, self.heads, data_type, mask_type) - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 7, "kvcacheCfg":1,"calcType":2, - "windowSize":self.window_size, "cacheType":1}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "maskType": 7}) - #pdb.set_trace() - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - def gen_swa_mask(self, max_seq, window_size, pre_mask_coff, cache_type=0): - swa_mask = np.ones(shape=(max_seq, max_seq)) * pre_mask_coff - - if window_size < max_seq or self.is_compress: - triu_mask = np.triu(swa_mask, 1) - tril_mask = np.tril(swa_mask, -window_size) - swa_mask = triu_mask + tril_mask - else: - swa_mask = np.triu(swa_mask, 1) - - return swa_mask - def gen_swa_cmp(self, window_size, embeddim, pre_mask_coff): - swa_mask = np.ones(shape=(1, 512, 512)) * pre_mask_coff - pp_n = 128 if embeddim <= 128 else 64 - # pp_n = 128 - if window_size <= pp_n * 3: - true_size = window_size - else: - if window_size % pp_n == 0: - true_size = pp_n * 3 - else: - true_size = pp_n * 2 + window_size % pp_n - triu_mask = np.triu(swa_mask, 1) - tril_mask = np.tril(swa_mask, -true_size) - swa_mask = triu_mask + tril_mask - swa_mask = torch.from_numpy(swa_mask).to(torch.float16) - swa_mask = swa_mask.reshape(512,512) - return swa_mask - def test_swa_encoder_cache(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.float16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = self.embeddim - self.max_seq = 1024 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [32, 1024] * 4 - qSeqLen = kv_seqLen - self.window_size = 16 - self.is_clamp = 0 - self.clamp_min = 0 - self.cacheType = 1 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.kv_seqlen, self.kv_ntokens 
= self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - mask = np.ones(shape=(self.q_max_seq, self.kv_max_seq)).astype(np.float16) # 使用当前最大seqlen生成mask - mask_u = np.triu(mask, 1) - mask_l = np.tril(mask, -self.window_size) - mask = mask_u + mask_l - # mask *= -3e38 - - - mask *= -10000.0 - # mask = self.gen_swa_mask(self.kv_max_seq, self.window_size, -10000.0, self.cacheType) - # print(torch.from_numpy(mask).to(data_type)) - attention_mask = torch.from_numpy(mask).to(data_type).npu() - - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 7, "kvcacheCfg":1,"calcType":1, - "windowSize":self.window_size, "cacheType":self.cacheType}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "maskType": 7}) - #pdb.set_trace() - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),attention_mask,torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - - def test_swa_encoder(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.bfloat16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = self.embeddim - self.max_seq = 1024 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [32, 256] * 4 - qSeqLen = kv_seqLen - self.window_size = 16 - self.is_clamp = 0 - self.clamp_min = 0 - self.cacheType = 0 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - # mask = np.ones(shape=(self.q_max_seq, 
self.kv_max_seq)).astype(np.float16) # 使用当前最大seqlen生成mask - # mask_u = np.triu(mask, 1) - # mask_l = np.tril(mask, -self.window_size) - # mask = mask_u + mask_l - # mask *= -3e38 - - - mask = self.gen_swa_mask(self.kv_max_seq, self.window_size, 1, self.cacheType) - # print(torch.from_numpy(mask).to(data_type)) - attention_mask = torch.from_numpy(mask).to(data_type).npu() - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 7, "kvcacheCfg":1,"calcType":1, - "windowSize":self.window_size, "cacheType":self.cacheType}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "maskType": 7}) - #pdb.set_trace() - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),attention_mask,torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - - def test_swa_encoder_compress_mask(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.bfloat16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = self.embeddim - self.max_seq = 1024 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [32, 256] * 4 - qSeqLen = kv_seqLen - self.window_size = 16 - self.is_clamp = 0 - self.clamp_min = 0 - self.cacheType = 0 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - # mask = np.ones(shape=(self.q_max_seq, self.kv_max_seq)).astype(np.float16) # 使用当前最大seqlen生成mask - # mask_u = np.triu(mask, 1) - # mask_l = np.tril(mask, -self.window_size) - # mask = mask_u + mask_l - # mask *= -10000.0 - # mask *= -3e38 - - pre_mask_coff = 1 - attention_mask = self.gen_swa_cmp(self.window_size, self.embeddim, pre_mask_coff).to(data_type).npu() - # print(attention_mask) - - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 8, "kvcacheCfg":1,"calcType":1, - "windowSize":self.window_size, "cacheType":self.cacheType}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, 
"maskType": 7}) - #pdb.set_trace() - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),attention_mask,torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - - def test_swa_encoder_compress_mask_cache(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.float16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = self.embeddim - self.max_seq = 1024 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [32, 1024] * 4 - qSeqLen = kv_seqLen - self.window_size = 16 - self.is_clamp = 0 - self.clamp_min = 0 - self.cacheType = 1 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - # mask = np.ones(shape=(self.q_max_seq, self.kv_max_seq)).astype(np.float16) # 使用当前最大seqlen生成mask - # mask_u = np.triu(mask, 1) - # mask_l = np.tril(mask, -self.window_size) - # mask = mask_u + mask_l - # mask *= -10000.0 - # mask *= -3e38 - - pre_mask_coff = -10000.0 - attention_mask = self.gen_swa_cmp(self.window_size, self.embeddim, pre_mask_coff).to(data_type).npu() - # print(attention_mask) - - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 8, "kvcacheCfg":1,"calcType":1, - "windowSize":self.window_size, "cacheType":self.cacheType}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "maskType": 7}) - #pdb.set_trace() - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),attention_mask,torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_logn.py b/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_logn.py deleted file mode 100644 index ec4d19c12b2ed3dbc0ca3d579f52f1fea451dcc1..0000000000000000000000000000000000000000 --- a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_logn.py +++ /dev/null @@ -1,345 +0,0 @@ -# -# Copyright (c) 2024 Huawei 
Technologies Co., Ltd. -# This file is a part of the CANN Open Software. -# Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). -# Please refer to the License for details. You may not use this file except in compliance with the License. -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, -# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. -# See LICENSE in the root of the software repository for the full text of the License. -# -import sys -import os -import unittest -import json -import math -import torch -import torch_npu -import numpy as np -import logging -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -import operation_test # NOQA: E402 -import pdb - -OP_NAME = "SelfAttentionOperation" - -MASK_TYPE_NO_HEAD_DECODER = 5 -class TestUnpadSelfAttentionOperation(operation_test.OperationTest): - def test(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.float16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = 128 # np.random.randint(1,self.embeddim) - self.max_seq = 256 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [114] * batch - qSeqLen = [1] * batch - self.is_clamp = 0 - self.clamp_min = 0 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, [1] * batch) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - self.gen_mask(self.batch, self.heads, data_type, mask_type) - # logn功能开关 - self.scaleType = 1 - if self.scaleType: - m = 8192 # seq_length in qwen config - base = int(2 * m - batch / 2) # 3 situations: base < m ; base > m ; base < m < base + batch - self.logn_arr = np.array([ - 1.2 if i > m else 1 - for i in range(base, base + batch) - ]).astype(np.float32) - else: - self.logn_arr=[] - logn = torch.tensor(self.logn_arr, dtype=torch.float32).npu() - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), - "maskType": 1, "kvcacheCfg":1, "calcType":2, "kernelType":self.scaleType, "scaleType":self.scaleType}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "scaleType":self.scaleType}) - #pdb.set_trace() - if self.scaleType: - self.execute_with_param(OP_NAME, param, 
run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),self.mask.to(data_type).npu(),torch.tensor(self.kv_seqlen).to(torch.int32).npu(), - torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu(), logn]) - else: - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),self.mask.to(data_type).npu(),torch.tensor(self.kv_seqlen).to(torch.int32).npu(), - torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - - def gen_seq_len(self, batch, seq_len): - ntokens = sum(seq_len) - return seq_len, ntokens - - def gen_mask(self, batch, heads, data_type, mask_type): - import random - q_max_seq = self.max_seq - kv_max_seq = self.max_seq - mask_type_dict = { - # 三维的alibi mask - #MASK_TYPE_NO_HEAD : ((batch, q_max_seq, kv_max_seq), (lambda mask, idx, q_s, kv_s: (mask[idx, :q_s, :kv_s]))), - MASK_TYPE_NO_HEAD_DECODER : ((batch, 1, kv_max_seq), (lambda mask, idx, q_s, kv_s: (mask[idx, :q_s, :kv_s]))), - - } - # kernel中mask的系数 - if data_type == torch.float16: - post_mask_coff = 1 - pre_mask_coff = -10000.0 - elif data_type == torch.bfloat16 and self.is_alibi: - post_mask_coff = 1 - pre_mask_coff = -float("inf") - elif data_type == torch.float32 and self.is_alibi: - post_mask_coff = 1 - pre_mask_coff = 1 - else: - post_mask_coff = -3e38 - pre_mask_coff = 1 - if data_type == torch.float16: - if self.is_alibi or self.long_seq: - select_zero = False - else: - select_zero = True - elif data_type == torch.bfloat16: - if self.is_alibi: - select_zero = False - elif self.dynamic_batch or self.is_decoder: - select_zero = True - else: - select_zero = False - else: - if self.is_alibi or self.is_decoder: - select_zero = True - else: - select_zero = False - if self.is_triu_mask: - select_zero = False - - self.mask_info = mask_type_dict[mask_type] - print("-------------------",self.mask_info[0]) - mask = np.ones(shape=self.mask_info[0]) * pre_mask_coff - mask = np.triu(mask, 1) - zero_indice = random.choices(range(self.max_seq), k = 300) - if self.is_alibi: - self.alibi_bias = self.get_alibi_bias(heads, self.max_seq) - mask += self.alibi_bias.numpy() - if select_zero: - mask.flat[zero_indice] = 0 - self.mask = torch.from_numpy(mask).to(torch.float32) - #self.mask[0]=self.mask[1] - #self.mask = torch.zeros(self.mask.shape) - self.post_mask_coff = post_mask_coff - self.pre_mask_coff = pre_mask_coff - - def group_mm_torch(self, heads, group_num, A, B): - group_head = heads // group_num - score = None - for i in range(group_num): - group_score = torch.matmul(A[i * group_head: (i + 1) * group_head, :, :].to(torch.float32), B[i:(i + 1), :, :].to(torch.float32)) - if score is None: - score = group_score - else: - score = torch.cat((score, group_score), 0) - return score - - def compare_output_data(self, out, golden, ratios): - error_count = 0 - strict_error_count = 0 - alibi_error_count = 0 - fp16_min_normal = 1.0 / (1 << 14) - len = out.shape[0] * out.shape[1] - diff = torch.abs(golden - out) - max_diff = diff.max().item() - logging.info(f"maxDiff {max_diff}") - if self.is_alibi: - alibi_limit_error = torch.maximum(torch.abs(golden * ratios[4]), torch.tensor(ratios[5])) - alibi_error_count = torch.gt(diff, alibi_limit_error).sum().item() - logging.info("5/1000 Accuracy is %f", 1 - float(alibi_error_count) / len) - return (float(alibi_error_count) / len) <= ratios[4] - else: - limit_error = torch.maximum(torch.abs(golden * ratios[0]), torch.tensor(ratios[1])) - strict_limit_error = torch.maximum(torch.abs(golden * ratios[2]), torch.tensor(ratios[3])) - 
error_count = torch.gt(diff, limit_error).sum().item() - strict_error_count = torch.gt(diff, strict_limit_error).sum().item() - logging.info("1/1000 Accuracy is %f", 1 - float(error_count) / len) - logging.info("3/1000 Accuracy is %f", 1 - float(strict_error_count) / len) - if self.data_type == torch.bfloat16 or not self.is_decoder: - return (float(strict_error_count) / len) <= ratios[2] - else: - return (float(error_count) / len) <= ratios[0] - - def golden_calc(self, in_tensors): - q_offset = 0 - k_offset = 0 - v_offset = 0 - isdecoder = 1 - batch = self.batch - heads = self.heads - embed = self.embeddim - embed_v = self.embeddim_v - max_seq = self.max_seq - q_seqlen = self.q_seqlen - kv_seqlen = self.kv_seqlen - kv_head = self.kv_head - mask = self.mask - is_mask = True - q = self.q - k = self.k - v = self.v - q_ntokens = self.q_ntokens - kv_ntokens = self.kv_ntokens - layer_id = self.layer_id[0] - self.is_multi_layer = True - s = None - _p = None - out = None - - for idx in range(batch): - q_s = q_seqlen[idx] - kv_s = kv_seqlen[idx] - q_slice = q[q_offset:q_offset + q_s][:] - q_slice = q_slice.view(q_s, heads, embed) - q_slice = torch.permute(q_slice, (1, 0, 2)) - k_slice = k[layer_id][idx][:kv_s][:] - k_slice = k_slice.view(kv_s, kv_head, embed) - k_slice_t = torch.permute(k_slice, (1, 2, 0)) # get K^T - v_slice = v[layer_id][idx][:kv_s][:] - v_slice = v_slice.view(kv_s, kv_head, embed_v) - v_slice = torch.permute(v_slice, (1, 0, 2)) - - score = self.group_mm_torch(heads, kv_head, q_slice, k_slice_t) - - if s is None: - s = score.view([-1, ]) - else: - s = torch.cat((s, score.view([-1, ])), 0) - - scale = 1 - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - if not self.is_multi_layer: - # 当前scale和tor保持一致,模型侧可能传入scale = np.float32(layer_id + 1) - scale = np.float32(layer_id + 1) - score = score * tor - # logn缩放计算 - if self.scaleType: - score = score * self.logn_arr[idx] - - if self.is_clamp == 1: - clamp_min_brc = np.ones((score.shape)) * self.clamp_min - clamp_max_brc = np.ones((score.shape)) * self.clamp_max - score = np.float16(np.maximum(score, clamp_min_brc)) - score = torch.from_numpy(np.float16(np.minimum(score, clamp_max_brc))) - if is_mask: - score = score + self.mask_info[1](self.mask, idx, q_s, kv_s)*self.post_mask_coff - #score = score + self.mask[idx, :q_s, :kv_s] - score = score.numpy().astype(np.float32) - score_max = np.max(score, axis=-1) - score = score - score_max.reshape((heads, q_s, 1)) - score_exp = np.exp(score) - score_sum = np.sum(score_exp, axis=-1) - - if _p is None: - _p = score_exp.astype(np.float32).reshape([-1, ]) - else: - _p = np.concatenate( - (_p, score_exp.astype(np.float32).reshape([-1, ])), 0) - - p = (score_exp / score_sum.reshape((heads, q_s, 1))) - p = torch.from_numpy(p).to(torch.bfloat16) - o = self.group_mm_torch(heads, kv_head, p, v_slice) - o = o.view(heads, q_s, embed_v) - o = torch.permute(o, (1, 0, 2)).contiguous() - if out is None: - out = o - else: - out = torch.cat((out, o), 0) - - q_offset += q_s - k_offset += max_seq - v_offset += max_seq - - # golden data - out = out.view(q_ntokens, heads * embed_v) - self.golden_out = out.to(self.data_type) - return [self.golden_out] - - def golden_compare(self, out_tensor, golden_out_tensor): - return self.compare_output_data(out_tensor.cpu(), golden_out_tensor, [0.001, 0.001, 0.003, 0.003, 0.005, 0.005]) - #return torch.allclose(out_tensor.cpu(), golden_out_tensor, rtol=0.001, atol=0.001) - - def testbf16(self): - return - if not operation_test.get_soc_version() == 'Ascend910B': - 
print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.bfloat16 - data_type = self.data_type - self.batch = 8 - batch = self.batch - self.kv_head = 32 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 32 # llama7b hidden_size 4096 - self.embeddim = 128 - self.embeddim_v = np.random.randint(1,self.embeddim) - self.max_seq = 1024 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [32, 1024] * 4 - qSeqLen = [1] * batch - self.is_clamp = 0 - self.clamp_min = 0 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, [1] * batch) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim))).to(data_type) - self.v = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.layer_id[0] + 1, self.batch, self.max_seq, kv_head * self.embeddim_v))).to(data_type) - self.gen_mask(self.batch, self.heads, data_type, mask_type) - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 1, "kvcacheCfg":1,"calcType":2}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen}) - #pdb.set_trace() - self.execute_with_param(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),self.mask.to(data_type).npu(),torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()]) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_split_kvcache.py b/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_split_kvcache.py deleted file mode 100644 index 4c26c1f8a74eea8a3059898243b254cf81b8c963..0000000000000000000000000000000000000000 --- a/tests/apitest/opstest/python/operations/self_attention/test_self_attention_operation_split_kvcache.py +++ /dev/null @@ -1,362 +0,0 @@ -# -# Copyright (c) 2024 Huawei Technologies Co., Ltd. -# This file is a part of the CANN Open Software. -# Licensed under CANN Open Software License Agreement Version 1.0 (the "License"). -# Please refer to the License for details. You may not use this file except in compliance with the License. -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, -# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. -# See LICENSE in the root of the software repository for the full text of the License. 
-# -import sys -import os -import unittest -import json -import math -import torch -import torch_npu -import numpy as np -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -import operation_test # NOQA: E402 -import pdb -import logging -import self_attention.golden_compare_cv as golden_compare_cv - -OP_NAME = "SelfAttentionOperation" - -MASK_TYPE_NO_HEAD_DECODER = 5 -class TestUnpadSelfAttentionOperation(operation_test.OperationTest): - def test_success_float16(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.float16 - data_type = self.data_type - self.batch = 22 - batch = self.batch - self.kv_head = 44 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 44 # llama7b hidden_size 4096 - self.embeddim = 256 - self.embeddim_v = 16 * np.random.randint(8,16) - self.max_seq = 256 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [114] * batch - qSeqLen = [1] * batch - self.is_clamp = 0 - self.clamp_min = 0 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, [1] * batch) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k_list = [] - self.v_list = [] - for i in range(self.batch): - self.k_list.append(torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(1, 1, self.max_seq, kv_head * self.embeddim))).to(data_type).npu()) - self.v_list.append(torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(1, 1, self.max_seq, kv_head * self.embeddim_v))).to(data_type).npu()) - - self.k = torch.cat(self.k_list, 1).cpu() - self.v = torch.cat(self.v_list, 1).cpu() - - for i in range(self.batch): - self.k_list[i] = self.k_list[i].squeeze().npu() - self.v_list[i] = self.v_list[i].squeeze().npu() - - self.gen_mask(self.batch, self.heads, data_type, mask_type) - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 1, "kvcacheCfg":1, "calcType":2}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "byPass": "true"}) - #pdb.set_trace() - self.execute_with_param_and_tensor_list(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),self.mask.to(data_type).npu(),torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()], - [self.k_list, self.v_list], ["kCache", "vCache"]) - - def test_success_bfloat16(self): - if not operation_test.get_soc_version() == 'Ascend910B': - print("this testcase only supports Ascend910B") - return - mask_type = MASK_TYPE_NO_HEAD_DECODER - self.data_type = torch.bfloat16 - data_type = self.data_type - self.batch = 22 - batch = self.batch - self.kv_head = 44 # kv_head num - kv_head = self.kv_head - self.is_decoder = 1 # prefill or decoder - self.heads = 44 # llama7b hidden_size 
4096 - self.embeddim = 256 - self.embeddim_v = 16 * np.random.randint(8,16) - self.max_seq = 256 - tor = 1 - self.dynamic_batch = False - kv_seqLen = [114] * batch - qSeqLen = [1] * batch - self.is_clamp = 0 - self.clamp_min = 0 - self.clamp_max = 0 - self.is_triu_mask = False - self.long_seq = False - self.is_alibi = False - self.q_seqlen, self.q_ntokens = self.gen_seq_len(batch, [1] * batch) - self.kv_seqlen, self.kv_ntokens = self.gen_seq_len(batch, kv_seqLen) - self.layer_id = torch.from_numpy(np.array([0], dtype=np.int32)).to(torch.int32) - self.q_max_seq = np.max(self.q_seqlen) - self.kv_max_seq = np.max(self.kv_seqlen) - q = torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(self.q_ntokens, self.heads * self.embeddim))) - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - #self.q = (q * tor).to(data_type) - self.q = q.to(data_type) - self.k_list = [] - self.v_list = [] - for i in range(self.batch): - self.k_list.append(torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(1, 1, self.max_seq, kv_head * self.embeddim))).to(data_type).npu()) - self.v_list.append(torch.from_numpy(np.random.uniform(-1.0, 1.0, size=(1, 1, self.max_seq, kv_head * self.embeddim_v))).to(data_type).npu()) - - self.k = torch.cat(self.k_list, 1).cpu() - self.v = torch.cat(self.v_list, 1).cpu() - - for i in range(self.batch): - self.k_list[i] = self.k_list[i].squeeze().npu() - self.v_list[i] = self.v_list[i].squeeze().npu() - - self.gen_mask(self.batch, self.heads, data_type, mask_type) - - self.q_scale = 1 - self.qk_scale = tor - param = json.dumps({"headNum": self.heads, "qScale": float(self.q_scale), "qkScale": float(self.qk_scale), "maskType": 1, "kvcacheCfg":1, "calcType":2}) - self.param_seqlen = self.q_seqlen - self.param_token_offset = self.kv_seqlen - run_param = json.dumps({"tokenOffset": self.param_token_offset, "seqLen": self.param_seqlen, "byPass": "true"}) - #pdb.set_trace() - self.execute_with_param_and_tensor_list(OP_NAME, param, run_param, - [self.q.npu(), self.k.npu(), self.v.npu(),self.mask.to(data_type).npu(),torch.tensor(self.kv_seqlen).to(torch.int32).npu(), torch.tensor(self.q_seqlen).to(torch.int32).npu(), self.layer_id.npu()], - [self.k_list, self.v_list], ["kCache", "vCache"]) - - def gen_seq_len(self, batch, seq_len): - ntokens = sum(seq_len) - return seq_len, ntokens - - def gen_mask(self, batch, heads, data_type, mask_type): - import random - q_max_seq = self.max_seq - kv_max_seq = self.max_seq - mask_type_dict = { - # 3-D alibi mask - #MASK_TYPE_NO_HEAD : ((batch, q_max_seq, kv_max_seq), (lambda mask, idx, q_s, kv_s: (mask[idx, :q_s, :kv_s]))), - MASK_TYPE_NO_HEAD_DECODER : ((batch, 1, kv_max_seq), (lambda mask, idx, q_s, kv_s: (mask[idx, :q_s, :kv_s]))), - - } - # mask coefficients applied inside the kernel - if data_type == torch.float16: - post_mask_coff = 1 - pre_mask_coff = -10000.0 - elif data_type == torch.bfloat16 and self.is_alibi: - post_mask_coff = 1 - pre_mask_coff = -float("inf") - elif data_type == torch.float32 and self.is_alibi: - post_mask_coff = 1 - pre_mask_coff = 1 - else: - post_mask_coff = -3e38 - pre_mask_coff = 1 - if data_type == torch.float16: - if self.is_alibi or self.long_seq: - select_zero = False - else: - select_zero = True - elif data_type == torch.bfloat16: - if self.is_alibi: - select_zero = False - elif self.dynamic_batch or self.is_decoder: - select_zero = True - else: - select_zero = False - else: - if self.is_alibi or self.is_decoder: - select_zero = True - else: - select_zero = False - if self.is_triu_mask: - select_zero = False - - self.mask_info = 
mask_type_dict[mask_type] - mask = np.ones(shape=self.mask_info[0]) * pre_mask_coff - mask = np.triu(mask, 1) - zero_indice = random.choices(range(self.max_seq), k = 300) - if self.is_alibi: - self.alibi_bias = self.get_alibi_bias(heads, self.max_seq) - mask += self.alibi_bias.numpy() - if select_zero: - mask.flat[zero_indice] = 0 - self.mask = torch.from_numpy(mask).to(torch.float32) - #self.mask[0]=self.mask[1] - self.post_mask_coff = post_mask_coff - self.pre_mask_coff = pre_mask_coff - - def group_mm_torch(self, heads, group_num, A, B): - group_head = heads // group_num - score = None - for i in range(group_num): - group_score = torch.matmul(A[i * group_head: (i + 1) * group_head, :, :].to(torch.float32), B[i:(i + 1), :, :].to(torch.float32)) - if score is None: - score = group_score - else: - score = torch.cat((score, group_score), 0) - return score - - def compare_output_data(self, out, golden, ratios): - error_count = 0 - strict_error_count = 0 - alibi_error_count = 0 - fp16_min_normal = 1.0 / (1 << 14) - len = out.shape[0] * out.shape[1] - diff = torch.abs(golden - out) - max_diff = diff.max().item() - logging.info(f"maxDiff {max_diff}") - if self.is_alibi: - alibi_limit_error = torch.maximum(torch.abs(golden * ratios[4]), torch.tensor(ratios[5])) - alibi_error_count = torch.gt(diff, alibi_limit_error).sum().item() - logging.info("5/1000 Accuracy is %f", 1 - float(alibi_error_count) / len) - return (float(alibi_error_count) / len) <= ratios[4] - else: - limit_error = torch.maximum(torch.abs(golden * ratios[0]), torch.tensor(ratios[1])) - strict_limit_error = torch.maximum(torch.abs(golden * ratios[2]), torch.tensor(ratios[3])) - error_count = torch.gt(diff, limit_error).sum().item() - strict_error_count = torch.gt(diff, strict_limit_error).sum().item() - logging.info("1/1000 Accuracy is %f", 1 - float(error_count) / len) - logging.info("3/1000 Accuracy is %f", 1 - float(strict_error_count) / len) - if self.data_type == torch.bfloat16 or not self.is_decoder: - return (float(strict_error_count) / len) <= ratios[2] - else: - return (float(error_count) / len) <= ratios[0] - - - - - def golden_calc(self, in_tensors): - q_offset = 0 - k_offset = 0 - v_offset = 0 - isdecoder = 1 - batch = self.batch - heads = self.heads - embed = self.embeddim - embed_v = self.embeddim_v - max_seq = self.max_seq - q_seqlen = self.q_seqlen - kv_seqlen = self.kv_seqlen - kv_head = self.kv_head - mask = self.mask - is_mask = True - q = self.q - k = self.k - v = self.v - q_ntokens = self.q_ntokens - kv_ntokens = self.kv_ntokens - layer_id = self.layer_id[0] - self.is_multi_layer = True - s = None - _p = None - out_low = None - out_high = None - - for idx in range(batch): - q_s = q_seqlen[idx] - kv_s = kv_seqlen[idx] - q_slice = q[q_offset:q_offset + q_s][:] - q_slice = q_slice.view(q_s, heads, embed) - q_slice = torch.permute(q_slice, (1, 0, 2)) - k_slice = k[layer_id][idx][:kv_s][:] - k_slice = k_slice.view(kv_s, kv_head, embed) - k_slice_t = torch.permute(k_slice, (1, 2, 0)) # get K^T - v_slice = v[layer_id][idx][:kv_s][:] - v_slice = v_slice.view(kv_s, kv_head, embed_v) - v_slice = torch.permute(v_slice, (1, 0, 2)) - - score = self.group_mm_torch(heads, kv_head, q_slice, k_slice_t) - - if s is None: - s = score.view([-1, ]) - else: - s = torch.cat((s, score.view([-1, ])), 0) - - scale = 1 - tor = np.float32(1.0 / math.sqrt(1.0 * self.embeddim)) - if not self.is_multi_layer: - # 当前scale和tor保持一致,模型侧可能传入scale = np.float32(layer_id + 1) - scale = np.float32(layer_id + 1) - score = score * tor - - if 
self.is_clamp == 1: - clamp_min_brc = np.ones((score.shape)) * self.clamp_min - clamp_max_brc = np.ones((score.shape)) * self.clamp_max - score = np.float16(np.maximum(score, clamp_min_brc)) - score = torch.from_numpy(np.float16(np.minimum(score, clamp_max_brc))) - if is_mask: - score = score + self.mask_info[1](self.mask, idx, q_s, kv_s) * self.post_mask_coff - #score = score + self.mask[idx, :q_s, :kv_s] - score = score.numpy().astype(np.float32) - score_max = np.max(score, axis=-1) - score = score - score_max.reshape((heads, q_s, 1)) - score_exp = np.exp(score) - score_sum = np.sum(score_exp, axis=-1) - - if _p is None: - _p = score_exp.astype(np.float32).reshape([-1, ]) - else: - _p = np.concatenate( - (_p, score_exp.astype(np.float32).reshape([-1, ])), 0) - - p_high = (score_exp / score_sum.reshape((heads, q_s, 1))) - p_high = torch.from_numpy(p_high) - p_low = p_high.to(torch.bfloat16) - - o_low = self.group_mm_torch(heads, kv_head, p_low, v_slice) - o_high = self.group_mm_torch(heads, kv_head, p_high, v_slice) - - o_low = o_low.view(heads, q_s, embed_v) - o_low = torch.permute(o_low, (1, 0, 2)).contiguous() - o_high = o_high.view(heads, q_s, embed_v) - o_high = torch.permute(o_high, (1, 0, 2)).contiguous() - - if out_low is None: - out_low = o_low - out_high = o_high - else: - out_low = torch.cat((out_low, o_low), 0) - out_high = torch.cat((out_high, o_high), 0) - - q_offset += q_s - k_offset += max_seq - v_offset += max_seq - - # golden data - out_low = out_low.view(q_ntokens, heads * embed_v) - self.golden_out_low = out_low.to(self.data_type) - - out_high = out_high.view(q_ntokens, heads * embed_v) - self.golden_out_high = out_high.to(torch.float32) - return [self.golden_out_low, self.golden_out_high] - - def golden_compare(self, out_tensor, golden_out_tensor): - return golden_compare_cv.compare_cv(golden_out_tensor[1], golden_out_tensor[0], out_tensor[0].cpu()) - #return self.compare_output_data(out_tensor.cpu(), golden_out_tensor, [0.001, 0.001, 0.003, 0.003, 0.005, 0.005]) - #return torch.allclose(out_tensor.cpu(), golden_out_tensor, rtol=0.001, atol=0.001) - -if __name__ == '__main__': - unittest.main() \ No newline at end of file
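
The golden references in the deleted tests expand grouped KV heads by looping over head groups and running each matmul in float32 (group_mm_torch / group_matmul). The sketch below shows an equivalent formulation using repeat_interleave; the name grouped_matmul and the shapes are illustrative assumptions, not helpers from this repository.

import torch

def grouped_matmul(heads, kv_heads, a, b):
    # a: [heads, m, k] query slices, b: [kv_heads, k, n] key/value slices;
    # every heads // kv_heads query heads share one KV head.
    group = heads // kv_heads
    b_expanded = b.repeat_interleave(group, dim=0)  # [heads, k, n]
    return torch.matmul(a.to(torch.float32), b_expanded.to(torch.float32))

# cross-check against the per-group loop used by the deleted goldens
heads, kv_heads = 8, 2
a = torch.randn(heads, 4, 16, dtype=torch.float16)
b = torch.randn(kv_heads, 16, 4, dtype=torch.float16)
group = heads // kv_heads
ref = torch.cat([torch.matmul(a[i * group:(i + 1) * group].to(torch.float32),
                              b[i:i + 1].to(torch.float32)) for i in range(kv_heads)], 0)
assert torch.allclose(grouped_matmul(heads, kv_heads, a, b), ref)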
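
compare_output_data gates accuracy by the fraction of elements whose error exceeds max(|golden| * rel, abs_floor), logging the 1/1000, 3/1000 and 5/1000 accuracies. A minimal sketch of one such threshold check, with the illustrative name ratio_compare, could look like this:

import torch

def ratio_compare(out, golden, rel, abs_floor):
    # An element fails when |golden - out| > max(|golden| * rel, abs_floor);
    # the check passes while the failing fraction stays within the same ratio.
    diff = torch.abs(golden.to(torch.float32) - out.to(torch.float32))
    limit = torch.maximum(torch.abs(golden.to(torch.float32)) * rel,
                          torch.tensor(abs_floor))
    failed = torch.gt(diff, limit).sum().item()
    return failed / diff.numel() <= rel

out = torch.randn(4, 8, dtype=torch.float16)
golden = out.to(torch.float32) + 1e-4 * torch.randn(4, 8)
print(ratio_compare(out, golden, rel=0.001, abs_floor=0.001))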
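
golden_calc normalises the masked attention scores with a max-subtracted softmax in float32 before the PV matmul, which keeps np.exp from overflowing. A minimal NumPy sketch of that step (stable_softmax is an illustrative name, not part of the tests):

import numpy as np

def stable_softmax(score):
    # score: [heads, q_s, kv_s]; softmax over the last axis with the row max
    # subtracted first so the float32 exponential stays finite.
    score = score.astype(np.float32)
    score = score - np.max(score, axis=-1, keepdims=True)
    score_exp = np.exp(score)
    return score_exp / np.sum(score_exp, axis=-1, keepdims=True)

p = stable_softmax(np.random.uniform(-5.0, 5.0, size=(2, 1, 16)))
assert np.allclose(p.sum(axis=-1), 1.0, atol=1e-6)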