diff --git a/community/cv/scBERT/README_CN.md b/community/cv/scBERT/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..e2592fe1f9efd43645400c55672b037bbac3a428 --- /dev/null +++ b/community/cv/scBERT/README_CN.md @@ -0,0 +1,298 @@ +# 目录 + +- [目录](#目录) +- [scBERT描述](#scBERT描述) +- [模型架构](#模型架构) +- [数据集](#数据集) +- [特性](#特性) +- [环境要求](#环境要求) +- [快速入门](#快速入门) +- [脚本说明](#脚本说明) + - [脚本及样例代码](#脚本及样例代码) + - [脚本参数](#脚本参数) + - [训练过程](#训练过程) + - [单卡训练](#单卡训练) + - [分布式训练](#分布式训练) + - [微调过程](#微调过程) + - [微调](#微调) + - [python命令启动](#python命令启动) + - [shell脚本启动](#shell脚本启动) + - [推理过程](#推理过程) + - [用法](#用法) + - [结果](#结果) +- [随机情况说明](#随机情况说明) +- [ModelZoo主页](#modelzoo主页) + +# scBERT描述 + +细胞类型的可靠注释是单细胞RNA测序数据下游分析的前提条件。现有的注释算法通常面临批次效应处理不当、缺乏精心筛选的标记基因列表,或难以利用基因-基因之间潜在相互作用信息等问题。受大规模预训练语言模型的启发,我们提出了一种基于预训练深度神经网络的模型scBERT(单细胞双向编码器表示转换模型),以克服上述挑战。scBERT采用了深度学习领域的预训练和微调的最新范式。在scBERT的第一个阶段,它通过在大量未标记的scRNA-seq数据上进行预训练,获得了对基因-基因相互作用的广泛理解。然后,经过预训练的scBERT可以通过监督微调用于对未见的和特定用户的scRNA-seq数据进行细胞注释任务。更多信息请参考 [https://www.biorxiv.org/content/10.1101/2021.12.05.471261v1](https://www.biorxiv.org/content/10.1101/2021.12.05.471261v1)。 + +# 模型架构 + +本模型主要包含以下组件: + +1. Gene2vec位置编码模块: + - 使用预训练的基因向量进行编码 + - 维度: 200 + +2. Performer核心编码器: + - 多头注意力机制: 10个头 + - 前馈网络层 + - Layer Normalization层 + - Dropout正则化 + +3. 预训练任务: + - 掩码语言建模(MLM) + - 掩码概率: 0.15 + - 替换概率: 0.9 + +4. 下游分类头: + - 全连接层 + - ReLU激活 + - Dropout层 + +# 数据集 + +使用的数据集:[panglao_10000 Zheng68k](https://drive.weixin.qq.com/s?k=AJEAIQdfAAozQt5B8k) + +1. 预训练数据集: +- 名称: Panglao scRNA-seq数据集 +- 格式: H5AD文件 +- 路径: ./data/panglao_10000.h5ad +- 特征数: 16906个基因 +- 数据大小: 99.3MB + +2. 微调数据集: +- 名称: Zheng68k scRNA-seq数据集 +- 格式: H5AD文件 +- 路径: ./data/Zheng68k_prepeocessed.h5ad +- 特征数: 17053个基因 +- 细胞类型数: 11 +- 数据大小: 262MB + +支持的数据集:panglao_10000 Zheng68k 或者与 AnnData 格式相同的数据集 + +- 目录结构如下,由用户定义目录和文件的名称 + +![image](demo/predict-demo.jpg) + +- 如果用户需要自定义数据集,则需要将数据集格式转化为AnnData数据格式。 + +# 特性 + +1. 分布式训练支持 + - 数据并行(Data Parallel) + - 流水线并行(Pipeline Parallel) + +2. 动态学习率 + - 使用ExponentialDecayLR进行学习率调整 + +3. 混合精度训练 + - 支持FP16训练 + +4. 昇腾硬件适配 + - 支持昇腾910训练推理 + +# 环境要求 + +- 硬件(Ascend) + - 使用Ascend处理器来搭建硬件环境。 +- 框架 + - [MindSpore](https://www.mindspore.cn/install) +- 如需查看详情,请参见如下资源 + - [MindSpore教程](https://www.mindspore.cn/tutorials/zh-CN/master/index.html) + - [MindSpore Python API](https://www.mindspore.cn/docs/zh-CN/master/api_python/mindspore.html) + +# 快速入门 + +- 通过官方网站安装Mindspore后,您可以按照如下步骤进行预训练和微调 + +```shell +# 单卡训练 +python pretrain.py \ + --data_path='./data/panglao_10000.h5ad' \ + --epoch=100 \ + --batch_size=4 \ + --learning_rate=1e-4 +``` + +```shell +# 通过shell脚本进行8卡训练 +bash run_distribute_pretrain.sh +``` + +```shell +# 单卡微调 +python finetune.py \ + --data_path='./data/Zheng68k_prepeocessed.h5ad' \ + --model_path='./ckpt/ckpt-0.ckpt' \ + --epoch=100 \ + --batch_size=1 \ + --learning_rate=1e-4 +``` + +```shell +# 多卡微调 +bash run_distribute_finetune.sh +``` + +# 脚本说明 + +## 脚本及样例代码 + +```text + |----README_CN.md + |----ckpt + |----data + |----demo + | |----AnnData.png + |----run_distribute_pretrain.sh + |----run_distribute_finetune.sh + |----pretrain.py + |----finetune.py + |----performer.py + |----dataset_pretrain.py + |----dataset_finetune.py + |----layers.py + |----utils.py +``` + +## 脚本参数 + +train.py中主要的参数如下: + +```text + +--enable_pipeline 是否启用流水线并行,默认为True +--device_id 设备ID +--bin_num 分箱数量,默认值:5 +--gene_num 基因数量,默认值:16906 +--epoch 训练轮数,默认值:100 +--seed 随机种子,默认值:2021 +--batch_size 批次大小,默认值:4 +--learning_rate 学习率,默认值:1e-4 +--valid_every 验证间隔,默认值:1 +--mask_prob 掩码概率,默认值:0.15 +--replace_prob 替换概率,默认值:0.9 +--pos_embed 是否使用Gene2vec编码,默认为True +--data_path 数据路径 +--model_name 模型名称,默认为panglao_pretrain +``` + +## 训练过程 + +### 单卡训练 + +在Ascend设备上,使用python脚本直接开始训练(单卡) + +```shell +# 单卡训练 +python pretrain.py --device_id 0 +``` + +### 分布式训练 + +在Ascend设备上,使用shell脚本执行分布式训练示例(8卡) + +```shell +# 通过shell脚本进行8卡训练 +bash run_distribute_pretrain.sh +``` + +```log + + 上述shell脚本将在后台运行分布式训练。您可以通过training_log.txt文件查看结果。得到如下损失值: + + ```log + + ... + == Epoch: 1 | Training Loss: 0.029950 | Accuracy: 17.1237% == + == Epoch: 1 | Validation Loss: 1.671589 | Accuracy: 0.0000% == + == Epoch: 2 | Training Loss: 0.022785 | Accuracy: 32.4212% == + == Epoch: 2 | Validation Loss: 1.253894 | Accuracy: 3.1250% == + == Epoch: 3 | Training Loss: 0.017635 | Accuracy: 61.4334% == + == Epoch: 3 | Validation Loss: 0.898995 | Accuracy: 75.6098% == + ... + +``` + +## 微调过程 + +### 微调 + +#### python命令启动 + +```shell + +python finetune.py \ + --model_path='./ckpt/pretrain-99.ckpt' \ + --data_path='./data/Zheng68k_prepeocessed.h5ad' + +``` + +#### shell脚本启动 + +```shell + +bash run_distribute_finetune.sh + +``` + +```text + + == Epoch: 1 | Training Loss: 2.027127 | Accuracy: 28.5007% == + == Epoch: 1 | Validation Loss: 1.894380 | Accuracy: 0.300657 == + == Epoch: 2 | Training Loss: 1.293512 | Accuracy: 54.2020% == + == Epoch: 2 | Validation Loss: 0.852387 | Accuracy: 0.695179 == + == Epoch: 3 | Training Loss: 0.617621 | Accuracy: 78.1191% == + == Epoch: 3 | Validation Loss: 0.685155 | Accuracy: 0.738422 == + == Epoch: 4 | Training Loss: 0.395844 | Accuracy: 86.8700% == + == Epoch: 4 | Validation Loss: 0.698182 | Accuracy: 0.741563 == + == Epoch: 5 | Training Loss: 0.249119 | Accuracy: 92.2498% == + == Epoch: 5 | Validation Loss: 0.716395 | Accuracy: 0.756903 == + == Epoch: 6 | Training Loss: 0.163563 | Accuracy: 95.0767% == + == Epoch: 6 | Validation Loss: 0.801939 | Accuracy: 0.752739 == + +``` + +## 推理过程 + +**推理前需使用finetune.py文件生成的模型检查点文件。** + +### 用法 + +执行完整的推理脚本如下: + +```shell + +python predict.py + +``` + +### 结果 + +推理结果保存在当前路径,通过prediction_log.log中看到最终预测结果。 + +```text + +2024-10-29 15:20:13,565 - INFO - Predictions: ['CD19+ B', 'CD19+ B', 'CD19+ B', 'CD19+ B', 'CD19+ B', 'CD19+ B', 'CD19+ B', 'CD19+ B', 'CD19+ B', 'CD19+ B'] + + +``` +# 随机情况说明 + +在训练中存在以下随机性来源: + +1. 数据集随机切分 +2. 随机初始化模型参数 +3. Dropout随机失活 +4. Performer中的随机投影矩阵 +5. 训练数据的随机打乱 + +为了确保结果可复现,我们: +- 使用固定随机种子(--seed参数) +- 使用确定性计算模式 + +# ModelZoo主页 + +请浏览官网[主页](https://gitee.com/mindspore/models)。 \ No newline at end of file diff --git a/community/cv/scBERT/dataset_finetune.py b/community/cv/scBERT/dataset_finetune.py new file mode 100644 index 0000000000000000000000000000000000000000..a52c24828d43167d532eb2558f5494b8cd22ada1 --- /dev/null +++ b/community/cv/scBERT/dataset_finetune.py @@ -0,0 +1,110 @@ +""" +dataset_finetune.py + +该模块定义了一个用于处理单细胞数据的自定义数据集类 SCDataset,并包含了数据加载函数 `load_data`,用于生成训练和验证集。 + +主要功能: +- 自定义数据集类 SCDataset,支持 MindSpore 框架的数据集生成。 +- `load_data` 函数,用于加载和预处理数据,并生成 MindSpore 格式的训练和验证集。 + +依赖库: +- mindspore, numpy, scanpy, sklearn, pickle +""" +import pickle as pkl +import mindspore +from mindspore import Tensor +import numpy as np +import scanpy as sc +from sklearn.model_selection import train_test_split + +# 微调用的带有标签的类型 +class SCDataset: + """ + 自定义单细胞数据集类,用于处理带标签的数据,并支持转换为 MindSpore 数据集格式。 + + 参数: + - data: 数据集 (假设为稀疏矩阵格式) + - labels: 数据标签 + - n_class: 类别数量,用于限制类别索引 + + 方法: + - __getitem__(self, index): 根据索引返回一个样本的数据和标签。 + - __len__(self): 返回数据集的总样本数。 + - to_mind_dataset(self, batch_size=32, repeat_size=1): 将数据集转换为 MindSpore 格式的数据集。 + + 用途: + 该类主要用于单细胞数据的预处理和封装,便于在 MindSpore 框架中进行训练和验证。 + """ + def __init__(self, data, labels, n_class): + self.data = data + self.labels = labels + self.n_class = n_class + + def __getitem__(self, index): + full_seq = self.data[index].toarray()[0] # 假设输入data是稀疏矩阵格式 + full_seq[full_seq > (self.n_class - 2)] = self.n_class - 2 + full_seq = np.append(full_seq, 0).astype(np.int32) # 添加额外的类别 + label = self.labels[index] + label = np.array(label, dtype=np.int32) + return Tensor(full_seq), Tensor(label) + + def __len__(self): + return self.data.shape[0] + + def to_mind_dataset(self, batch_size=32, repeat_size=1): + """ + 将数据集转换为 MindSpore 格式的数据集,以便进行批处理和重复训练。 + + 参数: + - batch_size: 每个批次的样本数量 (默认值为 32)。 + - repeat_size: 重复次数,用于数据集重复 (默认值为 1)。 + + 返回: + - 一个 MindSpore 数据集对象,适用于模型训练和验证。 + + 功能: + 该方法通过生成器函数提供数据样本,并将数据格式调整为适合 MindSpore 的输入格式。 + """ + def generator(): + for i in range(len(self)): + # yield self[i], + data, label = self[i] # 假设 self[i] 返回一个 (data, label) 元组 + yield (data, label) + + # 创建数据集 + types = [mindspore.int32, mindspore.int32] + c_names = ["data", "label"] + ds = mindspore.dataset.GeneratorDataset(generator, column_names=c_names, column_types=types) + ds = ds.batch(batch_size).repeat(repeat_size) + return ds + +def load_data(data_path, n_class, seed, batch_size): + """ + 加载并预处理单细胞数据,将其划分为训练集和验证集,并转换为 MindSpore 格式的数据集。 + + 参数: + - data_path: 数据文件路径,指定单细胞数据文件的路径(.h5ad 格式)。 + - n_class: 类别数量,用于限制数据中的类别索引。 + - seed: 随机种子,用于数据集划分的可重复性。 + - batch_size: 每个批次的样本数量,用于生成 MindSpore 数据集。 + + 返回: + - train_dataset: 训练数据集,以 MindSpore 格式返回。 + - val_dataset: 验证数据集,以 MindSpore 格式返回。 + + 功能: + 该函数读取指定路径的单细胞数据文件,提取和编码标签信息,并将数据划分为训练集和验证集。 + 最终返回适用于 MindSpore 的批量训练数据集和验证数据集。 + """ + data = sc.read_h5ad(data_path) + label_dict, label = np.unique(np.array(data.obs['celltype']), return_inverse=True) + with open('label_dict', 'wb') as fp: + pkl.dump(label_dict, fp) + with open('label', 'wb') as fp: + pkl.dump(label, fp) + data = data.X + X_train, X_test, y_train, y_test = train_test_split(data, label, test_size=0.1, random_state=42) + train_dataset = SCDataset(X_train, y_train, n_class).to_mind_dataset(batch_size=batch_size) + val_dataset = SCDataset(X_test, y_test, n_class).to_mind_dataset(batch_size=batch_size) + print("load data success, train num is {}, val num is {}".format(len(train_dataset), len(val_dataset))) + return train_dataset, val_dataset diff --git a/community/cv/scBERT/dataset_pretrain.py b/community/cv/scBERT/dataset_pretrain.py new file mode 100644 index 0000000000000000000000000000000000000000..0be63c3db2a453a816bdaeff566eccb7a495bc23 --- /dev/null +++ b/community/cv/scBERT/dataset_pretrain.py @@ -0,0 +1,94 @@ +""" +dataset_pretrain.py + +该模块定义了用于预训练的自定义单细胞数据集类 `SCDataset`,以及数据加载函数 `load_data`。 +此模块旨在支持单细胞数据的处理和 MindSpore 框架的数据集生成,尤其是针对分布式训练的支持。 + +主要功能: +- `SCDataset` 类:负责处理数据和标签,并支持生成 MindSpore 数据集。 +- `load_data` 函数:加载数据文件并分割成训练集和验证集,返回 MindSpore 格式的数据集。 + +依赖库: +- mindspore, numpy, scanpy, sklearn +""" +import numpy as np +import scanpy as sc +from sklearn.model_selection import train_test_split + +import mindspore +from mindspore import Tensor +from mindspore.communication import get_group_size, get_rank + +class SCDataset: + """ + 自定义数据集类,用于单细胞数据的预训练处理。 + + 参数: + - data: 数据集,假设为稀疏矩阵格式。 + - n_class: 类别数量,用于限制类别索引范围。 + - seq_len: 序列长度,控制返回的序列数据的最大长度。 + + 方法: + - __getitem__(self, index): 根据索引返回指定样本的数据,限制类别范围并确保序列长度。 + - __len__(self): 返回数据集中的样本数量。 + - to_mind_dataset(self, batch_size=32, repeat_size=1, DP=False): 将数据集转换为 MindSpore 格式,支持分布式处理 (DP)。 + + 功能: + 该类支持单细胞数据的格式化和预处理,为后续模型训练提供 MindSpore 格式的数据集。 + """ + def __init__(self, data, n_class, seq_len): + self.data = data + self.n_class = n_class + self.seq_len = seq_len + + def __getitem__(self, index): + full_seq = self.data[index].toarray()[0] # 假设输入data是稀疏矩阵格式 + full_seq[full_seq > (self.n_class - 2)] = self.n_class - 2 + full_seq = np.append(full_seq, 0).astype(np.int32) # 添加额外的类别 + return Tensor(full_seq[:self.seq_len]) + + def __len__(self): + return self.data.shape[0] + + # MindSpore特定: 转换为MindSpore数据集 + def to_mind_dataset(self, batch_size=32, repeat_size=1, DP=False): + """ + 将数据集转换为 MindSpore 格式的数据集,支持批处理、重复训练和分布式处理。 + + 参数: + - batch_size: 每个批次的样本数量,默认为 32。 + - repeat_size: 重复次数,用于数据集的重复训练,默认为 1。 + - DP: 是否启用数据并行 (Data Parallel, DP) 模式,默认为 False。启用时,按分布式训练的要求划分数据集。 + + 返回: + - 一个 MindSpore 格式的数据集对象,适用于模型训练和验证。 + + 功能: + 该方法利用生成器将数据格式化为 MindSpore 可处理的数据集,并提供分布式训练的支持。 + """ + + # 创建数据集 + types = [mindspore.int32,] + if DP: + group_size = get_group_size() + rank_id = get_rank() + ds = mindspore.dataset.GeneratorDataset( + self, + column_names=["data"], + column_types=types, + num_shards=group_size, + shard_id=rank_id + ) + else: + ds = mindspore.dataset.GeneratorDataset(self, column_names=["data"], column_types=types) + ds = ds.batch(batch_size).repeat(repeat_size) + return ds + +def load_data(data_path, n_class, seed, batch_size, seq_len, args): + data = sc.read_h5ad(data_path) + data = data.X + data_train, data_val = train_test_split(data, test_size=0.1, random_state=seed) + train_dataset = SCDataset(data_train, n_class, seq_len).to_mind_dataset(batch_size=batch_size, DP=args.enable_dp) + val_dataset = SCDataset(data_val, n_class, seq_len).to_mind_dataset(batch_size=batch_size, DP=args.enable_dp) + print("load data success, train num is {}, val num is {}".format(len(train_dataset), len(val_dataset))) + return train_dataset, val_dataset diff --git a/community/cv/scBERT/demo/AnnData.png b/community/cv/scBERT/demo/AnnData.png new file mode 100644 index 0000000000000000000000000000000000000000..e6aa27577a0d28323dcadf6deffab18faf0dbf1c Binary files /dev/null and b/community/cv/scBERT/demo/AnnData.png differ diff --git a/community/cv/scBERT/finetune.py b/community/cv/scBERT/finetune.py new file mode 100644 index 0000000000000000000000000000000000000000..dc5fb25105f046fdfa2a8a45fc8a3c994d260f84 --- /dev/null +++ b/community/cv/scBERT/finetune.py @@ -0,0 +1,378 @@ +""" +finetune.py + +该模块实现了单细胞数据的微调训练过程,包括模型加载、优化器和损失函数的配置、训练和验证过程,以及保存微调后的模型检查点。 + +主要功能: +- `SCDataset` 和 `load_data`:自定义数据集类和数据加载函数,用于单细胞数据的预处理。 +- `build_model`:构建和加载预训练模型,并对其进行微调。 +- `build_optimizer_and_scheduler`:配置优化器和损失函数。 +- `train_one_epoch` 和 `eval_one_epoch`:分别实现训练和验证的单个 epoch 流程。 +- `train`:整体训练循环,管理 epoch、日志记录和模型保存。 +- `parse`:解析命令行参数,用于控制训练的各种配置。 + +依赖库: +- argparse、numpy、mindspore、tqdm、pickle、os 等。 + +此模块支持 MindSpore 框架下的分布式训练。 +""" +import argparse +import os +import pickle as pkl + +from tqdm import tqdm +import mindspore as ms +from mindspore import ops, save_checkpoint, Tensor, value_and_grad, ParallelMode, nn +from mindspore.nn import Adam, CrossEntropyLoss +from mindspore.communication import init + +from dataset_finetune import load_data +from performer import PerformerLM + +# 微调中新的输出层 +class Identity(nn.Cell): + """ + 自定义输出层,用于替换模型的原始输出层,以适应微调过程中新的输出需求。 + + 参数: + - dropout: float,设置 Dropout 概率,用于防止过拟合。 + - h_dim: int,隐藏层维度,用于调整中间全连接层的大小。 + - out_dim: int,输出层的维度,通常与标签类别数一致。 + + 方法: + - construct(x): 前向传播函数,接收输入张量并返回经过全连接层和激活层处理后的输出。 + + 功能: + 该类构建了一个包含卷积、全连接层和 Dropout 层的自定义输出层,以替换模型的原始输出层。 + """ + def __init__(self, dropout=0.1, h_dim=100, out_dim=10): + super(Identity, self).__init__() + self.conv1 = nn.Conv2d(1, 1, (1, 200), pad_mode='valid', padding=0, has_bias=False) + self.act = nn.ReLU() + self.fc1 = nn.Dense(in_channels=SEQ_LEN, out_channels=512, has_bias=True) + self.act1 = nn.ReLU() + self.dropout1 = nn.Dropout(dropout) + self.fc2 = nn.Dense(in_channels=512, out_channels=h_dim, has_bias=True) + self.act2 = nn.ReLU() + self.dropout2 = nn.Dropout(dropout) + self.fc3 = nn.Dense(in_channels=h_dim, out_channels=out_dim, has_bias=True) + + def construct(self, x): + """ + 定义前向传播逻辑,将输入数据通过卷积、全连接层、激活层和 Dropout 层处理,并生成最终输出。 + + 参数: + - x: 输入张量,形状为 (batch_size, seq_len, feature_dim),其中包含模型的特征数据。 + + 返回: + - 输出张量,经过卷积和全连接层处理后的分类结果。 + + 功能: + 该方法实现了自定义输出层的前向传播过程,通过卷积层进行特征提取,再通过多层全连接层实现数据的维度变化, + 并结合激活和 Dropout 层防止过拟合。 + """ + x = x[:, None, :, :] + x = self.conv1(x) + x = self.act(x) + x = x.view(x.shape[0], -1) + x = self.fc1(x) + x = self.act1(x) + x = self.dropout1(x) + x = self.fc2(x) + x = self.act2(x) + x = self.dropout2(x) + x = self.fc3(x) + return x + +model = None +loss_fn = None + +def cum_loss_and_logits(data, label): + global model, loss_fn, SEQ_LEN + logits = model(data) + loss = loss_fn(logits, label) + return loss, logits + +def build_model(args): + """ + 构建并配置预训练模型,用于单细胞数据的微调。加载预训练权重、设置冻结层和微调输出层。 + + 参数: + - args: 包含模型路径和配置参数的命令行参数对象。 + + 功能: + 1. 加载标签字典。 + 2. 使用指定的 `PerformerLM` 结构构建模型。 + 3. 加载预训练权重并设置部分层为冻结状态,以避免它们在微调时更新。 + 4. 使用自定义 `Identity` 类覆盖模型的输出层,以适应新的任务需求。 + 5. 配置流水线模式(如启用)。 + + 返回: + - 无返回值,但全局变量 `model` 被初始化为配置后的模型实例。 + """ + global CLASS, SEQ_LEN, POS_EMBED_USING, model + #load the label stored + with open('label_dict', 'rb') as fp: + label_dict = pkl.load(fp) + model = PerformerLM( + num_tokens=CLASS, + dim=200, + depth=6, + max_seq_len=SEQ_LEN, + heads=10, + ) + args = parse() + # 加载预训练权重 + ckpt_file_name = args.model_path + param_dict = ms.load_checkpoint(ckpt_file_name) + # 将权重加载到模型中 + ms.load_param_into_net(model, param_dict) + # 设置参数是否参与梯度计算 + for param in model.trainable_params(): + param.requires_grad = False + for param in model.norm.trainable_params(): + param.requires_grad = True + for param in model.performer.layers[-2].trainable_params(): + param.requires_grad = True + # 覆盖输出层 + model.to_out = Identity(dropout=0.1, h_dim=128, out_dim=label_dict.shape[0]) + print("build model success.") + count = sum([item.size for item in model.get_parameters()]) + names = [item.name for item in model.trainable_params()] + print("param count is {}, names: {}, count: {}".format(count, str(names), len(names))) + + if args.enable_pipeline: + model.init_pipeline(0) + model.performer.layers[0].init_pipeline(1) + model.performer.layers[0].attention.init_pipeline(1) + +def build_optimizer_and_scheduler(model): + global LEARNING_RATE, loss_fn, optimizer + # optimizer + optimizer = Adam(params=model.trainable_params(), learning_rate=LEARNING_RATE) + # loss + loss_fn = CrossEntropyLoss(weight=None) + print("build optimizer success.") + return optimizer + +def train_one_epoch(train_dataloader, grad_fn, optimizer): + """ + 执行一个训练 epoch,计算累计损失和准确率。 + + 参数: + - train_dataloader: 训练数据加载器,提供批量训练数据。 + - grad_fn: 计算损失和梯度的函数。 + - optimizer: 优化器,用于更新模型参数。 + + 返回: + - running_loss: float,当前 epoch 中的累计损失。 + - cum_acc: float,当前 epoch 中的累计准确率。 + + 功能: + 1. 遍历训练数据,执行前向传播、计算损失和梯度,并使用优化器更新参数。 + 2. 计算批次损失并累加以获得 epoch 的总损失。 + 3. 使用 softmax 和 argmax 计算预测的准确率,并累加以获得 epoch 的准确率。 + """ + global model + running_loss = 0.0 + cum_acc = 0.0 + model.set_train(True) + for _, (data, label) in enumerate(tqdm(train_dataloader.create_tuple_iterator())): + # forward 推理 + (loss, logits), grads = grad_fn(data, label) + optimizer(grads) + # 累加损失 + running_loss += loss.item() + # 计算精度 + final = ops.softmax(logits) + final = final.argmax(axis=-1) + # 预测数 + pred_num = Tensor([final.shape[-1]], ms.int32) + # 计算正确数 + correct_num = ops.Equal()(final, label).sum(axis=-1) + # 计算累计准确率 + cum_acc += correct_num / pred_num.mean() + del data, label, final + + return running_loss, cum_acc + +# 从 Tensor 对象中提取整数值 +def get_value_from_tensor(tensor_list): + return [tensor.asnumpy()[0] for tensor in tensor_list] + +def eval_one_epoch(val_dataloader): + """ + 执行一个验证 epoch,计算累计损失和准确率。 + + 参数: + - val_dataloader: 验证数据加载器,提供批量验证数据。 + + 返回: + - val_loss: float,当前 epoch 中的平均验证损失。 + - val_acc: float,当前 epoch 中的验证准确率。 + + 功能: + 1. 遍历验证数据,执行前向传播并计算每批次的损失。 + 2. 使用 softmax 和 argmax 计算预测值,将预测值与真实标签进行对比以计算准确率。 + 3. 累加批次损失以获得 epoch 的平均验证损失,并计算总的准确率。 + """ + global loss_fn, model, SEQ_LEN + model.set_train(False) + predictions = [] + truths = [] + running_loss = 0.0 + print("========== 开始验证") + for _, (data, label) in enumerate(tqdm(val_dataloader.create_tuple_iterator())): + logits = model(data) + loss = loss_fn(logits, label) + running_loss += loss.item() + softmax = nn.Softmax(axis=-1) + final_prob = softmax(logits) + final = final_prob.argmax(axis=-1) + predictions.append(final) + truths.append(label) + del data, logits, final + val_loss = running_loss / len(val_dataloader) + # 获取 truths 和 predictions 的实际值 + truths_values = get_value_from_tensor(truths) + predictions_values = get_value_from_tensor(predictions) + # 计算正确率 + correct_count = sum(t == p for t, p in zip(truths_values, predictions_values)) + total_count = len(truths_values) + val_acc = correct_count / total_count if total_count > 0 else 0 + # 计算正确数 + del predictions, truths + return val_loss, val_acc + +def train(optimizer, train_dataloader, val_dataloader): + """ + 执行完整的模型训练过程,包括每个 epoch 的训练、日志记录、验证和模型检查点保存。 + + 参数: + - optimizer: 优化器实例,用于更新模型参数。 + - train_dataloader: 训练数据加载器,提供训练数据的批次迭代。 + - val_dataloader: 验证数据加载器,用于在训练过程中评估模型性能。 + + 功能: + 1. 在每个 epoch 中调用 `train_one_epoch` 进行训练,计算损失和准确率。 + 2. 每隔指定的 epoch 间隔执行一次验证,通过调用 `eval_one_epoch` 计算验证损失和准确率。 + 3. 将每个 epoch 的训练和验证结果记录到日志文件,并打印输出。 + 4. 在每个 epoch 结束时保存模型检查点,以备恢复和后续微调。 + + 返回: + - 无返回值,但生成日志文件并保存模型检查点。 + """ + global EPOCHS, VALIDATE_EVERY, loss_fn + + train_num_step = len(train_dataloader) + grad_fn = value_and_grad(cum_loss_and_logits, grad_position=None, weights=model.trainable_params(), has_aux=True) + for epoch in range(EPOCHS): + running_loss, cum_acc = train_one_epoch(train_dataloader, grad_fn, optimizer) + # log epoch的信息 + epoch_loss = running_loss / train_num_step + epoch_acc = 100 * cum_acc / train_num_step + + # 确保将Tensor转换为Python数值 + epoch_loss_value = epoch_loss.asnumpy().item() if isinstance(epoch_loss, ms.Tensor) else epoch_loss + epoch_acc_value = epoch_acc.asnumpy().item() if isinstance(epoch_acc, ms.Tensor) else epoch_acc + + log_string = ( + f' == Epoch: {epoch} | ' + f'Training Loss: {epoch_loss_value:.6f} | ' + f'Accuracy: {epoch_acc_value:6.4f}% ==' + ) + + print(log_string) + with open('finetune_result.txt', 'a') as f: + f.write(log_string + '\n') + + # 进行一次验证 + if epoch % VALIDATE_EVERY == 0: + val_loss, val_acc = eval_one_epoch(val_dataloader) + log_string = f' == Epoch: {epoch} | Validation Loss: {val_loss} | Accuracy: {val_acc.item()}% ==' + print(log_string) + with open('finetune_result.txt', 'a') as f: + f.write(log_string + '\n') + + ckpt_dir = "./" + FINETUNE_SAVE_PATH + if not os.path.exists(ckpt_dir): + os.makedirs(ckpt_dir, exist_ok=True) + ckpt_file = f"finetune-{epoch}.ckpt" + ckpt_path = os.path.join(ckpt_dir, ckpt_file) + save_checkpoint(model, ckpt_path) + +def parse(): + """ + 解析命令行参数,以配置微调模型的训练过程。 + + 功能: + 该函数使用 argparse 库定义和解析多个命令行参数,包括设备设置、数据路径、模型路径、训练超参数等。 + + 返回: + - args: argparse.Namespace 对象,包含所有解析后的参数值,可用于控制训练过程的配置。 + """ + parser = argparse.ArgumentParser() + parser.add_argument("--enable_pipeline", type=bool, default=False, help='Local process rank.') + parser.add_argument("--device_id", type=int, default=-1, help='Local process rank.') + parser.add_argument("--bin_num", type=int, default=5, help='Number of bins.') + parser.add_argument("--gene_num", type=int, default=16906, help='Number of genes.') + parser.add_argument("--epoch", type=int, default=100, help='Number of epochs.') + parser.add_argument("--seed", type=int, default=2021, help='Random seed.') + parser.add_argument("--batch_size", type=int, default=1, help='Number of batch size.') + parser.add_argument("--learning_rate", type=float, default=1e-4, help='Learning rate.') + parser.add_argument("--grad_acc", type=int, default=60, help='Number of gradient accumulation.') + parser.add_argument( + "--valid_every", + type=int, + default=1, + help='Number of training epochs between twice validation.' + ) + parser.add_argument("--pos_embed", type=bool, default=True, help='Using Gene2vec encoding or not.') + parser.add_argument( + "--data_path", + type=str, + default='./data/Zheng68k_prepeocessed.h5ad', + help='Path of data for finetune.' + ) + parser.add_argument("--model_path", type=str, default='./ckpt/ckpt-0.ckpt', help='Path of pretrained model.') + parser.add_argument("--ckpt_dir", type=str, default='./finetune_ckpts/', help='Directory of checkpoint to save.') + parser.add_argument("--model_name", type=str, default='finetune', help='Finetuned model name.') + args = parser.parse_args() + return args + +if __name__ == "__main__": + # 1. 解析命令行参数 + args = parse() + if args.enable_pipeline: + ms.set_context(mode=0, device_target="Ascend") + ms.set_auto_parallel_context( + parallel_mode=ParallelMode.SEMI_AUTO_PARALLEL, + pipeline_stages=2, + pipeline_result_broadcast=True + ) + init() + ms.set_seed(1) + else: + ms.set_context(max_device_memory='29GB') + ms.set_context(mode=0, device_target="Ascend", device_id=0) + # 2. 声明全局变量 + SEED = args.seed + EPOCHS = args.epoch + BATCH_SIZE = args.batch_size + GRADIENT_ACCUMULATION = args.grad_acc + LEARNING_RATE = args.learning_rate + SEQ_LEN = args.gene_num + 1 + VALIDATE_EVERY = args.valid_every + PATIENCE = 10 + UNASSIGN_THRES = 0.0 + CLASS = args.bin_num + 2 + POS_EMBED_USING = args.pos_embed + FINETUNE_SAVE_PATH = args.ckpt_dir + # 3. 加载数据集 + train_dataloader, val_dataloader = load_data(args.data_path, CLASS, SEED, BATCH_SIZE) + # 4. 加载模型 + build_model(args) + # 4. 构建优化器和损失函数 + optimizer = build_optimizer_and_scheduler(model) + # 5. 开始训练 + train(optimizer, train_dataloader, val_dataloader) diff --git a/community/cv/scBERT/layers.py b/community/cv/scBERT/layers.py new file mode 100644 index 0000000000000000000000000000000000000000..99da0719dd8f2b8883388ec443fe411fee8a975e --- /dev/null +++ b/community/cv/scBERT/layers.py @@ -0,0 +1,175 @@ +""" +layers.py + +该模块定义了多个用于单细胞数据处理和自定义神经网络模型的基本层,包括嵌入层、矩阵乘法、自注意力层等。 + +主要类: +- Gene2VecPositionalEmbedding: 基于预训练的基因向量权重构建的嵌入层。 +- BatchMatricMul: 批量矩阵乘法,用于高效的多维张量操作。 +- Add: 两个张量的加法操作。 +- SelfAttention: 自注意力机制,用于计算特征在不同维度之间的关联。 + +依赖库: +- numpy, mindspore, utils +""" +import mindspore as ms +from mindspore.nn import Cell, Embedding, GELU, Dense, Dropout, LayerNorm, Softmax, Tensor, ops +import numpy as np +from utils import default + +class Gene2VecPositionalEmbedding(Cell): + """ + 基因位置嵌入层,使用预训练的基因向量(gene2vec)作为嵌入表,为基因序列数据生成位置嵌入。 + + 参数: + - max_seq_len: int,最大序列长度,表示嵌入向量表的尺寸。 + + 功能: + 该类加载预训练的基因向量文件,并使用 MindSpore 的嵌入层,将基因 ID 转换为固定维度的向量表示。 + 嵌入表中的权重是预训练的 gene2vec 权重,且被设置为不参与训练更新。 + """ + def __init__(self, max_seq_len=16907): + super().__init__() + gene2vec_weight = np.load('./data/gene2vec_16906.npy') + gene2vec_weight = gene2vec_weight + gene2vec_weight = np.concatenate((gene2vec_weight, np.zeros((1, gene2vec_weight.shape[1]))), axis=0) + gene2vec_weight = Tensor(gene2vec_weight, dtype=ms.float32) + self.emb = Embedding( + vocab_size=max_seq_len, + embedding_size=200, + embedding_table=gene2vec_weight, + dtype=ms.float32 + ) + self.emb.embedding_table.requires_grad = False + + def construct(self, x): + t = ops.arange(start=0, end=x.shape[1], dtype=ms.int32) + return self.emb(t) + +class BatchMatricMul(Cell): + def __init__(self, transpose_a=False, transpose_b=False): + super().__init__() + self.matmul = ops.BatchMatMul(transpose_a, transpose_b) + + def construct(self, a, b): + return self.matmul(a, b) + +class Add(Cell): + def __init__(self): + super().__init__() + self.add = ops.Add() + + def construct(self, a, b): + return self.add(a, b) + +class SelfAttention(Cell): + """ + 自注意力层,实现多头自注意力机制,用于计算输入特征之间的关联性。 + + 参数: + - dim: int,输入的特征维度。 + - heads: int,注意力头的数量,默认值为 8。 + - dim_head: int,每个头的维度,默认值为 64。 + - dropout: float,Dropout 概率,默认值为 0。 + + 功能: + 1. 通过 `Dense` 层生成查询 (Q)、键 (K) 和值 (V) 向量,并进行多头分割。 + 2. 使用批量矩阵乘法计算注意力分数,并应用 softmax 进行归一化。 + 3. 将注意力分数和值向量相乘得到加权表示,通过 `Dense` 层转换为输出维度。 + 4. 使用残差连接、层归一化和全连接层来进一步处理输出。 + + 该类用于深度学习模型中的自注意力机制,特别适合于需要处理高维特征序列的任务。 + """ + def __init__( + self, + dim, + heads=8, + dim_head=64, + dropout=0., + ): + super().__init__() + assert dim % heads == 0, 'dimension must be divisible by number of heads' + self.dim_head = default(dim_head, dim // heads) + self.inner_dim = dim_head * heads + + self.heads = heads + self.reshape = ops.Reshape() + # stage 1 + self.to_q = Dense(in_channels=dim, out_channels=self.inner_dim, dtype=ms.float32, has_bias=False) + self.to_k = Dense(in_channels=dim, out_channels=self.inner_dim, dtype=ms.float32, has_bias=False) + self.to_v = Dense(in_channels=dim, out_channels=self.inner_dim, dtype=ms.float32, has_bias=False) + self.to_out = Dense(in_channels=self.inner_dim, out_channels=dim, dtype=ms.float32, has_bias=False) + self.dropout1 = Dropout(p=dropout+0.00000001) + + # stage 2 + self.matmul = BatchMatricMul(False, True) + self.softmax = Softmax(axis=-1) + self.mul = BatchMatricMul() + self.layer_norm = LayerNorm((dim,)) + self.w1 = Dense(in_channels=dim, out_channels=dim * 4, dtype=ms.float32) + self.act = GELU() + self.dropout2 = Dropout(p=dropout+0.000001) + self.w2 = Dense(in_channels=dim * 4, out_channels=dim, dtype=ms.float32) + self.add1 = Add() + self.add2 = Add() + + def construct(self, x): + """ + 前向传播方法,计算输入特征的自注意力表示。 + + 参数: + - x: 输入张量,形状为 (batch_size, seq_len, dim),表示输入的特征序列。 + + 返回: + - 输出张量,形状与输入相同 (batch_size, seq_len, dim),经过多头自注意力和残差连接后的特征表示。 + + 功能: + 1. 计算查询 (Q)、键 (K) 和值 (V) 向量,进行多头分割。 + 2. 使用批量矩阵乘法计算注意力分数,并通过 softmax 进行归一化。 + 3. 结合注意力分数和值向量,得到加权特征表示并进行维度变换。 + 4. 通过残差连接和层归一化,返回最终的自注意力特征表示。 + """ + # (batch_size, 16906, 200, 10, 10) + b, n, _, h = *x.shape, self.heads + + # 这里就是 [bs, seq, hidden] -> [bs, head, seq, head_dim] + q, k, v = self.to_q(x), self.to_k(x), self.to_v(x) + q = self.reshape(q, (b, h, n, self.dim_head)) + k = self.reshape(k, (b, h, n, self.dim_head)) + v = self.reshape(v, (b, h, n, self.dim_head)) + score = self.matmul(q, k) + out = self.mul(self.softmax(score), v) + out = self.reshape(out, (b, n, self.inner_dim)) + attn_out = self.to_out(out) + attn_out = self.add1(x, attn_out) + x = self.layer_norm(attn_out) + x = self.w1(x) + x = self.act(x) + x = self.dropout1(x) + x = self.w2(x) + out = self.add2(attn_out, x) + return self.dropout2(attn_out) + + def init_pipeline(self): + """ + 初始化流水线阶段设置,将 `SelfAttention` 层的各个子模块分配到不同的流水线阶段。 + + 功能: + 为启用流水线并行训练的模块,设置每个子层所属的流水线阶段,以便在多设备训练中分阶段执行。 + 该方法分配了查询、键、值、Dropout 和全连接层到不同的流水线阶段,提高训练效率。 + """ + self.to_q.pipeline_stage = 0 + self.to_k.pipeline_stage = 0 + self.to_v.pipeline_stage = 0 + self.dropout1.pipeline_stage = 1 + self.matmul.pipeline_stage = 1 + self.mul.pipeline_stage = 1 + self.softmax.pipeline_stage = 1 + self.to_out.pipeline_stage = 2 + self.add1.pipeline_stage = 2 + self.w1.pipeline_stage = 2 + self.act.pipeline_stage = 2 + self.dropout1.pipeline_stage = 2 + self.w2.pipeline_stage = 3 + self.add2.pipeline_stage = 3 + self.dropout2.pipeline_stage = 3 diff --git a/community/cv/scBERT/performer.py b/community/cv/scBERT/performer.py new file mode 100644 index 0000000000000000000000000000000000000000..d5de3cc3dfd00b79bb6b4e0e67357ee4bf55ec53 --- /dev/null +++ b/community/cv/scBERT/performer.py @@ -0,0 +1,669 @@ +""" +performer.py + +该模块实现了 Performer 架构,用于高效计算长序列数据的自注意力机制,并包括了线性和因果自注意力等优化实现。 + +主要类和方法: +- SelfAttention: 自注意力层,使用 FastAttention 和 Softmax 核函数以提高性能。 +- FastAttention: 实现高效的自注意力计算。 +- Linear_attention 和 Causal_linear_attention: 分别实现线性和因果自注意力,支持更长序列的处理。 +- Performer_layer: Performer 模型的基本层,由自注意力和前馈网络组成。 +- PerformerLM: 完整的基于 Performer 的语言模型,包括嵌入、位置编码、自注意力和输出层。 + +依赖库: +- numpy, mindspore, layers, utils +""" +import math +import numpy as np +import mindspore as ms +import mindspore.common.dtype as mstype +import mindspore.nn as nn +import mindspore.ops.functional as F +from mindspore.ops import operations as P +from mindspore.common.tensor import Tensor +from mindspore.common.parameter import Parameter +from mindspore.common.initializer import initializer, Normal +from layers import Gene2VecPositionalEmbedding + +# helpers +def exists(val): + return val is not None +def empty(tensor): + return tensor.numel() == 0 +def default(val, d): + return val if exists(val) else d + +def softmax_kernel(data, projection_matrix, is_query=False, normalize_data=True, eps=1e-4): + """ + data:[Batch,Heads,Seq,Dim_head] + projection_matrix:[m,Dim_head] + + """ + b, h, Seq, Dim_head = data.shape + data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1. + ratio = (projection_matrix.shape[0] ** -0.5) + # W'*X + data_dash = data_normalizer * P.MatMul(transpose_b=True)(P.Reshape()(data, (-1, Dim_head)), projection_matrix) + data_dash = P.Reshape()(data_dash, (b, h, Seq, -1)) + # |X|^2/2 + diag_data = data ** 2 + diag_data = P.ReduceSum(keep_dims=True)(diag_data, -1) + diag_data = (diag_data / 2.0) * (data_normalizer ** 2) + #exp(W'x-|X|^2/2) + if is_query: + data_dash = ratio * ( + P.Exp()(data_dash - diag_data - + P.ReduceMax(keep_dims=True)(data_dash, -1)) + eps) + else: + data_dash = ratio * ( + P.Exp()(data_dash - diag_data - P.ReduceMax()(data_dash)) + eps) + + return data_dash + +def orthogonal_matrix_chunk(cols, qr_uniform_q=False): + """ + 生成一个正交矩阵块,用于随机投影。 + + 参数: + - cols: int,矩阵的列数。 + - qr_uniform_q: bool,指定是否在 QR 分解后使正交矩阵的列方向保持一致(通过调整符号)。 + + 返回: + - q: Tensor,正交矩阵,形状为 (cols, cols)。 + + 功能: + 通过生成随机矩阵并使用 QR 分解,创建正交矩阵块。若 `qr_uniform_q` 为 True,将通过调整符号确保矩阵列的方向一致。 + """ + unstructured_block = np.random.randn(cols, cols).astype(np.float32) + q, r = np.linalg.qr(unstructured_block, mode='reduced') + if qr_uniform_q: + d = np.diag(r, 0) + q *= np.sign(d) + # 转mindspore Tensor + q = np.transpose(q) + q = Tensor(q) + return q + +def gaussian_orthogonal_random_matrix(nb_rows, nb_columns, scaling=0, qr_uniform_q=False): + """ + 生成高斯分布的正交随机矩阵,用于随机特征映射和线性变换。 + + 参数: + - nb_rows: int,生成矩阵的行数。 + - nb_columns: int,生成矩阵的列数。 + - scaling: int,控制矩阵的缩放方式,0 表示无缩放,1 表示按列数缩放。 + - qr_uniform_q: bool,指定是否在 QR 分解后调整正交矩阵的列方向。 + + 返回: + - final_matrix: Tensor,经过缩放的正交随机矩阵,形状为 (nb_rows, nb_columns)。 + + 功能: + 该函数通过创建多个正交矩阵块并按需求拼接,生成一个具有高斯分布特性的正交随机矩阵。 + 支持根据缩放参数调整矩阵的标准化方式。 + """ + nb_full_blocks = int(nb_rows / nb_columns) + block_list = [] + for _ in range(nb_full_blocks): + q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q,) + block_list.append(q) + remaining_rows = nb_rows - nb_full_blocks * nb_columns + if remaining_rows > 0: + q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q=qr_uniform_q,) + block_list.append(q[:remaining_rows]) + final_matrix = P.Concat()(tuple(block_list)) + + if scaling == 0: + multiplier = Tensor(np.diag(np.linalg.norm(np.random.randn(nb_rows, nb_columns).astype(np.float32), axis=1))) + elif scaling == 1: + multiplier = Tensor(np.diag(math.sqrt((float(nb_columns))) * np.ones((nb_rows,)))) + else: + raise ValueError(f'Invalid scaling {scaling}') + + return P.MatMul()(multiplier, final_matrix) + +class Softmax_kernel(nn.Cell): + """ + 使用 Softmax 核函数计算输入数据的加权表示,适用于快速自注意力机制。 + + 功能: + - 通过内积计算数据的投影矩阵,并对输入数据进行规范化和归一化处理。 + - 提供 `construct` 方法,通过将数据转换为 Softmax 核表示以加速注意力计算。 + + 方法: + - construct(data, projection_matrix, is_query=False, normalize_data=True, eps=1e-4): + 根据投影矩阵和其他参数,对数据进行转换和归一化。 + + 参数: + - data: Tensor,输入数据,形状为 [Batch, Heads, Seq, Dim_head]。 + - projection_matrix: Tensor,投影矩阵,用于输入数据的降维和归一化。 + - is_query: bool,指示数据是否为查询向量,影响归一化策略。 + - normalize_data: bool,指示是否对输入数据进行归一化。 + - eps: float,防止除零的小数值。 + + 返回: + - data_dash: Tensor,处理后的 Softmax 核表示。 + """ + def __init__(self): + super().__init__() + self.Reshape = P.Reshape() + self.MatMul_b = P.MatMul(transpose_b=True) + self.ReduceSum = P.ReduceSum(keep_dims=True) + self.Exp = P.Exp() + self.ReduceMax_keep = P.ReduceMax(keep_dims=True) + self.ReduceMax = P.ReduceMax() + def construct(self, data, projection_matrix, is_query=False, normalize_data=True, eps=1e-4): + """ + data:[Batch,Heads,Seq,Dim_head] + projection_matrix:[m,Dim_head] + + """ + b, h, Seq, Dim_head = data.shape + data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1. + ratio = (projection_matrix.shape[0] ** -0.5) + # W'*X + data_dash = data_normalizer * self.MatMul_b(self.Reshape(data, (-1, Dim_head)), projection_matrix) + data_dash = self.Reshape(data_dash, (b, h, Seq, -1)) + # |X|^2/2 + diag_data = data ** 2 + diag_data = self.ReduceMax_keep(diag_data, -1) + diag_data = (diag_data / 2.0) * (data_normalizer ** 2) + # exp(W'x-|X|^2/2) + if is_query: + adjusted_data = data_dash - diag_data - self.ReduceMax_keep(data_dash, -1) + exp_data = self.Exp(adjusted_data) + eps + data_dash = ratio * exp_data + + else: + data_dash = ratio * ( + self.Exp(data_dash - diag_data - self.ReduceMax(data_dash)) + eps) + + return data_dash + +class Linear_attention(nn.Cell): + """ + 线性注意力层,计算输入查询 (q)、键 (k) 和值 (v) 向量的加权表示,以实现高效的注意力机制。 + + 功能: + - 提供 `construct` 方法,通过累积和矩阵乘法计算查询、键和值之间的线性注意力。 + + 方法: + - construct(q, k, v): + 使用键的累积和计算查询的归一化因子,将查询和累积的键-值上下文相乘以获得加权输出。 + + 参数: + - q: Tensor,查询向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + - k: Tensor,键向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + - v: Tensor,值向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + + 返回: + - out: Tensor,线性注意力的输出结果,与输入形状相同。 + """ + def __init__(self): + super().__init__() + self.ReduceSum = P.ReduceSum(keep_dims=True) + self.BatchMatMul_b = P.BatchMatMul(transpose_b=True) + self.BatchMatMul_a = P.BatchMatMul(transpose_a=True) + self.BatchMatMul = P.BatchMatMul() + self.Mul = P.Mul() + def construct(self, q, k, v): + """ + k,q,v:[B,Sq,H] + """ + # [B,1,H] + k_cumsum = self.ReduceSum(k, -2) + # [B,Sq,1] + D_inv = 1. /self.BatchMatMul_b(q, k_cumsum) + # [B,H,H] + context = self.BatchMatMul_a(k, v) + # [B,Sq,H] + out = self.BatchMatMul(q, context) + # [B,Sq,H]*[B,Sq,1] -> + out = self.Mul(out, D_inv) + return out + +class Causal_linear_attention(nn.Cell): + """ + 因果线性注意力层,计算输入查询 (q)、键 (k) 和值 (v) 向量的加权表示,适用于因果推理的自注意力机制。 + + 功能: + - 提供 `construct` 方法,通过累积和计算键和值的上下文,并应用因果掩码来确保信息的方向性。 + + 方法: + - construct(q, k, v): 使用键的累积和计算查询的归一化因子,将查询和因果上下文相乘以获得加权输出。 + + 参数: + - q: Tensor,查询向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + - k: Tensor,键向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + - v: Tensor,值向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + + 返回: + - out: Tensor,因果线性注意力的输出结果,与输入形状相同。 + """ + def __init__(self): + super().__init__() + self.view_ = P.Reshape() + self.CumSum = P.CumSum() + self.ReduceSum = P.ReduceSum(keep_dims=True) + self.BatchMatMul_b = P.BatchMatMul(transpose_b=True) + self.BatchMatMul_a = P.BatchMatMul(transpose_a=True) + self.Mul = P.Mul() + def construct(self, q, k, v): + """ + 执行因果线性注意力的前向传播,计算查询 (q)、键 (k) 和值 (v) 的加权表示,确保因果关系。 + + 参数: + - q: Tensor,查询向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + - k: Tensor,键向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + - v: Tensor,值向量,形状为 [Batch, Seq_len, Heads, Dim_head]。 + + 返回: + - out: Tensor,因果线性注意力的输出,形状与输入相同。 + + 功能: + - 计算键向量的累积和以形成因果上下文。 + - 使用累积和与查询相乘,应用因果掩码,确保仅使用前序信息,输出满足因果性的加权表示。 + """ + k_cumsum = self.CumSum(k, -2) + # [n,] + D_inv = 1. / self.ReduceSum(q * k_cumsum, -1) + # [n,d,1]*[n,1,e] -> [n,d,e] + context = self.BatchMatMul_b(self.view_(k, k.shape + (1,)), self.view_(v, v.shape + (1,))) + #[n,d,e] -> + context = self.CumSum(context, -3) + # [n,1,d] * [n,d,e] -> [n,1,e] = [n,e] + out = self.BatchMatMul_a(self.view_(q, q.shape + (1,)), context) + out = self.view_(out, v.shape) + out = self.Mul(out, D_inv) + return out + +class LayerNorm(nn.Cell): + """ + Layer Normalization + + Args: + normalized_shape: the corresponding shape of the normalized axes + eps: epsilon, a small number avoiding zero division + + Inputs: + x: input tensor + + Returns: + rescaled_output: Tensor, returned tensor after layernorm + """ + def __init__(self, normalized_shape, eps=1e-5): + super(LayerNorm, self).__init__() + self.gamma = Parameter(initializer('ones', normalized_shape), name="gamma") + self.beta = Parameter(initializer('zeros', normalized_shape), name="beta") + self.mean = P.ReduceMean(keep_dims=True) + self.eps = eps + + def construct(self, x): + mean = self.mean(x, -1) + variance = self.mean(F.square(x - mean), -1) + output = (x - mean) / F.sqrt(variance + self.eps) + rescaled_output = output * self.gamma + self.beta + return rescaled_output + +class FeedForward(nn.Cell): + """ + 前馈神经网络层,通常用于自注意力模块后的进一步特征提取和非线性变换。 + + 参数: + - dim: int,输入和输出的特征维度。 + - mult: int,隐藏层维度的倍数,用于扩大特征空间,默认值为 4。 + - initializer_range: float,用于初始化权重的标准差,默认值为 0.02。 + - hidden_dropout_prob: float,Dropout 概率,默认值为 0.1。 + - compute_type: 数据类型,用于指定计算精度,默认值为 `mstype.float32`。 + + 方法: + - construct(x): 前向传播方法,计算输入张量的前馈网络输出。 + + 功能: + 该类实现了一个两层前馈网络,使用 GELU 激活函数和 Dropout 层,适用于注意力模块后的特征变换。 + """ + def __init__(self, dim, + mult=4, + initializer_range=0.02, + hidden_dropout_prob=0.1, + compute_type=mstype.float32): + super(FeedForward, self).__init__() + self.hidden_size = dim + self.w1 = Mapping(dim, dim*mult, initializer_range, compute_type) + self.w2 = Mapping(dim * mult, dim, initializer_range, compute_type) + self.act = nn.GELU() + self.dropout = nn.Dropout(hidden_dropout_prob) + def construct(self, x): + x = self.w1(x) + x = self.act(x) + x = self.w2(x) + x = self.dropout(x) + return x + +class Mapping(nn.Cell): + """ + A mapping function with a 3d input + Args: + input_size: the size of the last dimension of the input tensor + output_size: the desired size of the last dimension of the output tensor + dtype: the compute datatype + scale: the scale factor for initialization + Inputs: + x: the 3d input + Returns: + output: Tensor, a 3d tensor after projection + """ + def __init__(self, input_size, output_size, initializer_range=0.02, dtype=ms.float32, scale=1.0): + super(Mapping, self).__init__() + self.output_size = output_size + self.input_size = input_size + self.weight = Parameter( + initializer(Normal(sigma=initializer_range*scale), [input_size, output_size]), + name="Weight" + ) + self.bias = Parameter(initializer("zeros", [output_size,]), name="Bias") + self.dtype = dtype + self.cast = P.Cast() + + def construct(self, x): + out_shape = P.Shape()(x)[:-1] + (self.output_size,) + x = P.Reshape()(x, (-1, self.input_size)) + x = nn.MatMul()(x, self.cast(self.weight, self.dtype)) + self.cast(self.bias, self.dtype) + output = P.Reshape()(x, out_shape) + return output + +class FastAttention(nn.Cell): + """ + 快速注意力层,用于高效计算自注意力,适用于长序列数据。 + + 参数: + - dim_heads: int,每个注意力头的维度。 + - nb_features: int,用于随机投影的特征数,默认为 dim_heads 的对数倍。 + - ortho_scaling: int,正交矩阵的缩放方式,默认值为 0。 + - causal: bool,是否启用因果注意力,控制信息传递的方向性,默认为 False。 + - qr_uniform_q: bool,指定是否在 QR 分解后使正交矩阵的列方向保持一致。 + + 方法: + - construct(q, k, v): 前向传播方法,使用快速注意力机制计算查询 (q)、键 (k) 和值 (v) 的加权表示。 + + 功能: + 该类使用 Softmax 核和线性/因果线性注意力机制,提供高效的自注意力计算方式,适合处理长序列输入。 + """ + def __init__(self, dim_heads, nb_features=None, ortho_scaling=0, causal=False, qr_uniform_q=False): + super(FastAttention, self).__init__() + nb_features = default(nb_features, int(dim_heads * math.log(dim_heads))) + self.dim_heads = dim_heads + self.nb_features = nb_features + self.ortho_scaling = ortho_scaling + ## projection_matrix is buffer + self.projection_matrix = gaussian_orthogonal_random_matrix(nb_rows=self.nb_features, + nb_columns=dim_heads, + scaling=ortho_scaling, + qr_uniform_q=qr_uniform_q) + self.causal = causal + self.attn_fn = Linear_attention() if not self.causal else Causal_linear_attention() + self.softmax_kernel = Softmax_kernel() + def construct(self, q, k, v): + q = self.softmax_kernel(data=q, projection_matrix=self.projection_matrix, is_query=True) + k = self.softmax_kernel(data=k, projection_matrix=self.projection_matrix, is_query=False) + out = self.attn_fn(q, k, v) + return out + +class SelfAttention(nn.Cell): + """ + 自注意力层,使用快速注意力机制计算查询 (q)、键 (k) 和值 (v) 的加权表示,适用于高效的长序列处理。 + + 参数: + - dim: int,输入的特征维度。 + - heads: int,注意力头的数量。 + - dim_head: int,每个头的特征维度。 + - causal: bool,是否启用因果注意力机制,默认为 False。 + - nb_features: int,随机投影的特征数,默认为 None。 + - qr_uniform_q: bool,指定是否在 QR 分解后使正交矩阵的列方向保持一致。 + - dropout: float,Dropout 概率,默认值为 0.9。 + + 方法: + - construct(x): 前向传播方法,计算输入特征的多头自注意力输出。 + + 功能: + 该类通过 `FastAttention` 实现了快速的自注意力机制,适合长序列输入,并通过多个注意力头提升表示能力。 + """ + def __init__(self, dim, heads, dim_head, causal=False, nb_features=None, qr_uniform_q=False, dropout=0.9): + super(SelfAttention, self).__init__() + assert dim % heads == 0, 'dimension must be divisible by number of heads' + self.dim_head = dim_head + self.fast_attention = FastAttention( + dim_heads=self.dim_head, + nb_features=nb_features, + causal=causal, + qr_uniform_q=qr_uniform_q + ) + self.heads = heads + self.to_q = Mapping(dim, dim) + self.to_k = Mapping(dim, dim) + self.to_v = Mapping(dim, dim) + self.to_out = Mapping(dim, dim) + self.dropout = nn.Dropout(dropout) + self.view = P.Reshape() + self.Concat = P.Concat(axis=1) + self.Mul = P.Mul() + self.ExpandDims = P.ExpandDims() + self.Tile = P.Tile() + def construct(self, x): + """ + #b:batch_size + #h:num_heads + #n:seq_len + #d:dim_perhead + """ + b, n, _, = x.shape + h = self.heads + + q, k, v = self.to_q(x), self.to_k(x), self.to_v(x) + q = self.view(q, (b, h, n, self.dim_head)) + k = self.view(k, (b, h, n, self.dim_head)) + v = self.view(v, (b, h, n, self.dim_head)) + + out = self.fast_attention(q, k, v) + out = self.view(out, (b, n, h* self.dim_head)) + out = self.to_out(out) + + return self.dropout(out) + +class EmbeddingLookup(nn.Cell): + """ + A embeddings lookup table with a fixed dictionary and size. + + Args: + vocab_size (int): Size of the dictionary of embeddings. + embedding_size (int): The size of each embedding vector. + use_one_hot_embeddings (bool): Specifies whether to use one hot encoding form. Default: False. + initializer_range (float): Initialization value of TruncatedNormal. Default: 0.02. + """ + def __init__(self, + vocab_size, + embedding_size, + use_one_hot_embeddings=False, + initializer_range=0.02): + super(EmbeddingLookup, self).__init__() + self.vocab_size = vocab_size + self.embedding_size = embedding_size + self.use_one_hot_embeddings = use_one_hot_embeddings + self.embedding_table = Parameter(initializer(Normal(sigma=initializer_range), + [vocab_size, embedding_size]), name="embedding_table") + self.expand = P.ExpandDims() + self.shape_flat = (-1,) + self.gather = P.GatherV2() + self.one_hot = P.OneHot() + self.on_value = Tensor(1.0, mstype.float32) + self.off_value = Tensor(0.0, mstype.float32) + self.array_mul = P.MatMul() + self.reshape = P.Reshape() + self.shape = P.Shape() + + def construct(self, input_ids): + """Get a embeddings lookup table with a fixed dictionary and size.""" + input_shape = self.shape(input_ids) + + flat_ids = self.reshape(input_ids, self.shape_flat) + if self.use_one_hot_embeddings: + one_hot_ids = self.one_hot(flat_ids, self.vocab_size, self.on_value, self.off_value) + output_for_reshape = self.array_mul(one_hot_ids, self.embedding_table) + else: + output_for_reshape = self.gather(self.embedding_table, flat_ids, 0) + + out_shape = input_shape + (self.embedding_size,) + output = self.reshape(output_for_reshape, out_shape) + return output + +class AbsolutePositionalEmbedding(nn.Cell): + def __init__(self, dim, max_seq_len): + super(AbsolutePositionalEmbedding, self).__init__() + self.emb = nn.EmbeddingLookup(max_seq_len, dim) + + def construct(self, x): + _, seq_length = x.shape[0], x.shape[1] + input_position = F.tuple_to_array(F.make_range(seq_length)) + # input_position = P.Tile()(input_position, (batch_size, 1)) + return self.emb(input_position) + + +class Performer_layer(nn.Cell): + """ + Performer 模型的基本层,包含自注意力和前馈网络,用于高效处理长序列数据。 + + 参数: + - dim: int,输入特征的维度。 + - heads: int,注意力头的数量。 + - dim_head: int,每个注意力头的维度。 + - causal: bool,是否启用因果注意力机制,默认为 False。 + - nb_features: int,用于随机投影的特征数。 + - qr_uniform_q: bool,指定是否在 QR 分解后保持正交矩阵列的方向一致。 + - dropout: float,Dropout 概率,默认值为 0.9。 + + 方法: + - construct(x): 前向传播方法,使用自注意力和前馈网络对输入进行处理。 + + 功能: + 该类实现了自注意力和前馈网络的组合,支持高效的注意力机制,适用于长序列的特征学习。 + """ + def __init__(self, dim, heads, dim_head, causal=False, nb_features=None, qr_uniform_q=False, dropout=0.9): + super(Performer_layer, self).__init__() + self.SelfAttention = SelfAttention(dim, heads, dim_head, causal, nb_features, qr_uniform_q, dropout) + self.FeedForward = FeedForward(dim=dim) + self.LayerNorm = LayerNorm(dim,) + def construct(self, x): + + x = self.LayerNorm(x) + out = x + self.SelfAttention(x) + out = self.LayerNorm(out) + out = out + self.FeedForward(x) + return out + +class Performer(nn.Cell): + """ + Performer 模型,使用多个 `Performer_layer` 层实现高效的自注意力机制,适用于长序列数据的特征表示学习。 + + 参数: + - dim: int,输入特征的维度。 + - depth: int,模型层数。 + - heads: int,注意力头的数量。 + - causal: bool,是否启用因果注意力机制。 + - nb_features: int,用于随机投影的特征数,默认为 None。 + - qr_uniform_q: bool,指定是否在 QR 分解后保持正交矩阵列的方向一致。 + - dropout: float,Dropout 概率,默认为 0.9。 + + 方法: + - construct(input_tensor): 前向传播方法,依次通过每一层 `Performer_layer`,生成最终特征表示。 + + 功能: + 该类通过堆叠多层 `Performer_layer` 实现了高效的自注意力计算,适合处理长序列输入,提供高性能的序列特征学习。 + """ + def __init__(self, dim, depth, heads, causal=False, nb_features=None, qr_uniform_q=False, dropout=0.9): + super(Performer, self).__init__() + assert dim % heads == 0 + dim_head = dim//heads + layers = [] + for _ in range(depth): + layers.append(Performer_layer(dim=dim, heads=heads, + dim_head=dim_head, + causal=causal, + nb_features=nb_features, + qr_uniform_q=qr_uniform_q, + dropout=dropout)) + + self.layers = nn.CellList(layers) + + def construct(self, input_tensor): + prev_output = input_tensor + for layer_module in self.layers: + prev_output = layer_module(prev_output) + return prev_output + +class PerformerLM(nn.Cell): + """ + 基于 Performer 的语言模型 (PerformerLM),用于高效处理长序列的嵌入、位置编码、自注意力和输出预测。 + + 参数: + - num_tokens: int,词汇表的大小。 + - max_seq_len: int,最大序列长度。 + - dim: int,嵌入和特征表示的维度。 + - depth: int,模型的层数。 + - heads: int,注意力头的数量。 + - causal: bool,是否启用因果注意力。 + - nb_features: int,随机特征的数量,用于投影空间,默认为 None。 + - emb_dropout: float,嵌入层的 Dropout 概率,默认值为 0.9。 + - pf_dropout: float,前馈层的 Dropout 概率,默认值为 0.9。 + - qr_uniform_q: bool,是否在 QR 分解后保持正交矩阵列的方向一致。 + + 方法: + - construct(input_ids): 前向传播方法,生成给定输入序列的预测结果。 + + 功能: + PerformerLM 使用自注意力机制和位置嵌入实现长序列数据的建模,适合用于语言模型或其他序列预测任务。 + """ + def __init__(self, num_tokens, max_seq_len, dim, depth, heads, causal=True, + nb_features=None, emb_dropout=0.9, pf_dropout=0.9, qr_uniform_q=False): + super(PerformerLM, self).__init__() + self.max_seq_len = max_seq_len + self.dim = dim + self.num_tokens = num_tokens + self.token_emb = EmbeddingLookup(num_tokens, dim) + # self.pos_emb = AbsolutePositionalEmbedding(dim, max_seq_len) + self.pos_emb = Gene2VecPositionalEmbedding() + self.dropout = nn.Dropout(emb_dropout) + self.performer = Performer(dim, depth, heads, causal, nb_features, qr_uniform_q, pf_dropout) + self.norm = LayerNorm(dim) + self.MatMul = P.MatMul(transpose_b=True) + self.Reshape = P.Reshape() + self.to_out = nn.Dense(dim, num_tokens, dtype=ms.float32) + def construct(self, input_ids): + """ + 执行前向传播,生成输入序列的嵌入和模型预测输出。 + + 参数: + - input_ids: Tensor,输入序列的 ID,表示为形状 [batch_size, seq_len] 的张量。 + + 返回: + - x: Tensor,模型的输出,形状为 [batch_size, seq_len, num_tokens],表示每个序列位置的预测分布。 + + 功能: + - 将输入序列转换为词嵌入,并添加基因位置嵌入。 + - 通过堆叠的 Performer 层提取特征,并使用归一化层和全连接层生成最终输出。 + """ + # b, n = input_ids.shape + # assert n <= self.max_seq_len, f'sequence length {n} must be less than the max sequence length {self.max_seq_len}' + # token and positional embeddings + + x = self.token_emb(input_ids) + x += self.pos_emb(x) + x = self.dropout(x) + x = self.performer(x) + # norm and to logits + #[batch,seq,hidden] + x = self.norm(x) + # res = self.MatMul(self.Reshape(x,(-1,self.dim)), self.token_emb.embedding_table) + # return self.Reshape(res, input_ids.shape+(self.num_tokens,)) + # 5. (batch, 16906, 200) -> (batch, 16906, 7) + # 输出层 + x = self.to_out(x) + return x diff --git a/community/cv/scBERT/predict.py b/community/cv/scBERT/predict.py new file mode 100644 index 0000000000000000000000000000000000000000..201ef27e9e3fd8fdfe5f9d24314a450aa8d966c2 --- /dev/null +++ b/community/cv/scBERT/predict.py @@ -0,0 +1,179 @@ +""" +predict.py + +该模块实现了使用训练好的 PerformerLM 模型进行预测的功能,支持对单细胞数据进行推理。 + +主要功能: +- 解析命令行参数,配置模型和数据集路径。 +- 加载并处理输入数据,生成模型预测。 +- 使用日志记录预测结果。 + +依赖库: +- argparse, numpy, scanpy, mindspore, layers, utils, tqdm +""" +import argparse +import logging +import pickle as pkl + +import numpy as np +import scanpy as sc +import mindspore as ms +from mindspore import ops, Tensor, ParallelMode, nn +from mindspore.communication import init +from performer import PerformerLM + +class Identity(nn.Cell): + """ + 自定义输出层,包含卷积和全连接层,用于处理输入特征并生成最终输出。 + + 参数: + - dropout: float,Dropout 概率,用于防止过拟合,默认值为 0.1。 + - h_dim: int,隐藏层维度,决定中间全连接层的大小,默认值为 100。 + - out_dim: int,输出层的维度,通常与标签类别数一致,默认值为 10。 + + 方法: + - construct(x): 前向传播方法,将输入数据通过卷积、激活、全连接和 Dropout 层处理,输出最终结果。 + + 功能: + 该类用于在模型中构建一个简单的前馈网络,结合卷积和全连接层,适合处理一维序列数据。 + """ + def __init__(self, dropout=0.1, h_dim=100, out_dim=10): + super(Identity, self).__init__() + self.conv1 = nn.Conv2d(1, 1, (1, 200), pad_mode='valid', padding=0, has_bias=False) + self.act = nn.ReLU() + self.fc1 = nn.Dense(in_channels=SEQ_LEN, out_channels=512, has_bias=True) + self.act1 = nn.ReLU() + self.dropout1 = nn.Dropout(dropout) + self.fc2 = nn.Dense(in_channels=512, out_channels=h_dim, has_bias=True) + self.act2 = nn.ReLU() + self.dropout2 = nn.Dropout(dropout) + self.fc3 = nn.Dense(in_channels=h_dim, out_channels=out_dim, has_bias=True) + + def construct(self, x): + """ + 执行前向传播,将输入数据通过卷积、激活和全连接层处理,生成最终输出。 + + 参数: + - x: Tensor,输入张量,形状为 [batch_size, seq_len, feature_dim]。 + + 返回: + - x: Tensor,处理后的输出张量,形状为 [batch_size, out_dim],表示最终的类别预测。 + + 功能: + 该方法实现了卷积操作、激活函数、全连接层和 Dropout 的组合,用于处理输入特征并生成分类结果。 + """ + x = x[:, None, :, :] + # [batch, 1, seq_len, 200] + x = self.conv1(x) + # [batch, 1, seq_len, 1] + x = self.act(x) + x = x.view(x.shape[0], -1) + x = self.fc1(x) + x = self.act1(x) + x = self.dropout1(x) + x = self.fc2(x) + x = self.act2(x) + x = self.dropout2(x) + x = self.fc3(x) + return x + +def parse(): + """ + 解析命令行参数,配置模型和数据集的路径及其他超参数。 + + 返回: + - args: argparse.Namespace 对象,包含解析后的命令行参数,供模型和预测过程使用。 + + 功能: + 该函数定义了多个命令行参数,包括是否启用流水线、设备 ID、数据路径、模型路径等,并将这些参数解析为可用于程序的对象。 + """ + parser = argparse.ArgumentParser() + parser.add_argument("--enable_pipeline", type=bool, default=False, help='Local process rank.') + parser.add_argument("--device_id", type=int, default=-1, help='Local process rank.') + parser.add_argument("--bin_num", type=int, default=5, help='Number of bins.') + parser.add_argument("--gene_num", type=int, default=16906, help='Number of genes.') + parser.add_argument("--epoch", type=int, default=100, help='Number of epochs.') + parser.add_argument("--seed", type=int, default=2021, help='Random seed.') + parser.add_argument("--pos_embed", type=bool, default=True, help='Using Gene2vec encoding or not.') + parser.add_argument( + "--data_path", + type=str, + default='./data/Zheng68k_prepeocessed.h5ad', + help='Path of data for predict.' + ) + parser.add_argument("--model_path", type=str, default='./ckpt/ckpt-0.ckpt', help='Path of finetuned model.') + args = parser.parse_args() + return args + +if __name__ == "__main__": + # 配置日志记录到文件 'prediction_log.log' + logging.basicConfig( + filename='prediction_log.log', + filemode='a', # 'a' 表示追加模式,如果要覆盖日志文件,可以使用 'w' + format='%(asctime)s - %(levelname)s - %(message)s', + level=logging.INFO + ) + + # 解析命令行参数 + args = parse() + if args.enable_pipeline: + ms.set_context(mode=0, device_target="Ascend") + ms.set_auto_parallel_context( + parallel_mode=ParallelMode.SEMI_AUTO_PARALLEL, + pipeline_stages=2, + pipeline_result_broadcast=True + ) + init() + ms.set_seed(1) + else: + ms.set_context(variable_memory_max_size='29GB') + ms.set_context(mode=0, device_target="Ascend", device_id=0) + + # 声明全局变量 + SEED = args.seed + EPOCHS = args.epoch + SEQ_LEN = args.gene_num + 1 + CLASS = args.bin_num + 2 + POS_EMBED_USING = args.pos_embed + + # 读预测数据集 + data = sc.read_h5ad(args.data_path) + # 标签字典 + with open('label_dict', 'rb') as fp: + label_dict = pkl.load(fp) + data = data.X[:10] + + # 加载模型 + model = PerformerLM( + num_tokens=CLASS, + dim=200, + depth=6, + max_seq_len=SEQ_LEN, + heads=10, + # local_attn_heads = 0, + # g2v_position_emb = True + ) + model.to_out = Identity(dropout=0.1, h_dim=128, out_dim=label_dict.shape[0]) + path = args.model_path + ckpt = ms.load_checkpoint(path) + ms.load_param_into_net(model, ckpt) + for param in model.trainable_params(): + param.requires_grad = False + + batch_size = data.shape[0] + model.set_train(False) + pred_finals = [] + for index in range(batch_size): + full_seq = data[index].toarray()[0] + full_seq[full_seq > (CLASS - 2)] = CLASS - 2 + full_seq = np.append(full_seq, 0).astype(np.int32) + full_seq = Tensor(full_seq).astype(ms.int32) # 转换为 MindSpore Tensor + full_seq = ops.expand_dims(full_seq, 0) # 在第 0 维度添加一个维度,类似于 unsqueeze + pred_logits = model(full_seq) + pred_prob = ops.softmax(pred_logits, axis=-1) + pred_final = pred_prob.argmax(axis=-1) + pred_finals.append(pred_final) + pred_list = [] + for pred_final in pred_finals: + pred_list.append(label_dict[pred_final]) + logging.info(f"Predictions: {pred_list}") diff --git a/community/cv/scBERT/pretrain.py b/community/cv/scBERT/pretrain.py new file mode 100644 index 0000000000000000000000000000000000000000..1e11aaceeb6ee218051074c737e54c5904698916 --- /dev/null +++ b/community/cv/scBERT/pretrain.py @@ -0,0 +1,463 @@ +""" +pretrain.py + +该模块实现了使用 PerformerLM 模型进行预训练的功能,包括数据加载、模型构建、训练和验证过程。 + +主要功能: +- 解析命令行参数,配置模型和数据集的路径及其他超参数。 +- 加载并处理输入数据,生成模型的输入和标签。 +- 定义模型、优化器和损失函数,并执行训练和验证循环。 +- 使用日志记录训练和验证的结果。 + +依赖库: +- argparse, numpy, scanpy, mindspore, performer, tqdm +""" +import argparse +import logging +import math +from functools import reduce + +from dataset_pretrain import load_data +from performer import PerformerLM +from tqdm import tqdm + +import mindspore as ms +from mindspore import ops, save_checkpoint, Tensor +from mindspore.nn import Adam, CrossEntropyLoss, ExponentialDecayLR +from mindspore import value_and_grad +from mindspore.communication import init +from mindspore.communication.management import get_rank +from mindspore import nn + +model = None +loss_fn = None + +def prob_mask_like(t, prob): + return ops.uniform(t.shape, Tensor(0, dtype=ms.float32), Tensor(1, dtype=ms.float32)).float() < prob + +def mask_with_tokens(t, token_ids): + init_no_mask = ops.full_like(t, False, dtype=ms.uint8) + mask = reduce(lambda acc, el: acc | (t == el), token_ids, init_no_mask) + return Tensor(mask, dtype=ms.uint8) + +def get_mask_subset_with_prob(mask, prob): + """ + 根据给定的概率,从掩码中随机选择一部分作为掩码子集。 + + 参数: + - mask: Tensor,布尔掩码,表示哪些位置可以被掩码,形状为 [batch, seq_len]。 + - prob: float,掩码概率,表示要掩码的元素占总元素的比例。 + + 返回: + - Tensor,布尔掩码,形状与输入相同,指示哪些位置被选为掩码。 + + 功能: + 该函数根据给定的概率,从输入掩码中随机选择一定数量的位置,并返回更新后的掩码。 + 只有标记为可掩码的元素才有可能被选为掩码。 + """ + batch, seq_len = mask.shape + max_masked = math.ceil(prob * seq_len) # num of mask of a single sequence in average + num_tokens = mask.sum(axis=-1, keepdims=True) # num of pure tokens of each sequence except special tokens + mask_excess = ops.cat(( + ops.zeros(size=(batch), dtype=ms.float32), + ops.arange(1, seq_len, dtype=ms.float32).repeat(batch) + )).reshape(batch, seq_len) + mask_excess = (mask_excess >= (num_tokens * prob).ceil()) # only 15% of pure tokens can be masked + mask_excess = ops.Reshape()(mask_excess, (batch, seq_len)) + mask_excess = mask_excess[:, :max_masked] # get difference between 15% of pure tokens and 15% of all tokens + rand = ops.rand((batch, seq_len)).masked_fill(~mask, -1e9) # rand (0-1) as prob, special token use -1e9 + _, sampled_indices = rand.topk(max_masked, dim=-1) # get index of topk prob to mask + sampled_indices = (sampled_indices + 1).masked_fill(mask_excess, 0) # delete difference of mask not pure + new_mask = ops.zeros((batch, seq_len + 1), dtype=ms.uint8) # get (batch, seq_len) shape zero matrix + new_mask = new_mask.scatter(-1, sampled_indices, ops.ones(shape=ops.shape(sampled_indices), dtype=ms.uint8)) # set masks in zero matrix as 1 + new_mask = ops.Cast()(new_mask, ms.uint8) + return new_mask[:, 1:] # the final mask, True is mask + +def data_mask( + data, + mask_prob=None, + replace_prob=None, + num_tokens=None, + random_token_prob=None, + mask_token_id=None, + pad_token_id=None, + mask_ignore_token_ids=None + ): + """ + 对输入数据进行掩码处理,包括随机掩码和随机替换。 + + 参数: + - data: Tensor,输入数据,形状为 [batch_size, seq_len]。 + - mask_prob: float,掩码概率,表示要掩码的元素占总元素的比例。 + - replace_prob: float,替换概率,表示要用随机令牌替换的元素的比例。 + - num_tokens: int,词汇表大小,用于生成随机令牌。 + - random_token_prob: float,随机替换的概率。 + - mask_token_id: int,掩码令牌的 ID。 + - pad_token_id: int,填充令牌的 ID。 + - mask_ignore_token_ids: list,指定不应被掩码的令牌 ID 列表。 + + 返回: + - masked_input: Tensor,经过掩码处理的输入数据。 + - labels: Tensor,包含原始数据中被掩码的位置,未被掩码的位置用 pad_token_id 填充。 + + 功能: + 该函数根据给定的掩码概率和替换概率,对输入数据进行掩码处理,返回经过处理的输入数据和相应的标签。 + 掩码处理可以用于训练语言模型,帮助模型学习上下文信息。 + """ + global MASK_PROB, REPLACE_PROB, RANDOM_TOKEN_PROB, MASK_TOKEN_ID, PAD_TOKEN_ID, MASK_IGNORE_TOKEN_IDS + replace_prob = REPLACE_PROB + mask_prob = MASK_PROB + random_token_prob = RANDOM_TOKEN_PROB + mask_token_id = MASK_TOKEN_ID + pad_token_id = PAD_TOKEN_ID + mask_ignore_token_ids = MASK_IGNORE_TOKEN_IDS + + mask_ignore_token_ids = set([*mask_ignore_token_ids, pad_token_id]) + # do not mask [pad] tokens, or any other tokens in the tokens designated to be excluded ([cls], [sep]) + # also do not include these special tokens in the tokens chosen at random + no_mask = mask_with_tokens(data, mask_ignore_token_ids) # ignore_token as True, will not be masked later + mask = get_mask_subset_with_prob(~no_mask, mask_prob) # get the True/False mask matrix + # get mask indices + ## mask_indices = torch.nonzero(mask, as_tuple=True) # get the index of mask(nonzero value of mask matrix) + # mask input with mask tokens with probability of `replace_prob` (keep tokens the same with probability 1 - replace_prob) + masked_input = data + # if random token probability > 0 for mlm + if random_token_prob > 0: + assert num_tokens is not None, ( + 'num_tokens keyword must be supplied when instantiating MLM ' + 'if using random token replacement' + ) + + random_token_prob = prob_mask_like(data, random_token_prob) # get the mask matrix of random token replace + random_tokens = ops.randint(0, num_tokens, data.shape) # generate random token matrix with the same shape as input + random_no_mask = mask_with_tokens(random_tokens, mask_ignore_token_ids) # not masked matrix for the random token matrix + random_token_prob &= ~random_no_mask # get the pure mask matrix of random token replace + random_indices = ops.nonzero(random_token_prob, as_tuple=True) # index of random token replace + masked_input[random_indices] = random_tokens[random_indices] # replace some tokens by random token + # [mask] input + replace_prob = prob_mask_like(data, replace_prob) # get the mask matrix of token being masked + masked_input = masked_input.masked_fill(ops.Cast()(mask * replace_prob, ms.bool_), mask_token_id) # get the data has been masked by mask_token + # mask out any tokens to padding tokens that were not originally going to be masked + labels = data.masked_fill(~mask, pad_token_id) # the label of masked tokens + return masked_input, labels + +def build_model(args): + """ + 构建并初始化 PerformerLM 模型。 + + 参数: + - args: Namespace 对象,包含模型参数和配置选项,例如类别数量、嵌入维度、层数等。 + + 返回: + - model: PerformerLM 对象,初始化后的模型实例。 + + 功能: + 该函数根据传入的参数配置创建一个 PerformerLM 模型实例,并打印模型参数数量和名称。 + 如果启用了流水线并行,模型将进行相应的初始化以支持流水线训练。 + """ + global CLASS, SEQ_LEN, POS_EMBED_USING, model + model = PerformerLM( + num_tokens=CLASS, # 7 + dim=200, + depth=1, + max_seq_len=SEQ_LEN, # 16907 + heads=10, + #local_attn_heads = 0, + ) + print("build model success.") + count = sum([item.size for item in model.get_parameters()]) + names = [item.name for item in model.trainable_params()] + + print("param count is {}, names: {}, count: {}".format(count, str(names), len(names))) + + if args.enable_pipeline: + model.init_pipeline() + model.performer.layers[0].init_pipeline() + model.performer.layers[0].attention.init_pipeline() + return model + + +def build_optimizer_and_scheduler(model): + global LEARNING_RATE, PAD_TOKEN_ID, loss_fn, optimizer + # optimizer + optimizer = Adam(params=model.trainable_params(), learning_rate=LEARNING_RATE) + loss_fn = CrossEntropyLoss(ignore_index=PAD_TOKEN_ID, reduction='mean') + print("build optimizer success.") + return optimizer, loss_fn + +def train_one_epoch(train_dataloader, grad_fn, optimizer, pp_grad_reducer): + """ + 在一个训练周期内执行模型训练,计算损失和准确率。 + + 参数: + - train_dataloader: 数据加载器,提供训练数据的批量迭代。 + - grad_fn: 函数,用于计算模型的梯度和损失。 + - optimizer: 优化器,用于更新模型参数。 + - pp_grad_reducer: 用于处理梯度的对象,支持并行训练。 + + 返回: + - running_loss: float,当前 epoch 的平均损失值。 + - cum_acc: float,当前 epoch 的训练准确率。 + + 功能: + 该函数遍历训练数据集,计算每个批次的损失,并更新模型参数。它还计算准确率,并在每个 epoch 结束时返回总体损失和准确率。 + """ + global PAD_TOKEN_ID, model, PIPELINE, BATCH_SIZE, SEQ_LEN, DP, lr_schedule + running_loss = 0.0 + model.set_train(True) + correct_num = 0 + val_num = 0 + for index, (data,) in enumerate(tqdm(train_dataloader.create_tuple_iterator())): + data, orig_labels = data_mask(data) + labels = ops.repeat_elements(orig_labels, rep=7, axis=-1) + labels = ops.cast(labels, dtype=ms.float32) + if (labels.shape[0] % BATCH_SIZE) != 0: + continue + labels = ops.reshape(labels, (BATCH_SIZE, SEQ_LEN, 7)) + if PIPELINE: + loss, grads = grad_fn(data, labels) + grads = pp_grad_reducer(grads) + elif DP: + (loss, logits), grads = grad_fn(data, labels) + grads = pp_grad_reducer(grads) + else: + (loss, logits), grads = grad_fn(data, labels) + optimizer(grads) + lr = lr_schedule(index) + optimizer.learning_rate = lr + # 累加损失 + running_loss += loss.item() / (SEQ_LEN*BATCH_SIZE*7) + # 计算精度 + if not PIPELINE: + labels = ops.repeat_elements(orig_labels, rep=7, axis=-1) + labels = ops.reshape(labels, (-1, SEQ_LEN, 7)) + labels = ops.cast(labels, dtype=ms.float32) + final = ops.softmax(logits, axis=-1)[..., 1:-1] # (bs, seq_len, 7) + final = final.argmax(axis=-1) + 1 # # (bs, seq_len) + correct_num += ( + ops.mul( + Tensor(orig_labels != PAD_TOKEN_ID, dtype=ms.uint8), + Tensor(final == orig_labels, dtype=ms.uint8) + ) + .sum(axis=-1) + .sum() + ) + + val_num += Tensor(orig_labels != PAD_TOKEN_ID, dtype=ms.uint8).sum(axis=-1).sum() + del data, labels, logits, final, orig_labels + + return running_loss, 100 * correct_num / val_num + +def eval_one_epoch(val_dataloader): + """ + 在验证数据集上执行一个评估周期,计算损失和准确率。 + + 参数: + - val_dataloader: 数据加载器,提供验证数据的批量迭代。 + + 返回: + - val_loss: float,当前验证周期的平均损失值。 + - val_acc: float,当前验证周期的准确率。 + + 功能: + 该函数遍历验证数据集,计算每个批次的损失,并记录预测结果。它还计算总体准确率,并返回平均损失和准确率。 + """ + global PAD_TOKEN_ID, loss_fn, model, SEQ_LEN + model.set_train(False) + predictions = [] + truths = [] + running_loss = 0.0 + print("========== 开始验证") + correct_num = 0 + val_num = 0 + for _, (data,) in enumerate(tqdm(val_dataloader.create_tuple_iterator())): + data, ori_labels = data_mask(data) + ori_labels = ops.cast(ori_labels, ms.float32) + labels = ops.repeat_elements(ori_labels, rep=7, axis=-1) + labels = ops.reshape(labels, (-1, SEQ_LEN, 7)) + labels = ops.cast(labels, dtype=ms.float32) + logits = model(data) + loss = loss_fn(logits, labels) + running_loss += loss.item() / (SEQ_LEN*BATCH_SIZE*7) + final = ops.softmax(logits, axis=-1)[..., 1:-1] + final = final.argmax(axis=-1) + 1 + correct_num += ( + ops.mul( + Tensor(ori_labels != PAD_TOKEN_ID, dtype=ms.uint8), + Tensor(final == ori_labels, dtype=ms.uint8) + ) + .sum(axis=-1) + .sum() + ) + + val_num += Tensor(ori_labels != PAD_TOKEN_ID, dtype=ms.uint8).sum(axis=-1).sum() + del data, labels, logits, final, ori_labels + val_loss = running_loss / len(val_dataloader) + val_acc = 100 * correct_num / val_num + del predictions, truths + return val_loss, val_acc + +def train(optimizer, train_dataloader, val_dataloader): + """ + 执行训练过程,迭代多个 epoch,计算训练和验证的损失与准确率。 + + 参数: + - optimizer: 优化器,用于更新模型参数。 + - train_dataloader: 数据加载器,提供训练数据的批量迭代。 + - val_dataloader: 数据加载器,提供验证数据的批量迭代。 + + 功能: + 该函数管理训练过程,通过多个 epoch 训练模型,并在每个 epoch 结束时进行验证,记录和打印训练与验证的损失和准确率。 + """ + global EPOCHS, VALIDATE_EVERY, MODEL_NAME, loss_fn, PIPELINE, DP + + train_num_step = len(train_dataloader) + if PIPELINE: + grad_fn = value_and_grad(forward_pipeline, grad_position=None, weights=optimizer.parameters) + pp_grad_reducer = nn.PipelineGradReducer(optimizer.parameters) + elif DP: + grad_fn = value_and_grad(forward, grad_position=None, weights=model.trainable_params(), has_aux=True) + pp_grad_reducer = nn.DistributedGradReducer(optimizer.parameters, mean=True) + else: + grad_fn = value_and_grad(forward, grad_position=None, weights=model.trainable_params(), has_aux=True) + pp_grad_reducer = None + + for epoch in range(EPOCHS): + running_loss, cum_acc = train_one_epoch(train_dataloader, grad_fn, optimizer, pp_grad_reducer) + # log epoch的信息 + epoch_loss = running_loss / train_num_step + logging.info(f' == Epoch: {epoch} | Training Loss: {epoch_loss:.6f} | Accuracy: {cum_acc.item():6.4f}% ==') + + # 进行一次验证 + if epoch % VALIDATE_EVERY == 0: + val_loss, val_acc = eval_one_epoch(val_dataloader) + logging.info(f' == Epoch: {epoch} | Validation Loss: {val_loss} | Accuracy: {val_acc.item()}% ==') + if get_rank() == 0: + # 存模型 + ckpt_dir = "./" + PRETRAIN_PATH + if not os.path.exists(ckpt_dir): + os.makedirs(ckpt_dir, exist_ok=True) + ckpt_file = f"pretrain-{epoch}.ckpt" + ckpt_path = os.path.join(ckpt_dir, ckpt_file) + save_checkpoint(model, ckpt_path) + +def forward_pipeline(data, label): + global net_with_loss + return net_with_loss(data, label) + +def forward(data, label): + global model, loss_fn + logits = model(data) + loss = loss_fn(logits, label) + return loss, logits + +def setup_pipeline(model, loss_fn): + net_with_loss = nn.PipelineCell(nn.WithLossCell(model, loss_fn), micro_size=4) + net_with_loss.set_train() + return net_with_loss + +def create_lr_schedule(): + lr_schedule = ExponentialDecayLR(learning_rate=0.1, decay_rate=0.9, decay_steps=100) + return lr_schedule + +def parse(): + """ + 解析命令行参数,配置模型训练和数据预处理的相关设置。 + + 返回: + - args: argparse.Namespace 对象,包含解析后的命令行参数,供训练过程使用。 + + 功能: + 该函数定义多个命令行参数,包括模型超参数(如学习率、批量大小等)、数据路径等,并将其解析为可用于程序的对象。 + """ + parser = argparse.ArgumentParser() + parser.add_argument("--enable_pipeline", type=bool, default=True, help='Local process rank.') + parser.add_argument("--device_id", type=int, default=-1, help='Local process rank.') + parser.add_argument("--bin_num", type=int, default=5, help='Number of bins.') + parser.add_argument("--gene_num", type=int, default=16906, help='Number of genes.') + parser.add_argument("--epoch", type=int, default=100, help='Number of epochs.') + parser.add_argument("--seed", type=int, default=2021, help='Random seed.') + parser.add_argument("--batch_size", type=int, default=4, help='Number of batch size.') + parser.add_argument("--learning_rate", type=float, default=1e-4, help='Learning rate.') + parser.add_argument( + "--valid_every", + type=int, + default=1, + help='Number of training epochs between twice validation.' + ) + parser.add_argument("--mask_prob", type=float, default=0.15, help='Probability of masking.') + parser.add_argument( + "--replace_prob", + type=float, + default=0.9, + help='Probability of replacing with [MASK] token for masking.' + ) + parser.add_argument("--pos_embed", type=bool, default=True, help='Using Gene2vec encoding or not.') + parser.add_argument( + "--data_path", + type=str, + default='./data/panglao_10000.h5ad', + help='Path of data for pretraining.' + ) + parser.add_argument("--model_name", type=str, default='panglao_pretrain', help='Pretrained model name.') + args = parser.parse_args() + return args + +if __name__ == "__main__": + # 创建日志记录器,文件保存日志,同时控制台也输出日志 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("training_log.log", mode='a'), # 保存日志到文件 + logging.StreamHandler() # 同时输出日志到控制台 + ] + ) + + args = parse() + args.enable_pipeline = False + args.enable_dp = True + if args.enable_pipeline: + ms.set_context(mode=0, device_target="Ascend") + ms.reset_auto_parallel_context() + ms.set_auto_parallel_context( + parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, + pipeline_stages=4, + pipeline_result_broadcast=True + ) + init() + ms.set_seed(1) + elif args.enable_dp: + ms.set_context(mode=0, device_target="Ascend", max_device_memory="29GB") + ms.reset_auto_parallel_context() + ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True) + init() + ms.set_seed(1) + else: + ms.set_context(mode=0, device_target="Ascend", device_id=1) + + SEED = args.seed + EPOCHS = args.epoch + BATCH_SIZE = args.batch_size + LEARNING_RATE = args.learning_rate + SEQ_LEN = 5000 + VALIDATE_EVERY = args.valid_every + CLASS = args.bin_num + 2 + MASK_PROB = args.mask_prob + REPLACE_PROB = args.replace_prob + RANDOM_TOKEN_PROB = 0. + MASK_TOKEN_ID = CLASS - 1 + PIPELINE = args.enable_pipeline + PAD_TOKEN_ID = CLASS - 1 + MASK_IGNORE_TOKEN_IDS = [0] + POS_EMBED_USING = args.pos_embed + MODEL_NAME = args.model_name + DP = args.enable_dp + train_dataloader, val_dataloader = load_data(args.data_path, CLASS, SEED, BATCH_SIZE, SEQ_LEN, args) + model = build_model(args) + + optimizer, loss_fn = build_optimizer_and_scheduler(model) + if args.enable_pipeline: + net_with_loss = setup_pipeline(model, loss_fn) + lr_schedule = create_lr_schedule() + train(optimizer, train_dataloader, val_dataloader) diff --git a/community/cv/scBERT/run_distribute_finetune.sh b/community/cv/scBERT/run_distribute_finetune.sh new file mode 100644 index 0000000000000000000000000000000000000000..36e993d62d375b2b43ec77379be8137354c0f208 --- /dev/null +++ b/community/cv/scBERT/run_distribute_finetune.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +echo "==============================================================================================================" +echo "Please run the script as: " +echo "bash run.sh" +echo "==============================================================================================================" + + +msrun --worker_num=8 --local_worker_num=8 --master_port=8118 --log_dir=msrun_log --join=True --cluster_time_out=300 finetune.py diff --git a/community/cv/scBERT/run_distribute_pretrain.sh b/community/cv/scBERT/run_distribute_pretrain.sh new file mode 100644 index 0000000000000000000000000000000000000000..d941a94aa90baa58481deeea50648dc9e7e4de00 --- /dev/null +++ b/community/cv/scBERT/run_distribute_pretrain.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +echo "==============================================================================================================" +echo "Please run the script as: " +echo "bash run.sh" +echo "==============================================================================================================" + + +msrun --worker_num=8 --local_worker_num=8 --master_port=8118 --log_dir=msrun_log --join=True --cluster_time_out=300 pretrain.py diff --git a/community/cv/scBERT/utils.py b/community/cv/scBERT/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..2e79dbf1d31fa8d24e515b508379e0572ddce0b6 --- /dev/null +++ b/community/cv/scBERT/utils.py @@ -0,0 +1,32 @@ +""" +utils.py + +该模块包含一些辅助函数和上下文管理器,旨在提供通用的功能,例如参数检查、元组处理和模块查找。 + +主要功能: +- 检查值是否存在。 +- 检查 Tensor 是否为空。 +- 提供默认值。 +- 上下文管理器用于 null 上下文。 +- 将值转换为元组。 +- 查找特定类型的模块。 +- 路由参数以适应不同深度的结构。 + +依赖库: +- contextlib +""" +from contextlib import contextmanager + +def exists(val): + return val is not None +def empty(tensor): # mindspore的Tensor.size 返回 张量中元素的个数 + return tensor.size == 0 +def default(val, d): + return val if exists(val) else d +@contextmanager +def null_context(): + yield +def cast_tuple(val): + return (val,) if not isinstance(val, tuple) else val +def find_modules(nn_module, module_type): + return [module for module in nn_module.cells() if isinstance(module, module_type)] # mindspore通过Cell.cells() 获取到子cell。 diff --git a/community/cv/scGen/README_CN.md b/community/cv/scGen/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..bd12f1a59b5dfee2b0300132aeb2330608cbe4eb --- /dev/null +++ b/community/cv/scGen/README_CN.md @@ -0,0 +1,148 @@ +# 目录 + +- [目录](#目录) +- [scGen描述](#scGen描述) +- [模型架构](#模型架构) +- [数据集](#数据集) +- [环境要求](#环境要求) +- [快速入门](#快速入门) +- [脚本说明](#脚本说明) + - [脚本和样例代码](#脚本和样例代码) + - [脚本参数](#脚本参数) + - [训练过程](#训练过程) + - [推理过程](#推理过程) + - [用法](#用法) + - [结果](#结果) +- [随机情况说明](#随机情况说明) +- [ModelZoo主页](#modelzoo主页) + +# scGen描述 + +scGen是一种生成模型,用于预测不同细胞类型、研究和物种中的单细胞扰动响应(发表于《Nature Methods》,2019年)。 + +# 模型架构 + +scGEN 的模型架构基于变分自编码器(VAE),包括一个编码器和一个解码器。编码器将单细胞基因表达数据映射到一个潜在空间,生成细胞的隐变量表示;解码器则从潜在空间生成新的基因表达数据。SCGEN 通过在潜在空间中施加条件编码,实现对不同条件下细胞状态的生成与转换。 + +# 数据集 + +使用的数据集:[pancreas](https://drive.google.com/drive/folders/1v3qySFECxtqWLRhRTSbfQDFqdUCAXql3) + +- 名称: Panglao scRNA-seq数据集 +- 格式: H5AD文件 +- 路径: ./data/pancreas.h5ad +- 数据大小: 176MB + +支持的数据集:[pancreas] 或者与 AnnData 格式相同的数据集 + +- 目录结构如下,由用户定义目录和文件的名称 + +![image](demo/predict-demo.jpg) + +- 如果用户需要自定义数据集,则需要将数据集格式转化为AnnData数据格式。 + +# 环境要求 + +- 硬件(Ascend) + - 使用Ascend处理器来搭建硬件环境。 +- 框架 + - [MindSpore](https://www.mindspore.cn/install) +- 如需查看详情,请参见如下资源 + - [MindSpore教程](https://www.mindspore.cn/tutorials/zh-CN/master/index.html) + - [MindSpore Python API](https://www.mindspore.cn/docs/zh-CN/master/api_python/mindspore.html) + +# 快速入门 + +- 通过官方网站安装Mindspore后,您可以按照如下步骤进行训练 + +```shell +# 单卡训练 +python train_model.py +``` + +# 脚本说明 + +## 脚本和样例代码 + +```text + |----data + |----demo + |----AnnData.png + |----models + |----vae.py + |----utils + |----data_utils.py + |----model_utils.py + |----batch_correction.py + |----batch_effect_removal.py + |----README_CN.md + |----train.py + |----train_model.py +``` + +## 脚本参数 + +预训练参数(pretrain.py): + +```text +--train_path 数据路径 +--model_to_use 模型路径 +--batch_size 批次大小, 默认: 32 +--X_dim 基因表达矩阵的特征维度 +--z_dim 潜在空间维度,默认值:100 +--lr 学习率,默认值:0.001 +--dr_rate dropout,默认值:0.2 +``` + +## 训练过程 + +在Ascend设备上,使用python脚本直接开始训练(单卡) + +- 第一步\ + python命令启动 + + ```shell + # 单卡训练 + python train_model.py + ``` + +```text + + Epoch [1/100], Loss: 981.0034 + Epoch [2/100], Loss: 939.3733 + Epoch [3/100], Loss: 922.7879 + Epoch [4/100], Loss: 913.1795 + Epoch [5/100], Loss: 905.8361 + Epoch [6/100], Loss: 900.6747 + +``` + +## 推理过程 + +**推理前需使用train_model.py文件生成的模型检查点文件。** + +### 用法 + +执行完整的推理脚本如下: + +```shell + +python batch_effect_removal.py + +``` + +### 结果 + +去批次结果保存在/batch_removal_data/1.h5ad中。 + +# 随机情况说明 + +在训练中存在以下随机性来源: + +1. 数据和索引的随机打乱 +2. 潜在空间中的随机噪声生成 +3. 样本生成时的噪声引入 + +# ModelZoo主页 + +请浏览官网[主页](https://gitee.com/mindspore/models)。 \ No newline at end of file diff --git a/community/cv/scGen/batch_correction.py b/community/cv/scGen/batch_correction.py new file mode 100644 index 0000000000000000000000000000000000000000..c55e1b5aaf1451c512343dd8da2d5f02e39ac189 --- /dev/null +++ b/community/cv/scGen/batch_correction.py @@ -0,0 +1,96 @@ +""" +batch_correction.py + +该模块实现了批次效应去除的功能,提供了基于给定模型对数据进行批次校正的操作。 + +主要功能: +- 从模型中提取潜在变量。 +- 根据细胞类型进行批次去除。 +- 处理不同细胞类型的潜在数据并校正。 +- 返回校正后的数据和共享的校正数据。 + +依赖库: +- numpy +- scanpy +- anndata +- utils.model_utils +""" +import numpy as np +import scanpy as sc +import anndata +from utils.model_utils import give_me_latent, reconstruct + +def vector_batch_removal(model, data): + """ + 从给定模型和数据中移除批次效应。 + + 参数: + - model: 训练好的模型,用于提取潜在变量和重构数据。 + - data: Anndata 对象,包含要校正的数据。 + + 返回: + - corrected: Anndata 对象,包含校正后的数据。 + - corrected_shared: Anndata 对象,包含共享的校正数据。 + + 功能: + 该函数提取输入数据的潜在变量,识别并处理不同细胞类型的批次效应,返回校正后的数据和共享的校正数据。 + """ + latent_all = give_me_latent(model, data.X) + latent_ann = sc.AnnData(latent_all) + latent_ann.obs["cell_type"] = data.obs["cell_type"].tolist() + latent_ann.obs["batch"] = data.obs["batch"].tolist() + latent_ann.obs["sample"] = data.obs["sample"].tolist() + + unique_cell_types = np.unique(latent_ann.obs["cell_type"]) + shared_anns = [] + not_shared_ann = [] + + for cell_type in unique_cell_types: + temp_cell = latent_ann[latent_ann.obs["cell_type"] == cell_type] + if len(np.unique(temp_cell.obs["batch"])) < 2: + cell_type_ann = latent_ann[latent_ann.obs["cell_type"] == cell_type] + not_shared_ann.append(cell_type_ann) + continue + + print(cell_type) + temp_cell = latent_ann[latent_ann.obs["cell_type"] == cell_type] + batch_list = {} + max_batch = 0 + max_batch_ind = "" + batchs = np.unique(temp_cell.obs["batch"]) + + for i in batchs: + temp = temp_cell[temp_cell.obs["batch"] == i] + if max_batch < len(temp): + max_batch = len(temp) + max_batch_ind = i + batch_list[i] = temp + + max_batch_ann = batch_list[max_batch_ind] + for study in batch_list: + delta = np.average(max_batch_ann.X, axis=0) - np.average(batch_list[study].X, axis=0) + batch_list[study] = batch_list[study].copy() + batch_list[study].X = delta + batch_list[study].X + corrected = anndata.concat(list(batch_list.values())) + shared_anns.append(corrected) + + all_shared_ann = anndata.concat(shared_anns) if shared_anns else sc.AnnData() + all_not_shared_ann = anndata.concat(not_shared_ann) if not_shared_ann else sc.AnnData() + all_corrected_data = anndata.concat([all_shared_ann, all_not_shared_ann]) + + corrected_data = reconstruct(model, all_corrected_data.X, use_data=True) + corrected = sc.AnnData(corrected_data) + corrected.obs["cell_type"] = all_corrected_data.obs["cell_type"].tolist() + corrected.obs["study"] = all_corrected_data.obs["sample"].tolist() + corrected.var_names = data.var_names.tolist() + + if all_shared_ann.n_obs > 0: + corrected_shared_data = reconstruct(model, all_shared_ann.X, use_data=True) + corrected_shared = sc.AnnData(corrected_shared_data) + corrected_shared.obs["cell_type"] = all_shared_ann.obs["cell_type"].tolist() + corrected_shared.obs["study"] = all_shared_ann.obs["sample"].tolist() + corrected_shared.var_names = data.var_names.tolist() + else: + corrected_shared = sc.AnnData() + + return corrected, corrected_shared diff --git a/community/cv/scGen/batch_effect_removal.py b/community/cv/scGen/batch_effect_removal.py new file mode 100644 index 0000000000000000000000000000000000000000..fed7a70df9eb28eadee3fc596145ee35eb21eca8 --- /dev/null +++ b/community/cv/scGen/batch_effect_removal.py @@ -0,0 +1,60 @@ +""" +batch_effect_removal.py + +该模块实现了批次效应去除的功能,使用预训练的 VAE 模型处理单细胞 RNA 测序数据,并将结果保存到文件。 + +主要功能: +- 加载数据集。 +- 初始化并加载已训练的 VAE 模型。 +- 使用 `vector_batch_removal` 函数去除批次效应。 +- 对结果进行后处理,将细胞类型映射到新列。 +- 将处理后的数据保存为 .h5ad 文件。 + +依赖库: +- mindspore +- models.vae +- utils.data_utils +- anndata +""" +import mindspore as ms +from models.vae import VAE +from utils.data_utils import load_data +from batch_correction import vector_batch_removal + +# 设置MindSpore上下文 +ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend") + +# 配置参数 +train_path = "./data/pancreas.h5ad" +model_path = "./models/scGen/scgen.pt" +output_path = "./batch_removal_data/1.h5ad" + +def main(): + # 加载数据 + data, _, _ = load_data(train_path) + gex_size = data.X.shape[1] + + # 初始化并加载已训练模型 + model = VAE(input_dim=gex_size, z_dim=100, dr_rate=0.2) + param_dict = ms.load_checkpoint(model_path) + ms.load_param_into_net(model, param_dict) + print("模型加载完毕。") + + # 批次效应去除 + all_data, _ = vector_batch_removal(model, data) + + # 后处理 + top_cell_types = all_data.obs["cell_type"].value_counts().index.tolist()[:7] + if "not applicable" in top_cell_types: + top_cell_types.remove("not applicable") + + all_data.obs["celltype"] = "others" + for cell_type in top_cell_types: + all_data.obs.loc[all_data.obs["cell_type"] == cell_type, "celltype"] = cell_type + + # 保存结果 + all_data.write(output_path) + print(f"scGen batch corrected pancreas has been saved in {output_path}") + +if __name__ == "__main__": + main() diff --git a/community/cv/scGen/demo/AnnData.png b/community/cv/scGen/demo/AnnData.png new file mode 100644 index 0000000000000000000000000000000000000000..e6aa27577a0d28323dcadf6deffab18faf0dbf1c Binary files /dev/null and b/community/cv/scGen/demo/AnnData.png differ diff --git a/community/cv/scGen/models/vae.py b/community/cv/scGen/models/vae.py new file mode 100644 index 0000000000000000000000000000000000000000..fd43608975f97576ab6ef659765759e036013bf4 --- /dev/null +++ b/community/cv/scGen/models/vae.py @@ -0,0 +1,95 @@ +""" +vae.py + +该模块实现了变分自编码器 (VAE) 的功能,使用 MindSpore 框架构建模型以进行数据重建和生成。 + +主要功能: +- 定义变分自编码器模型,包含编码器和解码器结构。 +- 提供编码、重参数化和解码的函数。 +- 实现前向传播过程,将输入数据转换为重建数据。 + +依赖库: +- numpy +- mindspore +""" +import mindspore.ops as ops +import mindspore.nn as nn +from mindspore import Tensor, dtype as mstype + +class VAE(nn.Cell): + """ + 变分自编码器 (VAE) 类,用于实现潜在变量建模。 + + VAE 是一种生成模型,能够将输入数据映射到潜在空间并从该空间重建数据。 + 本类实现了 VAE 的编码器、解码器和重参数化技巧。 + + 属性: + - encoder: nn.SequentialCell,构成 VAE 编码器的层。 + - decoder: nn.SequentialCell,构成 VAE 解码器的层。 + - fc_mean: nn.Dense,生成潜在空间均值的全连接层。 + - fc_var: nn.Dense,生成潜在空间对数方差的全连接层。 + - exp: ops.Exp,计算指数。 + - randn_like: ops.StandardNormal,生成标准正态分布的随机数。 + + 参数: + - input_dim: int,输入数据的特征维度。 + - hidden_dim: int,可选,隐藏层维度,默认为 800。 + - z_dim: int,可选,潜在空间的维度,默认为 100。 + - dr_rate: float,可选,丢弃率,默认为 0.2。 + """ + def __init__(self, input_dim, hidden_dim=800, z_dim=100, dr_rate=0.2): + super(VAE, self).__init__() + + # =============================== Q(z|X) ====================================== + self.encoder = nn.SequentialCell([ + nn.Dense(input_dim, hidden_dim, has_bias=False), + nn.BatchNorm1d(hidden_dim), + nn.LeakyReLU(alpha=0.01), + nn.Dropout(p=dr_rate), + nn.Dense(hidden_dim, hidden_dim, has_bias=False), + nn.BatchNorm1d(hidden_dim), + nn.LeakyReLU(), + nn.Dropout(p=dr_rate), + ]) + self.fc_mean = nn.Dense(hidden_dim, z_dim) + self.fc_var = nn.Dense(hidden_dim, z_dim) + + # =============================== P(X|z) ====================================== + self.decoder = nn.SequentialCell([ + nn.Dense(z_dim, hidden_dim, has_bias=False), + nn.BatchNorm1d(hidden_dim), + nn.LeakyReLU(alpha=0.01), + nn.Dropout(p=dr_rate), + nn.Dense(hidden_dim, hidden_dim, has_bias=False), + nn.BatchNorm1d(hidden_dim), + nn.LeakyReLU(), + nn.Dropout(p=dr_rate), + nn.Dense(hidden_dim, input_dim), + nn.ReLU(), + ]) + + self.exp = ops.Exp() + self.randn_like = ops.StandardNormal() + + def encode(self, x): + h = self.encoder(x) + mean = self.fc_mean(h) + log_var = self.fc_var(h) + return mean, log_var + + def reparameterize(self, mu, log_var): + std = self.exp(0.5 * log_var) + shape_tuple = ops.Shape()(std) + shape = Tensor(list(shape_tuple), mstype.int32) + eps = self.randn_like(shape) + return mu + eps * std + + def decode(self, z): + x_hat = self.decoder(z) + return x_hat + + def construct(self, x): + mu, log_var = self.encode(x) + z = self.reparameterize(mu, log_var) + x_hat = self.decode(z) + return x_hat, mu, log_var diff --git a/community/cv/scGen/train.py b/community/cv/scGen/train.py new file mode 100644 index 0000000000000000000000000000000000000000..02a2167b58091d96628d7cf5a7714e39ef9835de --- /dev/null +++ b/community/cv/scGen/train.py @@ -0,0 +1,64 @@ +""" +训练模块用于实现变分自编码器(VAE)的训练过程。 + +主要功能: +- 定义训练函数,该函数包括训练循环和损失计算。 +- 支持多轮训练并通过批量数据更新模型参数。 +- 在每个训练周期后输出平均损失,并可选择性地保存训练好的模型。 + +依赖库: +- numpy: 用于处理数据数组和生成随机排列。 +- mindspore: 深度学习框架,提供模型训练、优化和张量操作的功能。 +""" +import numpy as np +import mindspore as ms +from mindspore import ops, Tensor +from mindspore.train.serialization import save_checkpoint + +def train(model, optimizer, data, n_epochs, batch_size=32, model_path=None): + """ + 训练变分自编码器(VAE)模型。 + + 参数: + - model: VAE 模型实例,定义了前向传播和参数更新的行为。 + - optimizer: 优化器,用于更新模型的可训练参数。 + - data: 训练数据,形状为 [样本数, 特征维度]。 + - n_epochs: int,训练的总轮数。 + - batch_size: int,可选,批量大小,默认为 32。 + - model_path: str,可选,保存模型的路径。 + + 功能: + 该函数执行训练循环,包括数据的随机排列、批量处理和损失计算。 + 在每个训练周期后输出平均损失,并在指定路径保存训练好的模型。 + """ + model.set_train() + data_size = data.shape[0] + + for epoch in range(n_epochs): + permutation = np.random.permutation(data_size) + data = data[permutation, :] + train_loss = 0 + + for i in range(0, data_size, batch_size): + batch_data_np = data[i:i + batch_size] + batch_data = Tensor(batch_data_np, ms.float32) + + def forward_fn(batch_data): + x_hat, mu, log_var = model(batch_data) + recon_loss = 0.5 * ops.reduce_sum(ops.mse_loss(x_hat, batch_data)) + kl_loss = 0.5 * ops.reduce_sum(ops.exp(log_var) + ops.square(mu) - 1 - log_var) + vae_loss = recon_loss + 0.00005 * kl_loss + return vae_loss + + grads = ops.GradOperation(get_by_list=True)(forward_fn, model.trainable_params())(batch_data) + optimizer(grads) + + vae_loss = forward_fn(batch_data) + train_loss += vae_loss.asnumpy() + + avg_loss = train_loss / data_size + print(f"Epoch [{epoch + 1}/{n_epochs}], Loss: {avg_loss:.4f}") + + if model_path: + save_checkpoint(model.trainable_params(), model_path) + print(f"模型已保存到 {model_path}") diff --git a/community/cv/scGen/train_model.py b/community/cv/scGen/train_model.py new file mode 100644 index 0000000000000000000000000000000000000000..38cd2c33b6bd97eceee50f8d7647cc31535d8024 --- /dev/null +++ b/community/cv/scGen/train_model.py @@ -0,0 +1,50 @@ +""" +训练模型模块,用于加载数据、初始化变分自编码器(VAE),并执行训练过程。 + +主要功能: +- 加载和预处理数据,生成适合模型输入的格式。 +- 初始化 VAE 模型和优化器。 +- 调用训练函数以执行模型训练。 + +依赖库: +- mindspore: 深度学习框架,提供模型训练和张量操作的功能。 +- models.vae: 包含 VAE 模型定义的模块。 +- utils.data_utils: 包含数据加载和预处理功能的模块。 +- train: 包含训练过程的函数。 +- batch_correction: 包含批次效应去除功能的模块。 +""" +import mindspore as ms +from models.vae import VAE +from utils.data_utils import load_data +from train import train + +# 设置MindSpore上下文 +ms.set_context(mode=ms.GRAPH_MODE, device_target="Ascend") + +# 配置参数 +train_path = "./data/pancreas.h5ad" +model_path = "./models/scGen/scgen.pt" +batch_size = 32 +z_dim = 100 +lr = 0.001 +dr_rate = 0.2 + +def main(): + # 加载数据 + data, train_data, input_matrix = load_data(train_path) + gex_size = input_matrix.shape[1] + + # 初始化模型 + model = VAE(input_dim=gex_size, z_dim=z_dim, dr_rate=dr_rate) + optimizer = ms.experimental.optim.Adam(model.trainable_params(), lr=lr) + + # 数据预处理 + data.obs["study"] = data.obs["sample"] + data.obs["cell_type"] = data.obs["celltype"] + + # 训练模型 + train(model, optimizer, train_data, n_epochs=100, model_path=model_path) + print(f"模型已保存到 {model_path}") + +if __name__ == "__main__": + main() diff --git a/community/cv/scGen/utils/data_utils.py b/community/cv/scGen/utils/data_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..a30242b26627d6f275115911a4f82aa7503795db --- /dev/null +++ b/community/cv/scGen/utils/data_utils.py @@ -0,0 +1,19 @@ +""" +数据处理模块,用于加载和处理训练数据。 + +主要功能: +- 加载输入数据集并返回经过随机打乱的训练数据、原始数据和输入矩阵。 + +依赖库: +- scanpy: 用于单细胞基因组数据分析的库。 +""" +from random import shuffle +import scanpy as sc + +def load_data(train_path): + data = sc.read(train_path) + input_matrix = data.X + ind_list = [i for i in range(input_matrix.shape[0])] + shuffle(ind_list) + train_data = input_matrix[ind_list, :] + return data, train_data, input_matrix diff --git a/community/cv/scGen/utils/model_utils.py b/community/cv/scGen/utils/model_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..f7dc93ebd7fce83822de09a61e5f86676997b8ee --- /dev/null +++ b/community/cv/scGen/utils/model_utils.py @@ -0,0 +1,44 @@ +""" +模型实用工具模块,用于处理与 VAE 模型相关的功能。 + +主要功能: +- 提取潜在变量(latent variables)。 +- 计算潜在变量的平均值。 +- 重建输入数据。 +- 生成随机样本。 + +依赖库: +- numpy: 提供数值计算支持。 +- mindspore: 深度学习框架,用于构建和训练模型。 +""" +import numpy as np +from mindspore import Tensor +import mindspore as ms + +def give_me_latent(model, data): + model.set_train(False) + data_tensor = Tensor(data, dtype=ms.float32) + mu = model.encode(data_tensor) + return mu.asnumpy() + +def avg_vector(model, data): + latent = give_me_latent(model, data) + arithmetic = np.average(latent, axis=0) + return arithmetic + +def reconstruct(model, data, use_data=False): + model.set_train(False) + if use_data: + latent_tensor = Tensor(data, dtype=ms.float32) + else: + latent_np = give_me_latent(model, data) + latent_tensor = Tensor(latent_np, dtype=ms.float32) + reconstructed_tensor = model.decode(latent_tensor) + return reconstructed_tensor.asnumpy() + +def sample(model, n_sample, z_dim): + model.set_train(False) + noise_np = np.random.randn(n_sample, z_dim).astype(np.float32) + noise_tensor = Tensor(noise_np) + gen_cells_tensor = model.decode(noise_tensor) + return gen_cells_tensor.asnumpy()