Ai
1 Star 0 Fork 0

gitee-hc/ABC_machine_learning

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
data_utils.py 12.07 KB
一键复制 编辑 原始数据 按行查看 历史
gitee-hc 提交于 2024-05-12 21:49 +08:00 . fixed with one stage compare
import csv
import pandas as pd
import pandas as pd
import numpy as np
from pgmpy.estimators import BDeuScore, K2Score, BicScore
from pgmpy.models import BayesianNetwork
import json
from Hive.DAG_update import DataConversion
import os
from colorama import Fore
# Flat list of 100 entries, each 0.0 or 1.0, written here ten values per line.
# NOTE(review): looks like a row-major 10x10 matrix (every 11th entry, i.e. the
# would-be diagonal, is 0.0) — confirm the intended layout against the code
# that consumes this vector.
best_vector = [
    0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0,
    0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
    0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0,
]
class DirUtils:
    """Helpers for building (and creating) result directories."""

    @staticmethod
    def get_output_path(args):
        """Return the output directory for one run, creating it if needed.

        Args:
            args: Mapping of run settings. ``task_name``, ``mode``,
                ``evaluator`` and ``mutation`` are always read. When
                ``task_name`` is not the empty string, ``net_size`` and
                ``net_idx`` are read as well; otherwise ``size`` and ``idx``
                are read instead.

        Returns:
            The relative directory path as a string. The directory exists
            when this function returns.

        Raises:
            KeyError: If a key required by the selected branch is missing.
        """
        # NOTE(review): the two branches read differently named keys
        # (net_size/net_idx vs. size/idx) — confirm this is intentional.
        if args["task_name"] != "":
            res_path = f"output/{args['task_name']}/size_{args['net_size']}/Net-{args['net_idx']}/{args['mode']}/{args['evaluator']}/{args['mutation']}"
        else:
            res_path = f"output/size_{args['size']}/Net-{args['idx']}/{args['mode']}/{args['evaluator']}/{args['mutation']}"
        # exist_ok avoids the check-then-create race when several runs start
        # at the same time and target the same directory.
        os.makedirs(res_path, exist_ok=True)
        return res_path
class DataName:
    """String constants and column-label helpers shared by the data readers."""

    # File-type fragments used when building DREAM3 data file names.
    Heterozygous = "heterozygous"
    Null_Mutants = "null-mutants"
    Trajectories = "trajectories"

    # Task (network) names used when building DREAM3 data file names.
    Task1 = "Ecoli1"
    Task2 = "Ecoli2"
    Task3 = "Yeast1"
    Task4 = "Yeast2"
    Task5 = "Yeast3"
    Task_List = [Task1, Task2, Task3, Task4, Task5]

    @staticmethod
    def get_column_names(mode, size):
        """Return the column labels for a data set.

        Args:
            mode: String of the form ``"D<k>"`` where ``<k>`` is an integer.
                Ignored when ``size == 100``.
            size: Network size. ``100`` selects the fixed labels
                ``G1`` .. ``G100``; any other value selects the
                ``mode``-driven labels.

        Returns:
            For ``size == 100`` the list ``["G1", ..., "G100"]``. Otherwise
            ``k`` groups of ten labels, the first group prefixed ``A``, the
            second ``B`` and so on (``A1..A10, B1..B10, ...``).

        Raises:
            ValueError: If ``mode`` does not start with ``"D"`` or the text
                after ``"D"`` is not an integer.
        """
        if size == 100:
            return [f"G{i}" for i in range(1, 101)]
        if not mode.startswith("D"):
            raise ValueError("mode not found")
        try:
            # Parse everything after "D" so multi-digit modes are not
            # silently truncated to their first digit and a bare "D" gives a
            # ValueError instead of an IndexError.
            dim_num = int(mode[1:])
        except ValueError:
            raise ValueError("mode not found") from None
        name_list = []
        for k in range(dim_num):
            prefix = chr(ord("A") + k)
            name_list.extend(f"{prefix}{i}" for i in range(1, 11))
        return name_list

    # Labels for a single block of ten columns.
    ColumnNames = [f"G{i}" for i in range(1, 11)]
    # Labels for three blocks of ten columns: A1..A10, B1..B10, C1..C10.
    D3ColumnNames = [f"{prefix}{i}" for prefix in "ABC" for i in range(1, 11)]
def read_base_data(size):
    """Load the stored result summary for networks of the given size.

    Reads ``output/report/ours/result_info_<size>.json`` relative to the
    current working directory and announces the path on stdout in green.

    Args:
        size: Value substituted into the file name.

    Returns:
        The decoded JSON content.
    """
    file_path = f"output/report/ours/result_info_{size}.json"
    print(f"{Fore.GREEN}Read data from {file_path}{Fore.RESET}")
    with open(file_path, "r") as handle:
        return json.load(handle)
def read_data_file(task_name, file_type, without_noise=False):
    """Read one DREAM3 size-10 TSV file as a list of float rows.

    The first line of the file is treated as a header and skipped, and the
    first cell of every remaining line is dropped; all other cells are
    converted to ``float``.

    Args:
        task_name: Network name inserted into the file name
            (e.g. ``DataName.Task1``).
        file_type: File-type fragment inserted into the file name
            (e.g. ``DataName.Heterozygous``).
        without_noise: When true, read from the "Data without noise"
            directory (``-nonoise-`` file names) instead of "DREAM3 data".

    Returns:
        A list with one ``list[float]`` per non-blank data line. Empty when
        the file contains no data lines.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If a data cell cannot be parsed as a float.
    """
    if without_noise:
        file_path = f"DREAM3 in silico challenge/Size10/Data without noise/InSilicoSize10-{task_name}-nonoise-{file_type}.tsv"
    else:
        file_path = f"DREAM3 in silico challenge/Size10/DREAM3 data/InSilicoSize10-{task_name}-{file_type}.tsv"
    # newline="" lets the csv module do its own line-ending handling, as its
    # documentation requires for file objects.
    with open(file_path, "r", newline="") as file:
        reader = csv.reader(file, delimiter="\t")
        # Skip the header; the default keeps an empty file from leaking
        # StopIteration to the caller.
        next(reader, None)
        # csv yields [] for blank lines; skip them so they do not become
        # empty rows in the result.
        return [[float(cell) for cell in row[1:]] for row in reader if row]
def read_Dream4_data(idx):
    """Load the DREAM4 size-10 time-series file of network ``idx``.

    The TSV is parsed with pandas (first line as header), rows containing
    missing values are removed, and then the first remaining row and the
    first column are discarded. The path that is read is printed to stdout.

    Args:
        idx: Network index substituted into the directory and file name.

    Returns:
        A 2-D ``float`` NumPy array with the remaining values.
    """
    data_path = f"DREAM4 in silico challenge/Size 10/DREAM4 training data/insilico_size10_{idx}/insilico_size10_{idx}_timeseries.tsv"
    print(f"read data from {data_path}")
    complete_rows = pd.read_csv(data_path, sep="\t").dropna()
    # NOTE(review): the first column is presumably the time stamp; the reason
    # for also dropping the first data row is not visible here — confirm.
    trimmed = complete_rows.iloc[1:, 1:]
    return trimmed.values.astype(float)
def read_size_100_data(idx):
    """Load the DREAM4 size-100 time-series file of network ``idx``.

    The TSV is parsed with pandas (first line as header), rows containing
    missing values are removed, and then the first remaining row and the
    first column are discarded.

    Args:
        idx: Network index substituted into the directory and file name.

    Returns:
        A 2-D ``float`` NumPy array with the remaining values.
    """
    data_path = f"DREAM4 in silico challenge/Size 100/DREAM4 training data/insilico_size100_{idx}/insilico_size100_{idx}_timeseries.tsv"
    complete_rows = pd.read_csv(data_path, sep="\t").dropna()
    # NOTE(review): the first column is presumably the time stamp; the reason
    # for also dropping the first data row is not visible here — confirm.
    trimmed = complete_rows.iloc[1:, 1:]
    return trimmed.values.astype(float)
def read_size_100_time_series_data(idx, mode):
    """Return the size-100 data, optionally widened with row-shifted copies.

    ``mode`` has the form ``"D<k>"``. ``"D1"`` returns the array from
    ``read_size_100_data`` unchanged. For ``k > 1`` the array is copied ``k``
    times, copy ``s`` starting ``s`` rows later, and the copies are placed
    side by side, so each output row holds ``k`` consecutive input rows.
    With ``n`` input rows the result has ``n - k + 1`` rows and ``k`` times
    as many columns. For a 209-row input and ``"D3"`` this is exactly the
    previously hard-coded slicing (rows 0-206, 1-207 and 2-208).

    Args:
        idx: Network index passed to ``read_size_100_data``.
        mode: ``"D<k>"`` with integer ``k >= 1``.

    Returns:
        A 2-D NumPy array as described above.

    Raises:
        ValueError: If ``mode`` is malformed or ``k`` exceeds the number of
            rows available. (Unsupported modes used to fail with an
            ``UnboundLocalError``.)
    """
    if not (isinstance(mode, str) and mode[:1] == "D" and mode[1:].isdecimal()):
        raise ValueError(f"unsupported mode {mode!r}; expected 'D<k>' with k >= 1")
    lag = int(mode[1:])
    if lag < 1:
        raise ValueError(f"unsupported mode {mode!r}; expected 'D<k>' with k >= 1")

    data = read_size_100_data(idx)
    if lag == 1:
        return data

    n_rows = data.shape[0]
    if lag > n_rows:
        raise ValueError(f"mode {mode!r} needs at least {lag} rows, got {n_rows}")
    # NOTE(review): windows are taken over the whole array; if the file holds
    # several independent series, windows crossing a series boundary mix
    # them — confirm that this is intended.
    window = n_rows - lag + 1
    return np.concatenate(
        [data[shift:shift + window, :] for shift in range(lag)], axis=1
    )
def get_gold_standard(size, idx):
    """Load a DREAM4 gold-standard edge list.

    Args:
        size: Network size substituted into the directory and file name.
        idx: Network index substituted into the file name.

    Returns:
        A pandas ``DataFrame`` with the columns ``source``, ``target`` and
        ``value``, one row per line of the header-less TSV file.
    """
    gold_path = f"DREAM4 in silico challenge/Size {size}/DREAM4 gold standards/insilico_size{size}_{idx}_goldstandard.tsv"
    edge_columns = ["source", "target", "value"]
    return pd.read_csv(gold_path, sep="\t", header=None, names=edge_columns)
def general_read_Dream4_data(idx, mode):
    """Return the size-10 data, optionally widened with row-shifted copies.

    ``mode`` has the form ``"D<k>"``. ``"D1"`` returns the array from
    ``read_Dream4_data`` unchanged. For ``k > 1`` the array is copied ``k``
    times, copy ``s`` starting ``s`` rows later, and the copies are placed
    side by side, so each output row holds ``k`` consecutive input rows.
    With ``n`` input rows the result has ``n - k + 1`` rows and ``k`` times
    as many columns. For a 104-row input this reproduces the previously
    hard-coded slices for ``"D2"`` .. ``"D7"`` (e.g. rows 0-101, 1-102 and
    2-103 for ``"D3"``).

    Args:
        idx: Network index passed to ``read_Dream4_data``.
        mode: ``"D<k>"`` with integer ``k >= 1``.

    Returns:
        A 2-D NumPy array as described above.

    Raises:
        ValueError: If ``mode`` is malformed or ``k`` exceeds the number of
            rows available. (Unsupported modes used to fail with an
            ``UnboundLocalError``.)
    """
    if not (isinstance(mode, str) and mode[:1] == "D" and mode[1:].isdecimal()):
        raise ValueError(f"unsupported mode {mode!r}; expected 'D<k>' with k >= 1")
    lag = int(mode[1:])
    if lag < 1:
        raise ValueError(f"unsupported mode {mode!r}; expected 'D<k>' with k >= 1")

    data = read_Dream4_data(idx)
    if lag == 1:
        return data

    n_rows = data.shape[0]
    if lag > n_rows:
        raise ValueError(f"mode {mode!r} needs at least {lag} rows, got {n_rows}")
    # NOTE(review): windows are taken over the whole array; if the file holds
    # several independent series, windows crossing a series boundary mix
    # them — confirm that this is intended.
    window = n_rows - lag + 1
    return np.concatenate(
        [data[shift:shift + window, :] for shift in range(lag)], axis=1
    )
def read_Dream4_time_series_data(idx):
    """Return the size-10 data as three row-shifted copies placed side by side.

    The array from ``read_Dream4_data(idx)`` is sliced three times (rows
    0-101, rows 1-102 and rows 2-103) and the slices are joined along the
    column axis.

    Args:
        idx: Network index passed to ``read_Dream4_data``.

    Returns:
        A 2-D NumPy array with three times as many columns as the input.
    """
    series = read_Dream4_data(idx)
    shifted = [series[offset:offset + 102, :] for offset in range(3)]
    return np.concatenate(shifted, axis=1)
def eval_test(idx=1):
    """Print K2 and BIC scores of a few hand-written network structures.

    The size-10 DREAM4 data of network ``idx`` is loaded, wrapped in a
    DataFrame labelled with ``DataName.ColumnNames`` and scored against four
    candidate structures: two small ad-hoc models, a reference edge list and
    the same edge list with every edge reversed.

    The previous version called ``read_Dream4_data()`` without its required
    argument and therefore always raised ``TypeError``; it also loaded a
    DREAM3 file whose content was immediately discarded. Both are fixed
    here, and ``idx`` defaults to 1 so existing zero-argument calls work.

    Args:
        idx: Network index passed to ``read_Dream4_data``.
    """
    data = pd.DataFrame(read_Dream4_data(idx), columns=DataName.ColumnNames)
    print(f"data shape = {data.shape}")

    k2 = K2Score(data)
    bic = BicScore(data)

    gold_edges = [
        ("G2", "G1"),
        ("G2", "G3"),
        ("G3", "G4"),
        ("G9", "G4"),
        ("G3", "G5"),
        ("G8", "G5"),
        ("G9", "G5"),
        ("G3", "G6"),
        ("G3", "G7"),
        ("G8", "G7"),
        ("G10", "G7"),
    ]
    model1 = BayesianNetwork([("G1", "G2"), ("G1", "G3")])
    model2 = BayesianNetwork([("G1", "G5"), ("G2", "G3")])
    model_gold = BayesianNetwork(gold_edges)
    # Same edges as gold_edges with source and target swapped.
    back_model_gold = BayesianNetwork([(child, parent) for parent, child in gold_edges])

    labelled_models = [
        ("-----model1的评分-----", model1),
        ("-----model2的评分-----", model2),
        ("_______model_gold_______", model_gold),
        ("_______model_gold_back_______", back_model_gold),
    ]
    for banner, cur_model in labelled_models:
        print(banner)
        print(f"k2 score: \t{k2.score(cur_model)}")
        print(f"bic score: \t{bic.score(cur_model)}")
def single_display(idx=1):
    """Print BDeu local scores for a few single-parent configurations.

    The size-10 DREAM4 data of network ``idx`` is loaded, wrapped in a
    DataFrame labelled with ``DataName.ColumnNames`` and used to score two
    node pairs in both directions.

    The previous version called ``read_Dream4_data()`` without its required
    argument and therefore always raised ``TypeError``; it also loaded a
    DREAM3 file whose content was immediately discarded, re-imported names
    that the module already imports at the top, and built scorers and a
    model that were never used. ``idx`` defaults to 1 so existing
    zero-argument calls work.

    Args:
        idx: Network index passed to ``read_Dream4_data``.
    """
    data = pd.DataFrame(read_Dream4_data(idx), columns=DataName.ColumnNames)
    bdeu = BDeuScore(data, equivalent_sample_size=5)

    # (parent, child) pairs; each pair of nodes is scored in both directions.
    parent_child_pairs = [
        ("G2", "G1"),
        ("G1", "G2"),
        ("G7", "G10"),
        ("G10", "G7"),
    ]
    for parent_name, child_name in parent_child_pairs:
        print(
            f"bdeu {parent_name} -> {child_name}: {bdeu.local_score(child_name, parents=[parent_name])}"
        )
def save_train_info(save_info, save_path):
    """Write ``save_info`` to ``save_path`` as indented JSON.

    NumPy arrays are written as (nested) lists and NumPy scalars as plain
    Python numbers, at any nesting depth. The conversion happens during
    encoding, so ``save_info`` itself is left untouched (the previous
    version replaced top-level arrays in the caller's dict and failed on
    arrays nested inside lists or dicts).

    Args:
        save_info: JSON-serialisable mapping, possibly containing NumPy
            arrays or NumPy scalars as values.
        save_path: Destination file path; an existing file is overwritten.

    Raises:
        TypeError: If a value is neither JSON-serialisable nor a NumPy
            array/scalar.
    """

    def _to_builtin(obj):
        # Called by json only for objects it cannot encode on its own.
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.generic):
            return obj.item()
        raise TypeError(
            f"Object of type {obj.__class__.__name__} is not JSON serializable"
        )

    with open(save_path, "w", encoding="utf-8") as f:
        json.dump(save_info, f, indent=4, default=_to_builtin)
def single_this_test(idx=1):
    """Export the three-copy time-series table of network ``idx`` to Excel.

    The array from ``read_Dream4_time_series_data(idx)`` is labelled
    ``A1..A10, B1..B10, C1..C10`` and written to
    ``output/data/rebuild.xlsx`` without the index column.

    The previous version called ``read_Dream4_time_series_data()`` without
    its required argument and therefore always raised ``TypeError``; ``idx``
    defaults to 1 so the existing zero-argument call works.

    Args:
        idx: Network index passed to ``read_Dream4_time_series_data``.
    """
    data = read_Dream4_time_series_data(idx)
    columns = [f"{prefix}{i}" for prefix in "ABC" for i in range(1, 11)]
    df = pd.DataFrame(data, columns=columns)
    # to_excel does not create missing directories.
    os.makedirs("output/data", exist_ok=True)
    df.to_excel("output/data/rebuild.xlsx", index=False)
def read_ground_truth(idx=1, type="vector"):
    """Load the size-10 DREAM4 gold standard as a matrix or a vector.

    The header-less TSV edge list (``source``, ``target``, ``value``) is
    turned into a 10x10 integer adjacency matrix over the nodes ``G1`` ..
    ``G10``, with rows indexed by source and columns by target.

    Args:
        idx: Network index substituted into the file name.
        type: ``"matrix"`` returns the adjacency matrix as a NumPy array.
            ``"vector"`` (the default) returns
            ``DataConversion.matrix2vector`` applied to that matrix. The
            historical misspelling ``"vertor"`` is still accepted.

    Returns:
        The adjacency matrix or its vector form, depending on ``type``.

    Raises:
        ValueError: If ``type`` is not one of the accepted values.
    """
    # The branch below used to test only for the misspelt "vertor", so the
    # default type="vector" silently returned None. Both spellings are
    # accepted now; anything else is rejected before touching the file.
    if type not in ("matrix", "vector", "vertor"):
        raise ValueError(f"type must be 'matrix' or 'vector', got {type!r}")

    file_name = f"DREAM4 in silico challenge/Size 10/DREAM4 gold standards/insilico_size10_{idx}_goldstandard.tsv"
    df = pd.read_csv(
        file_name, sep="\t", header=None, names=["source", "target", "value"]
    )
    # All node labels.
    nodes = [f"G{i}" for i in range(1, 11)]
    # Start from an all-zero adjacency matrix.
    adj_matrix = pd.DataFrame(
        np.zeros((len(nodes), len(nodes)), dtype=int), index=nodes, columns=nodes
    )
    # Fill in one cell per edge-list row.
    for source, target, value in df.itertuples(index=False, name=None):
        adj_matrix.loc[source, target] = value
    adj_matrix = adj_matrix.values

    if type == "matrix":
        return adj_matrix
    return DataConversion.matrix2vector(adj_matrix)
if __name__ == "__main__":
    # Manual entry point: running this file as a script executes
    # single_this_test(). The commented-out lines below are alternative
    # ad-hoc checks left by the author.
    # read_Dream4_data()
    # eval_test()
    single_this_test()
    # print(os.getcwd())
    # single_display()
    # data = read_data_file(DataName.Task1, DataName.Heterozygous)
    # print(data)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/gitee-hc/ABC_machine_learning.git
git@gitee.com:gitee-hc/ABC_machine_learning.git
gitee-hc
ABC_machine_learning
ABC_machine_learning
main

搜索帮助