# NOTE(review): stray Gitee page text ("代码拉取完成,页面将自动刷新" — "code pull
# complete, the page will refresh automatically") was captured with the source
# and broke the module; commented out so the file parses.
import csv
import pandas as pd
import pandas as pd
import numpy as np
from pgmpy.estimators import BDeuScore, K2Score, BicScore
from pgmpy.models import BayesianNetwork
import json
from Hive.DAG_update import DataConversion
import os
from colorama import Fore
# Best network vector found by the search: 100 flags (0.0 / 1.0), one per
# candidate edge.  Laid out 10 per row purely for readability; the values
# are identical to the original one-per-line listing.
best_vector = [
    0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0,
    0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0,
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0,
    0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0,
]
class DirUtils:
    """Filesystem helpers for locating experiment output directories."""

    @staticmethod
    def get_output_path(args):
        """Build (and create if missing) the output directory for a run.

        args: dict of run settings.  When args["task_name"] is non-empty
        the path is built from the task_name/net_size/net_idx keys;
        otherwise from the size/idx keys.  Both variants also use the
        mode/evaluator/mutation keys.

        Returns the directory path as a string.

        Fixes vs original: decorated as @staticmethod (the original plain
        function broke when called on an instance), and the racy
        os.path.exists + os.makedirs pair replaced by exist_ok=True.
        """
        if args["task_name"] != "":
            res_path = f"output/{args['task_name']}/size_{args['net_size']}/Net-{args['net_idx']}/{args['mode']}/{args['evaluator']}/{args['mutation']}"
        else:
            res_path = f"output/size_{args['size']}/Net-{args['idx']}/{args['mode']}/{args['evaluator']}/{args['mutation']}"
        os.makedirs(res_path, exist_ok=True)
        return res_path
class DataName:
    """String constants and column-name helpers for the DREAM datasets."""

    # DREAM3 perturbation file types
    Heterozygous = "heterozygous"
    Null_Mutants = "null-mutants"
    Trajectories = "trajectories"
    # DREAM3 size-10 task names
    Task1 = "Ecoli1"
    Task2 = "Ecoli2"
    Task3 = "Yeast1"
    Task4 = "Yeast2"
    Task5 = "Yeast3"
    Task_List = [Task1, Task2, Task3, Task4, Task5]

    @staticmethod
    def get_column_names(mode, size):
        """Return the column names for a dataset of `size` genes in `mode`.

        size == 100 ignores mode and returns ["G1", ..., "G100"].
        Otherwise mode must look like "D<k>": k blocks of 10 columns named
        A1..A10, B1..B10, C1..C10, ...

        Raises ValueError for an unrecognized mode.

        Fixes vs original: int(mode[1:]) instead of int(mode[1]) so modes
        with two-digit k (e.g. "D10") also work; decorated @staticmethod.
        """
        if size == 100:
            return [f"G{i}" for i in range(1, 101)]
        if mode.startswith("D"):
            dim_num = int(mode[1:])
            name_list = []
            for k in range(dim_num):
                # chr(65 + k): 'A', 'B', 'C', ... one letter per lag block
                name_list.extend([f"{chr(65 + k)}{i+1}" for i in range(10)])
            return name_list
        raise ValueError("mode not found")

    # Default 10-gene column names (G1..G10)
    ColumnNames = ["G1", "G2", "G3", "G4", "G5", "G6", "G7", "G8", "G9", "G10"]
    # 30 columns for the three-step time-lagged ("D3") layout
    D3ColumnNames = (
        [f"A{i}" for i in range(1, 11)]
        + [f"B{i}" for i in range(1, 11)]
        + [f"C{i}" for i in range(1, 11)]
    )
def read_base_data(size):
    """Load the cached result-info JSON for networks of the given size."""
    file_path = f"output/report/ours/result_info_{size}.json"
    print(f"{Fore.GREEN}Read data from {file_path}{Fore.RESET}")
    with open(file_path, "r") as f:
        return json.load(f)
def read_data_file(task_name, file_type, without_noise=False):
    """
    Read the DREAM3 size-10 TSV for the given task and file type.

    Skips the header row and drops the first (time/label) column of every
    remaining row; all other cells are converted to float.

    Returns the data as a list of row lists.
    """
    if without_noise:
        file_path = f"DREAM3 in silico challenge/Size10/Data without noise/InSilicoSize10-{task_name}-nonoise-{file_type}.tsv"
    else:
        file_path = f"DREAM3 in silico challenge/Size10/DREAM3 data/InSilicoSize10-{task_name}-{file_type}.tsv"
    with open(file_path, "r") as tsv_file:
        rows = csv.reader(tsv_file, delimiter="\t")
        next(rows)  # skip the header row
        parsed = []
        for row in rows:
            parsed.append([float(cell) for cell in row[1:]])
    return parsed
def read_Dream4_data(idx=1):
    """Load one DREAM4 size-10 time-series TSV as a float ndarray.

    idx: network index (1..5).  Defaults to 1 — fix vs original: several
    callers in this file invoke read_Dream4_data() with no argument, which
    raised TypeError before.

    Drops NaN rows, then the first data row and the first (time) column.
    """
    data_path = f"DREAM4 in silico challenge/Size 10/DREAM4 training data/insilico_size10_{idx}/insilico_size10_{idx}_timeseries.tsv"
    print(f"read data from {data_path}")
    df = pd.read_csv(data_path, sep="\t")
    df = df.dropna()
    df = df.iloc[1:, 1:]
    array_data = df.values.astype(float)
    return array_data
def read_size_100_data(idx):
    """Load one DREAM4 size-100 time-series TSV as a float ndarray.

    Drops NaN rows, then the first data row and the first (time) column.
    """
    data_path = f"DREAM4 in silico challenge/Size 100/DREAM4 training data/insilico_size100_{idx}/insilico_size100_{idx}_timeseries.tsv"
    frame = pd.read_csv(data_path, sep="\t").dropna().iloc[1:, 1:]
    return frame.values.astype(float)
def read_size_100_time_series_data(idx, mode):
    """Return the size-100 series, optionally in a time-lagged layout.

    mode "D1": the raw data; mode "D3": three one-step-shifted copies
    (rows 0..206, 1..207, 2..208) stacked column-wise.

    Raises ValueError for any other mode — fix vs original: an unknown
    mode previously fell through and raised UnboundLocalError on the
    return line.
    """
    data = read_size_100_data(idx)
    if mode == "D1":
        return data
    if mode == "D3":
        return np.concatenate(
            (
                data[0:207, :],
                data[1:208, :],
                data[2:209, :],
            ),
            axis=1,
        )
    raise ValueError(f"unsupported mode: {mode}")
def get_gold_standard(size, idx):
    """Read a DREAM4 gold-standard edge list into a DataFrame.

    Columns: source, target, value (the file has no header row).
    """
    file_name = f"DREAM4 in silico challenge/Size {size}/DREAM4 gold standards/insilico_size{size}_{idx}_goldstandard.tsv"
    edge_columns = ["source", "target", "value"]
    return pd.read_csv(file_name, sep="\t", header=None, names=edge_columns)
def general_read_Dream4_data(idx, mode):
    """Build a time-lagged DREAM4 size-10 data matrix.

    mode "Dk" stacks k copies of the series side by side, each shifted by
    one time step.  The raw series is assumed to have 104 rows (as
    produced by read_Dream4_data), so each slice has 104 - k + 1 rows —
    exactly the windows the original hand-written D2..D7 branches used.
    "D1" returns the raw data unchanged.

    Raises ValueError for an unrecognized mode — fix vs original: an
    unknown mode previously fell through the if/elif chain and raised
    UnboundLocalError; the seven near-identical branches are also
    collapsed into one loop, which generalizes to any k >= 2.
    """
    data = read_Dream4_data(idx)
    if mode == "D1":
        return data
    if mode.startswith("D"):
        k = int(mode[1:])
        if k >= 2:
            total = 104  # row count of the DREAM4 size-10 series
            window = total - k + 1
            slices = [data[j : j + window, :] for j in range(k)]
            return np.concatenate(slices, axis=1)
    raise ValueError(f"unsupported mode: {mode}")
def read_Dream4_time_series_data(idx=1):
    """Return the D3 (three-step time-lagged) layout of a DREAM4 series.

    idx: network index, defaulting to 1 — fix vs original: single_this_test
    calls this function with no argument, which raised TypeError before.

    Equivalent to general_read_Dream4_data(idx, "D3"): three one-step
    shifted copies of the series concatenated column-wise.
    """
    data = read_Dream4_data(idx)
    data1 = data[0:102, :]
    data2 = data[1:103, :]
    data3 = data[2:104, :]
    gen_data = np.concatenate((data1, data2, data3), axis=1)
    return gen_data
def eval_test():
    """Score a few hand-built 10-gene networks with K2 / BIC.

    Builds two toy models, the gold-standard network and its edge-reversed
    variant, then prints K2 and BIC scores for each.  A BDeu scorer is
    constructed as well, but its prints remain commented out.
    """
    # This first read is immediately overwritten below; kept so the
    # original side effect (reading/validating the DREAM3 file) is
    # preserved.
    data = read_data_file(DataName.Task1, DataName.Null_Mutants, without_noise=True)
    # Fix vs original: read_Dream4_data() was called without its required
    # idx argument, which raised TypeError.
    data = read_Dream4_data(1)
    data = pd.DataFrame(data, columns=DataName.ColumnNames)
    print(f"data shape = {data.shape}")
    bdeu = BDeuScore(data, equivalent_sample_size=5)
    k2 = K2Score(data)
    bic = BicScore(data)
    model1 = BayesianNetwork([("G1", "G2"), ("G1", "G3")])
    model2 = BayesianNetwork([("G1", "G5"), ("G2", "G3")])
    model_gold = BayesianNetwork(
        [
            ("G2", "G1"),
            ("G2", "G3"),
            ("G3", "G4"),
            ("G9", "G4"),
            ("G3", "G5"),
            ("G8", "G5"),
            ("G9", "G5"),
            ("G3", "G6"),
            ("G3", "G7"),
            ("G8", "G7"),
            ("G10", "G7"),
        ]
    )
    # The gold network with every edge reversed.
    back_model_gold = BayesianNetwork(
        [
            ("G1", "G2"),
            ("G3", "G2"),
            ("G4", "G3"),
            ("G4", "G9"),
            ("G5", "G3"),
            ("G5", "G8"),
            ("G5", "G9"),
            ("G6", "G3"),
            ("G7", "G3"),
            ("G7", "G8"),
            ("G7", "G10"),
        ]
    )
    # Edge-free baseline (constructed but not scored below).
    all_false_model = BayesianNetwork()
    all_false_model.add_nodes_from(DataName.ColumnNames)
    print("-----model1的评分-----")
    cur_model = model1
    # print(f"bdeu score: \t{bdeu.score(cur_model)}")
    print(f"k2 score: \t{k2.score(cur_model)}")
    print(f"bic score: \t{bic.score(cur_model)}")
    print("-----model2的评分-----")
    cur_model = model2
    # print(f"bdeu score: \t{bdeu.score(cur_model)}")
    print(f"k2 score: \t{k2.score(cur_model)}")
    print(f"bic score: \t{bic.score(cur_model)}")
    print("_______model_gold_______")
    cur_model = model_gold
    # print(f"bdeu score: \t{bdeu.score(cur_model)}")
    print(f"k2 score: \t{k2.score(cur_model)}")
    print(f"bic score: \t{bic.score(cur_model)}")
    print("_______model_gold_back_______")
    cur_model = back_model_gold
    # print(f"bdeu score: \t{bdeu.score(cur_model)}")
    print(f"k2 score: \t{k2.score(cur_model)}")
    print(f"bic score: \t{bic.score(cur_model)}")
def single_display():
    """Print BDeu local scores for selected edges in both orientations.

    For each of two gold-standard edges (G2->G1 and G10->G7), prints the
    BDeu local score of the edge and of its reversal.
    """
    # This first read is immediately overwritten; kept to preserve the
    # original side effect of reading the DREAM3 file.
    data = read_data_file(DataName.Task1, DataName.Heterozygous, without_noise=True)
    # Fix vs original: read_Dream4_data() was called without its required
    # idx argument, which raised TypeError.  The redundant function-local
    # re-imports of pandas/numpy/pgmpy were also dropped — the
    # module-level imports already provide them.
    data = read_Dream4_data(1)
    data = pd.DataFrame(data, columns=DataName.ColumnNames)
    bdeu = BDeuScore(data, equivalent_sample_size=5)
    k2 = K2Score(data)
    bic = BicScore(data)
    model_gold = BayesianNetwork(
        [
            ("G2", "G1"),
            ("G2", "G3"),
            ("G3", "G4"),
            ("G9", "G4"),
            ("G3", "G5"),
            ("G8", "G5"),
            ("G9", "G5"),
            ("G3", "G6"),
            ("G3", "G7"),
            ("G8", "G7"),
            ("G10", "G7"),
        ]
    )
    # Each pair (parent, child) followed by its reversal, to show which
    # edge direction the BDeu score prefers.
    for parent_name, child_name in [
        ("G2", "G1"),
        ("G1", "G2"),
        ("G7", "G10"),
        ("G10", "G7"),
    ]:
        print(
            f"bdeu {parent_name} -> {child_name}: {bdeu.local_score(child_name, parents=[parent_name])}"
        )
def save_train_info(save_info, save_path):
    """Serialize a training-info dict to pretty-printed JSON at save_path.

    numpy arrays are converted to plain lists first (mutating save_info in
    place, as the original did) so json.dump can handle them.

    Fix vs original: isinstance(value, np.ndarray) instead of
    type(value) == np.ndarray, so ndarray subclasses are converted too.
    """
    for key, value in save_info.items():
        if isinstance(value, np.ndarray):
            save_info[key] = value.tolist()
    with open(save_path, "w") as f:
        json.dump(save_info, f, indent=4)
def single_this_test():
    """Export the D3 time-lagged DREAM4 data to output/data/rebuild.xlsx."""
    # Fix vs original: read_Dream4_time_series_data() was called without
    # its required idx argument, which raised TypeError.
    data = read_Dream4_time_series_data(1)
    df = pd.DataFrame(
        data,
        columns=[f"A{i}" for i in range(1, 11)]
        + [f"B{i}" for i in range(1, 11)]
        + [f"C{i}" for i in range(1, 11)],
    )
    # to_excel does not create missing directories; ensure the target
    # folder exists so the export cannot fail with FileNotFoundError.
    os.makedirs("output/data", exist_ok=True)
    df.to_excel("output/data/rebuild.xlsx", index=False)
def read_ground_truth(idx=1, type="vector"):
    """Load a DREAM4 size-10 gold standard as a matrix or edge vector.

    idx: network index.  type: "matrix" returns the 10x10 integer
    adjacency ndarray; "vector" returns its
    DataConversion.matrix2vector flattening ("vertor" is still accepted
    for backward compatibility with the original misspelling).

    Raises ValueError for any other type.

    Fix vs original: the vector branch only matched the typo "vertor", so
    the DEFAULT argument type="vector" silently returned None.
    NOTE(review): the parameter name `type` shadows the builtin; kept so
    keyword callers (type=...) are not broken.
    """
    file_name = f"DREAM4 in silico challenge/Size 10/DREAM4 gold standards/insilico_size10_{idx}_goldstandard.tsv"
    df = pd.read_csv(
        file_name, sep="\t", header=None, names=["source", "target", "value"]
    )
    # All node names, G1..G10
    nodes = [f"G{i}" for i in range(1, 11)]
    # Start from an empty (all-zero) adjacency matrix
    adj_matrix = pd.DataFrame(
        np.zeros((len(nodes), len(nodes)), dtype=int), index=nodes, columns=nodes
    )
    # Fill in each listed edge
    for _, row in df.iterrows():
        adj_matrix.loc[row["source"], row["target"]] = row["value"]
    adj_matrix = adj_matrix.values
    if type == "matrix":
        return adj_matrix
    elif type in ("vector", "vertor"):
        ground_vector = DataConversion.matrix2vector(adj_matrix)
        return ground_vector
    raise ValueError(f"unknown type: {type}")
if __name__ == "__main__":
    # Other entry points, kept for quick manual experiments:
    # read_Dream4_data()
    # eval_test()
    # single_display()
    # data = read_data_file(DataName.Task1, DataName.Heterozygous)
    # print(data)
    # print(os.getcwd())
    single_this_test()
# NOTE(review): the two lines that followed were Gitee content-moderation
# boilerplate accidentally captured with the source (not part of the
# program); commented out so the module stays importable.