diff --git a/dagrs/Cargo.toml b/dagrs/Cargo.toml index b73e4cdfc42e9399067d156ab70bde241f24e46a..c06222a6946dcca287949e5d7d97b2781ddbd66a 100644 --- a/dagrs/Cargo.toml +++ b/dagrs/Cargo.toml @@ -6,3 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +yaml-rust = "0.4.5" +bimap = "0.6.1" \ No newline at end of file diff --git a/dagrs/doc/doc.md b/dagrs/doc/doc.md new file mode 100644 index 0000000000000000000000000000000000000000..d417a49626f027679ea0d7e105ba23ae0f5c14d5 --- /dev/null +++ b/dagrs/doc/doc.md @@ -0,0 +1,24 @@ +[TOC] + +# Overview + +DAG Engine 有四个主要文件: + +- dag_engine.rs - Engine 的实现,具体功能包括从 YAML 读取、形成图、判断环和输出。 +- task.rs - 任务结构的实现。 +- graph.rs - 图的实现,用来记录任务之间的依赖关系。 +- error_handler.rs - 错误处理,只有最简单的错误输出。 + + + +四个文件的关系如下: + +``` + dag_engine + / | \ + / | \ + task.rs graph.rs error_handler + | | +error_handler error_handler +``` + diff --git a/dagrs/src/dag_engine.rs b/dagrs/src/dag_engine.rs new file mode 100644 index 0000000000000000000000000000000000000000..2f257f6c007e01f19d12caa73f9c35491a0f64af --- /dev/null +++ b/dagrs/src/dag_engine.rs @@ -0,0 +1,90 @@ +/* + * @Author: Yinwhe + * @Date: 2022-01-25 18:26:14 + * @LastEditors: Yinwhe + * @LastEditTime: 2022-01-26 23:23:07 + * @Description: DAG Engine implementation + * @Copyright: Copyright (c) 2022 + */ + +use crate::{error_handler::DagError, graph::Graph, task::Task}; +use std::{collections::HashMap, fs::File, io::Read}; +use yaml_rust::YamlLoader; + +pub struct DagEngine { + tasks: HashMap, + rely_graph: Graph, +} + +impl DagEngine { + /// Allocate a new DagEngine + pub fn new() -> DagEngine { + DagEngine { + tasks: HashMap::new(), + rely_graph: Graph::new(), + } + } + + /// Do its job + pub fn run(&mut self, tasks_list: &str) -> Result { + self.read_tasks(tasks_list)?; + self.create_graph()?; + Ok(self.check_dag()) + } + + /// Read tasks into engine throuh yaml + fn read_tasks(&mut self, filename: &str) -> Result<(), DagError> { + let mut yaml_cont = String::new(); + + let mut yaml_file = File::open(filename)?; + yaml_file.read_to_string(&mut yaml_cont)?; + + // Parse Yaml + let yaml_tasks = YamlLoader::load_from_str(&yaml_cont)?; + let yaml_tasks = yaml_tasks[0]["dagrs"] + .as_hash() + .ok_or(DagError::error("format error"))?; + + // Read tasks + for (v, w) in yaml_tasks { + let id = v.as_str().ok_or(DagError::error("task id error"))?; + let task = Task::from_yaml(w)?; + + self.tasks.insert(id.to_owned(), task); + } + + Ok(()) + } + + /// create rely map between tasks + fn create_graph(&mut self) -> Result<(), DagError> { + let size = self.tasks.len(); + self.rely_graph.set_graph_size(size); + + // Add Node (create id - index mapping) + self.tasks + .iter() + .map(|(n, _)| self.rely_graph.add_node(n)) + .count(); + + // Form Graph + for (id, task) in &self.tasks { + let index = self.rely_graph.find_index_by_id(id).unwrap(); + + task.get_rely_list() + .iter() + .map(|t| { + self.rely_graph + .add_edge(index, self.rely_graph.find_index_by_id(t).unwrap()) + }) + .count(); + } + + Ok(()) + } + + /// Check whether it's DAG or not + fn check_dag(&self) -> bool { + self.rely_graph.topo_sort() + } +} \ No newline at end of file diff --git a/dagrs/src/error_handler.rs b/dagrs/src/error_handler.rs new file mode 100644 index 0000000000000000000000000000000000000000..97f6c4d1bb4b37039f70aa70a07ef253a41eef21 --- /dev/null +++ b/dagrs/src/error_handler.rs @@ -0,0 +1,38 @@ +/* + * @Author: Yinwhe + * @Date: 2022-01-21 11:14:18 + * @LastEditors: Yinwhe + * @LastEditTime: 2022-01-26 23:17:51 + * @Description: Simple error handler + * @Copyright: Copyright (c) 2021 + */ + +use std::fmt::{Debug, Display}; +use yaml_rust::ScanError; + +#[derive(Debug)] +pub struct DagError(String); + +impl Display for DagError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl From for DagError { + fn from(e: std::io::Error) -> Self { + DagError(e.to_string()) + } +} + +impl From for DagError { + fn from(e: ScanError) -> Self { + DagError(e.to_string()) + } +} + +impl DagError{ + pub fn error(message: &str) -> DagError { + DagError(message.to_string()) + } +} \ No newline at end of file diff --git a/dagrs/src/graph.rs b/dagrs/src/graph.rs new file mode 100644 index 0000000000000000000000000000000000000000..0f6cdcabb07064ddd48e0ee14ca621540cedb8f9 --- /dev/null +++ b/dagrs/src/graph.rs @@ -0,0 +1,101 @@ +/* + * @Author: Yinwhe + * @Date: 2022-01-25 17:46:18 + * @LastEditors: Yinwhe + * @LastEditTime: 2022-01-26 23:26:05 + * @Description: Graph records dependency relations between tasks + * @Copyright: Copyright (c) 2022 + */ + +use bimap::BiMap; + +#[derive(Debug)] +pub struct Graph { + nodes: BiMap, // Record node id and it's index + adj: Vec>, // Adjacency list + indegree: Vec, // Node's indegree, used for topo sort +} + +impl Graph { + /// Allocate an empty graph + pub fn new() -> Graph { + Graph { + nodes: BiMap::new(), + adj: Vec::new(), + indegree: Vec::new(), + } + } + + /// Set graph size, size is the number of tasks + pub fn set_graph_size(&mut self, size: usize) { + self.adj.resize(size, Vec::new()); + self.indegree.resize(size, 0) + } + + /// Add a node into the graph, repetitive add can cause errors + pub fn add_node(&mut self, id: &str) { + let index = self.nodes.len(); + self.nodes.insert(id.to_owned(), index); + } + + /// Add an edge into the graph + pub fn add_edge(&mut self, v: usize, w: usize) { + self.adj[v].push(w); + self.indegree[w] += 1; + } + + pub fn find_index_by_id(&self, id: &str) -> Option { + self.nodes.get_by_left(id).map(|i| i.to_owned()) + } + + pub fn find_id_by_index(&self, index: usize) -> Option { + self.nodes.get_by_right(&index).map(|n| n.to_string()) + } + + pub fn get_node_num(&self) -> usize { + self.nodes.len() + } + + /// Do topo sort in graph, returns true if DAG + pub fn topo_sort(&self) -> bool { + let mut queue = Vec::new(); + let mut indegree = self.indegree.clone(); + let mut count = 0; + let mut sequence = String::new(); + + indegree + .iter() + .enumerate() + .map(|(index, °ree)| { + if degree == 0 { + queue.push(index) + } + }) + .count(); + + while !queue.is_empty() { + let v = queue.pop().unwrap(); + + sequence.push_str(&format!(" -> {}", self.find_id_by_index(v).unwrap())); + count += 1; + + self.adj[v] + .iter() + .map(|&index| { + indegree[index] -= 1; + if indegree[index] == 0 { + queue.push(index) + } + }) + .count(); + } + + if count < self.get_node_num() { + println!("LOOP Detected"); + false + } else { + println!("[Start]{} -> [End]", sequence); + true + } + } +} \ No newline at end of file diff --git a/dagrs/src/main.rs b/dagrs/src/main.rs index e7a11a969c037e00a796aafeff6258501ec15e9a..e95e9100239d1670967d8ea87231953083c2e041 100644 --- a/dagrs/src/main.rs +++ b/dagrs/src/main.rs @@ -1,3 +1,25 @@ +/* + * @Author: Yinwhe + * @Date: 2022-01-19 14:14:28 + * @LastEditors: Yinwhe + * @LastEditTime: 2022-01-26 23:39:46 + * @Description: dagrs + * @Copyright: Copyright (c) 2022 + */ + +extern crate bimap; +extern crate yaml_rust; + +mod error_handler; +mod graph; +mod task; +mod dag_engine; + +use dag_engine::DagEngine; + fn main() { - println!("Hello, world!"); -} + let mut dagrs = DagEngine::new(); + if let Err(e) = dagrs.run("test/test1.yaml") { + println!("[Error] {}", e); + } +} \ No newline at end of file diff --git a/dagrs/src/task.rs b/dagrs/src/task.rs new file mode 100644 index 0000000000000000000000000000000000000000..958c9f4d47088f87453476ad6e844f167d1193c6 --- /dev/null +++ b/dagrs/src/task.rs @@ -0,0 +1,43 @@ +/* + * @Author: Yinwhe + * @Date: 2022-01-25 17:49:19 + * @LastEditors: Yinwhe + * @LastEditTime: 2022-01-26 23:18:04 + * @Description: Task implementation + * @Copyright: Copyright (c) 2022 + */ + +use crate::error_handler::DagError; +use yaml_rust::Yaml; + +#[derive(Debug)] +pub struct Task { + name: String, + relys: Vec, +} + +impl Task { + pub fn from_yaml(info: &Yaml) -> Result { + let name = info["name"] + .as_str() + .ok_or(DagError::error("Task name not found"))? + .to_owned(); + + let mut relys = Vec::new(); + if let Some(rely_tasks) = info["rely"].as_vec() { + for rely_task_id in rely_tasks { + let id = rely_task_id + .as_str() + .ok_or(DagError::error("Rely tasks id error"))? + .to_owned(); + relys.push(id) + } + } + + Ok(Task { name, relys }) + } + + pub fn get_rely_list(&self) -> &Vec { + &self.relys + } +} diff --git a/dagrs/test/test1.yaml b/dagrs/test/test1.yaml new file mode 100644 index 0000000000000000000000000000000000000000..391afa851aedda6ed63a6e4b1d6d2291bba65994 --- /dev/null +++ b/dagrs/test/test1.yaml @@ -0,0 +1,16 @@ +dagrs: + a: + name: "任务1" + rely: [b, c] + b: + name: "任务2" + rely: [c] + c: + name: "任务3" + rely: [d] + d: + name: "任务4" + rely: [e] + e: + name: "任务5" + rely: [c] \ No newline at end of file