diff --git a/dagrs/Cargo.toml b/dagrs/Cargo.toml index d38d983f4b97fd83f2607f6495a80d31f71be88a..638a48124bb895ce78150bd41062ab6c49baf526 100644 --- a/dagrs/Cargo.toml +++ b/dagrs/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] yaml-rust = "0.4.5" -bimap = "0.6.1" +lazy_static = "1.4.0" clap = { version = "3.0.14", features = ["derive"] } \ No newline at end of file diff --git a/dagrs/src/dag_engine.rs b/dagrs/src/dag_engine.rs index 5df227c324c875aa95e414b088d3c2636c4547c6..b78389a3efa9ee89ec47989e0257acc5ad1ea6a5 100644 --- a/dagrs/src/dag_engine.rs +++ b/dagrs/src/dag_engine.rs @@ -1,7 +1,7 @@ //! Dag Engine is dagrs's main body use crate::{ - error_handler::{DagError, FormatErrorMark}, + error_handler::{DagError, InnerError}, graph::Graph, task::TaskTrait, }; @@ -35,6 +35,8 @@ impl DagEngine { /// # Example /// ``` /// let dagrs = DagEngine::new(); + /// dagrs.add_task(task1); + /// dagrs.add_task(task2); /// dagrs.run("test/test_dag.yaml"); /// ``` pub fn run(&mut self, task_info_file: &str) -> Result { @@ -43,6 +45,36 @@ impl DagEngine { Ok(self.check_dag()) } + /// Do dagrs's job from yaml file. + /// + /// # Example + /// ``` + /// let dagrs = DagEngine::new(); + /// dagrs.run_from_yaml("test/test_dag.yaml"); + /// ``` + /// + /// This method is similar to `run`, but read tasks from yaml file, + /// thus no need to add tasks mannually. + pub fn run_from_yaml(&mut self, filename: &str) -> Result { + self.read_tasks(filename)?; + self.run() + } + + /// Add new task into dagrs + /// + /// # Example + /// ``` + /// let dagrs = DagEngine::new(); + /// dagrs.add_task(task1); + /// dagrs.add_task(task2); + /// dagrs.run("test/test_dag.yaml"); + /// ``` + /// + /// Here `task1` and `task2` are user defined task wrapped in [`TaskWrapper`]. + pub fn add_task(&mut self, task: TaskWrapper) { + self.tasks.insert(task.get_id(), task); + } + /// Read tasks into engine throuh yaml /// /// # Example @@ -65,23 +97,17 @@ impl DagEngine { let size = self.tasks.len(); self.rely_graph.set_graph_size(size); - // Add Node (create id - index mapping) - self.tasks - .iter() - .map(|(n, _)| self.rely_graph.add_node(n)) - .count(); - // Form Graph - for (id, task) in &self.tasks { - let index = self.rely_graph.find_index_by_id(id).unwrap(); - + for (&id, task) in self.tasks.iter() { + assert!(id < size); for rely_task_id in task.get_rely_list() { - let rely_index = self - .rely_graph - .find_index_by_id(&rely_task_id) - .ok_or(DagError::format_error(id, FormatErrorMark::RelyIDIllegal))?; - - self.rely_graph.add_edge(index, rely_index); + // Rely task existence check + if !self.tasks.contains_key(&rely_task_id) { + return Err(DagError::inner_error(InnerError::RelyTaskIllegal( + self.tasks[&id].get_name(), + ))); + } + self.rely_graph.add_edge(id, rely_task_id); } } @@ -96,6 +122,18 @@ impl DagEngine { /// ``` /// This opeartions will judge the graph and give out a execution sequence if possible. fn check_dag(&self) -> bool { - self.rely_graph.topo_sort() + if let Some(seq) = self.rely_graph.topo_sort() { + self.print_seq(&seq); + seq.iter() + .map(|id| { + println!("Executing Task[name: {}]", self.tasks[id].get_name()); + self.tasks[id].run(); + }) + .count(); + true + } else { + println!("Loop Detect"); + false + } } } diff --git a/dagrs/src/error_handler.rs b/dagrs/src/error_handler.rs index 0e77daedf1f94f3eb948ad2a0746630d9aa75286..051f02d679d479b0407e20722d0c85051a0a903b 100644 --- a/dagrs/src/error_handler.rs +++ b/dagrs/src/error_handler.rs @@ -2,55 +2,78 @@ use std::fmt::Display; +#[derive(Debug, PartialEq, Eq)] +/// A synthesis of all possible errors. +pub enum DagError { + /// IO Error, like file not exist, etc. + /// Here we simplefy it to a message(String). + IOError(String), + /// YAML Error, like format error, etc. + YamlError(YamlError), + /// Error that occurs when running dagrs. + InnerError(InnerError), +} #[derive(Debug, PartialEq, Eq)] -/// Format Error may occur in yaml file -pub enum FormatErrorMark { - /// Not start with 'dagrs' +/// Format Error, point out which part has what kinds of error. +pub enum FormatError { + /// Not start with 'dagrs'. StartWordError, - /// A task have no name filed - NoName, - /// The task ID dependent on the task does not exist - RelyIDIllegal, + /// A task have no name filed, `String` points out task's id. + NoName(String), + /// No run scripts + NoRunScript(String), } #[derive(Debug, PartialEq, Eq)] -/// Format Error, point out which part has what error type. -pub struct FormatError { - /// which task definition has errors. - id: String, - /// Has what kinds of errors. - mark: FormatErrorMark, +/// Error that occurs when parsing YAML file. +pub enum YamlError { + /// Yaml Parser defined error type. + YamlParserError(yaml_rust::ScanError), + /// Format Error type. + YamlFormatError(FormatError), +} + +#[derive(Debug, PartialEq, Eq)] +/// Error that occurs when running dagrs +pub enum InnerError { + /// Dependency task not exist + RelyTaskIllegal(String), } impl Display for FormatError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.mark { - FormatErrorMark::StartWordError => write!(f, "Not start with 'dagrs:'"), - FormatErrorMark::NoName => write!(f, "Task[ID:{}] name not found", self.id), - FormatErrorMark::RelyIDIllegal => write!(f, "Task[ID:{}] rely ID not found", self.id), + match self { + Self::StartWordError => write!(f, "YAML file not start with 'dagrs:'"), + Self::NoName(id) => write!(f, "Task[ID:{}] name not found", id), + Self::NoRunScript(id) => write!(f, "Task[ID:{}] run script not found", id), } } } -#[derive(Debug, PartialEq, Eq)] -/// A synthesis of all possible errors. -pub enum DagError { - /// IO Error, like file not exist, etc. - /// Here we simplefy it to a message(String). - IOError(String), - /// Yaml Parser defined error type. - YamlParserError(yaml_rust::ScanError), - /// Format Error type. - YamlFormatError(FormatError), +impl Display for YamlError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::YamlFormatError(e) => e.fmt(f), + Self::YamlParserError(e) => e.fmt(f), + } + } +} + +impl Display for InnerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::RelyTaskIllegal(name) => write!(f, "Task[Name:{}] rely tasks not exist!", name), + } + } } impl Display for DagError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self { Self::IOError(e) => e.fmt(f), - Self::YamlParserError(e) => e.fmt(f), - Self::YamlFormatError(e) => e.fmt(f), + Self::YamlError(e) => e.fmt(f), + Self::InnerError(e) => e.fmt(f), } } } @@ -63,22 +86,30 @@ impl From for DagError { impl From for DagError { fn from(e: yaml_rust::ScanError) -> Self { - Self::YamlParserError(e) + Self::YamlError(YamlError::YamlParserError(e)) } } impl DagError { /// Throw a format error - /// + /// /// # Example /// ``` - /// DagError::format_error("a", FormatErrorMark::NoName); + /// DagError::format_error(FormatError::NoName("a")); /// ``` /// This will throw a error that says, task 'a' has no name field. - pub fn format_error(id: &str, mark: FormatErrorMark) -> Self { - Self::YamlFormatError(FormatError { - id: id.into(), - mark, - }) + pub fn format_error(error: FormatError) -> Self { + Self::YamlError(YamlError::YamlFormatError(error)) + } + + /// Throw a inner error + /// + /// # Example + /// ``` + /// DagError::inner_error(InnerError::RelyTaskIllegal("task 1")) + /// ``` + /// This will throw a error that says, task with name "task 1" has non-exist rely tasks. + pub fn inner_error(error: InnerError) -> Self { + Self::InnerError(error) } } diff --git a/dagrs/src/graph.rs b/dagrs/src/graph.rs index 5e407ae06ec7bf44f75fdaac1b522ddac14e4dbf..eb4f58bc842eb0086e6dc81629daa4102a590fec 100644 --- a/dagrs/src/graph.rs +++ b/dagrs/src/graph.rs @@ -1,12 +1,9 @@ //! Graph stores dependency relations -use bimap::BiMap; - #[derive(Debug)] /// Graph Struct pub struct Graph { - /// Record node id and it's index - nodes: BiMap, + size: usize, /// Adjacency list adj: Vec>, /// Node's indegree, used for topo sort @@ -22,7 +19,7 @@ impl Graph { /// ``` pub fn new() -> Graph { Graph { - nodes: BiMap::new(), + size: 0, adj: Vec::new(), indegree: Vec::new(), } @@ -36,25 +33,11 @@ impl Graph { /// g.set_graph_size(size); /// ``` pub fn set_graph_size(&mut self, size: usize) { + self.size = size; self.adj.resize(size, Vec::new()); self.indegree.resize(size, 0) } - /// Add a node into the graph - /// - /// This operation will create a mapping between ID and its index. - /// - /// # Example - /// ``` - /// g.add_node("Node1"); - /// ``` - /// **Note:** `id` won't get repeated in dagrs, - /// since yaml parser will overwrite its info if a task's ID repeats. - pub fn add_node(&mut self, id: &str) { - let index = self.nodes.len(); - self.nodes.insert(id.to_owned(), index); - } - /// Add an edge into the graph /// /// ```Example @@ -67,32 +50,16 @@ impl Graph { self.indegree[w] += 1; } - /// Find a task's index by its ID - pub fn find_index_by_id(&self, id: &str) -> Option { - self.nodes.get_by_left(id).map(|i| i.to_owned()) - } - - /// Find a task's ID by its index - pub fn find_id_by_index(&self, index: usize) -> Option { - self.nodes.get_by_right(&index).map(|n| n.to_string()) - } - - /// Get number of nodes in grapg - pub fn get_node_num(&self) -> usize { - self.nodes.len() - } - /// Do topo sort in graph, returns true if DAG + /// Do topo sort in graph, returns a possible execution sequnce if DAG /// /// # Example /// ``` /// g.topo_sort(); /// ``` /// This operation will judge whether graph is a DAG or not, - /// returns true if yes, and false if no. + /// returns Some(Possible Sequence) if yes, and None if no. /// - /// This function has output, if graph is a DAG, it will print a possible execution sequence, - /// or it will print `Loop Detected`. /// /// **Note**: this function can only be called after graph's initialization (add nodes and edges, etc.) is done. /// @@ -111,7 +78,7 @@ impl Graph { let mut queue = Vec::new(); let mut indegree = self.indegree.clone(); let mut count = 0; - let mut sequence = String::new(); + let mut sequence = vec![]; indegree .iter() @@ -126,7 +93,7 @@ impl Graph { while !queue.is_empty() { let v = queue.pop().unwrap(); - sequence.push_str(&format!(" -> {}", self.find_id_by_index(v).unwrap())); + sequence.push(v); count += 1; self.adj[v] @@ -140,12 +107,10 @@ impl Graph { .count(); } - if count < self.get_node_num() { - println!("LOOP Detected"); - false + if count < self.size { + None } else { - println!("[Start]{} -> [End]", sequence); - true + Some(sequence) } } } diff --git a/dagrs/src/main.rs b/dagrs/src/main.rs index f3b00952a56321fd5dff6203e675ebb0e8cbff7d..95cbd0cc11ec0e1b53aaf56efc4a024868415751 100644 --- a/dagrs/src/main.rs +++ b/dagrs/src/main.rs @@ -1,75 +1,18 @@ -/*! -本项目是用 Rust 写的 DAG 执行引擎,开发文档请参考:[使用 Rust 编写 DAG 执行引擎](https://openeuler.feishu.cn/docs/doccnVLprAY6vIMv6W1vgfLnfrf)。 - -## 用法 - -确保 Rust 编译环境可用(`cargo build`),然后在此文件夹中运行`cargo build --release`,在`target/release/`中获取可执行文件,并将其放入PATH。 - -命令行使用方式为: - -```bash -$ dagrs -h -dagrs 0.1.0 - -USAGE: - dagrs --filepath - -OPTIONS: - -f, --filepath YAML file path - -h, --help Print help information - -V, --version Print version information -``` - -例如: - -```bash -$ dagrs -f test/test_dag.yaml -[Start] -> d -> a -> b -> f -> c -> g -> e -> h -> [End] -``` - - - -## YAML 定义 - -YAML 定义 DAG 通过如下一个例子来说明: - -```YAML -dagrs: - a: - name: "任务1" - rely: [b, c] - b: - name: "任务2" - rely: [c] - c: - name: "任务3" - # empty if no rely -``` - -- 这里有三个任务:a,b 和 c。这里的 a,b,c 并不是 `name` ,而是标识任务的标识,可以认为是 ID。 -- 例如 a 指向 b 和 c,表示 a 在 b,c 之前执行,写做 `rely: [b, c]` 。 -- **注意,重复的 ID 会覆盖之前的任务定义。** - -最终形成的图如下,那么一个可行的执行顺序是: `A->B->C->D->E` - -img - -具体的任务定义可以后续随需要进行扩展。 - */ - -extern crate bimap; extern crate clap; +extern crate lazy_static; extern crate yaml_rust; mod dag_engine; mod error_handler; mod graph; -pub mod task; +mod task; use clap::Parser; use dag_engine::DagEngine; use task::Task; + + #[derive(Parser)] #[clap(version)] /// Command Line input @@ -79,15 +22,23 @@ struct Args { filepath: String, } + fn main() { let args = Args::parse(); let mut dagrs: DagEngine = DagEngine::new(); - if let Err(e) = dagrs.run(&args.filepath) { + if let Err(e) = dagrs.run_from_yaml(&args.filepath) { println!("[Error] {}", e); } } +#[test] +fn test_yaml() { + let mut res = DagEngine::new(); + res.run_from_yaml("test/test.yaml").unwrap(); +} + + #[test] fn test_dag() { let res = DagEngine::::new().run("test/test_dag.yaml").unwrap(); @@ -112,7 +63,7 @@ fn test_format_error1() { let res = DagEngine::::new().run("test/test_error1.yaml"); assert_eq!( res, - Err(DagError::format_error("a".into(), FormatErrorMark::NoName)) + Err(DagError::format_error(FormatError::NoName("a".into()))) ); } @@ -122,7 +73,7 @@ fn test_format_error2() { let res = DagEngine::::new().run("test/test_error2.yaml"); assert_eq!( res, - Err(DagError::format_error("".into(), FormatErrorMark::StartWordError)) + Err(DagError::format_error(FormatError::StartWordError)) ); } @@ -133,6 +84,50 @@ fn test_rely_error() { let res = DagEngine::::new().run("test/test_error3.yaml"); assert_eq!( res, - Err(DagError::format_error("a".into(), FormatErrorMark::RelyIDIllegal)) + Err(DagError::inner_error(InnerError::RelyTaskIllegal("任务1".into()))) ); +} + +#[test] +fn test_no_runscript() { + use error_handler::{DagError, FormatError}; + let res = DagEngine::new().run_from_yaml("test/test_error4.yaml"); + assert_eq!( + res, + Err(DagError::format_error(FormatError::NoRunScript("a".into()))) + ); +} + +#[test] +fn test_prom() { + use crate::task::{TaskTrait, TaskWrapper}; + + struct T1 {} + impl TaskTrait for T1 { + fn run(&self) { + println!("T1!"); + } + } + + struct T2 {} + impl TaskTrait for T2 { + fn run(&self) { + println!("T2!"); + } + } + + let mut t1 = TaskWrapper::new(T1{}, "Task 1"); + let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + let mut t3 = TaskWrapper::new(T1{}, "Task 3"); + + t2.add_relys(&[&t1, &t3]); + t1.add_relys(&[&t3]); + + + let mut dag = DagEngine::new(); + dag.add_task(t1); + dag.add_task(t2); + dag.add_task(t3); + + dag.run(); } \ No newline at end of file diff --git a/dagrs/src/task.rs b/dagrs/src/task.rs index 9cb341d7232cb1716d9c86716259953c38910cbf..13122e0e93beb6cb361af9a01a0576fc8ecab3da 100644 --- a/dagrs/src/task.rs +++ b/dagrs/src/task.rs @@ -15,15 +15,11 @@ pub trait TaskTrait where Self:Sized { fn from_file(filename: &str) -> HashMap; } -/// Task Struct #[derive(Debug)] -pub struct Task { - /// Task ID, must be unique - pub ID: String, - /// Task Name, can repeat - pub name: String, - /// Dependency relations, store relied tasks' ID - pub relys: Vec, +/// Task Struct for YAML file. +struct YamlTaskInner { + /// Running Script + run: String, } impl TaskTrait for Task { @@ -76,7 +72,7 @@ impl Task { /// /// # Example /// ``` - /// let task = Task::from_yaml(id, yaml); + /// let task = Task::parse_one(id, yaml); /// ``` /// Here `id` and `yaml` comes from: /// ``` @@ -84,35 +80,42 @@ impl Task { /// let yaml_tasks = yaml_tasks[0]["dagrs"] /// .as_hash() /// .ok_or(DagError::format_error("", FormatErrorMark::StartWordError))?; - /// + /// /// for(id, yaml) in yaml_tasks{ /// ... /// } /// ``` fn from_yaml(id: &str, info: &Yaml) -> Result { // Get name first - let name = info["name"] .as_str() - .ok_or(DagError::format_error(id, FormatErrorMark::NoName))? + .ok_or(DagError::format_error(FormatError::NoName(id.to_owned())))? + .to_owned(); + + // Get run script + let run = info["run"] + .as_str() + .ok_or(DagError::format_error(FormatError::NoRunScript( + id.to_owned(), + )))? .to_owned(); // relys can be empty let mut relys = Vec::new(); if let Some(rely_tasks) = info["rely"].as_vec() { - for rely_task_id in rely_tasks { - let rely_task_id = rely_task_id - .as_str() - .ok_or(DagError::format_error(id, FormatErrorMark::RelyIDIllegal))? - .to_owned(); - relys.push(rely_task_id) - } + rely_tasks + .iter() + .map(|rely_task_id| relys.push(rely_task_id.as_str().unwrap().to_owned())) + .count(); } - Ok(Task { - ID: id.into(), + let inner = YamlTaskInner { run }; + + Ok(YamlTask { + yaml_id: id.to_string(), name, relys, + inner, }) } diff --git a/dagrs/test/test.yaml b/dagrs/test/test.yaml new file mode 100644 index 0000000000000000000000000000000000000000..e3c4888e2508534537813e1988bd3ea23fd73518 --- /dev/null +++ b/dagrs/test/test.yaml @@ -0,0 +1,4 @@ +dagrs: + a: + name: "Task 1" + run: "ls" \ No newline at end of file diff --git a/dagrs/test/test_dag.yaml b/dagrs/test/test_dag.yaml index 62ebc582962794d8dd04ef297088c68779862b28..c5f8f7064893ffda879c909e52d1297193695b06 100644 --- a/dagrs/test/test_dag.yaml +++ b/dagrs/test/test_dag.yaml @@ -2,23 +2,31 @@ dagrs: a: name: "任务1" rely: [b, c] + run: "1" b: name: "任务2" rely: [c, f, g] + run: "2" c: name: "任务3" rely: [e, g] + run: "3" d: name: "任务4" rely: [c, e] + run: "4" e: name: "任务5" rely: [h] + run: "5" f: name: "任务6" rely: [g] + run: "6" g: name: "任务7" rely: [h] + run: "7" h: - name: "任务8" \ No newline at end of file + name: "任务8" + run: "8" \ No newline at end of file diff --git a/dagrs/test/test_error1.yaml b/dagrs/test/test_error1.yaml index 3a43f677e1e53c4c08f9a16451c75489eeb7d3a5..34e3ea5ee353b2e8c2fe4ac30d3913da7d8cf801 100644 --- a/dagrs/test/test_error1.yaml +++ b/dagrs/test/test_error1.yaml @@ -2,5 +2,7 @@ dagrs: a: # no name rely: [b] + run: "echo 123" b: - name: "任务2" \ No newline at end of file + name: "任务2" + run: "echo 456" \ No newline at end of file diff --git a/dagrs/test/test_error2.yaml b/dagrs/test/test_error2.yaml index d2bfdb1a36aaedb17e4ed0f0667491e7a63b1476..6488564ba3159298b7375d0c60c0d09acd20c8d6 100644 --- a/dagrs/test/test_error2.yaml +++ b/dagrs/test/test_error2.yaml @@ -1,3 +1,4 @@ a: name: "任务1" - rely: [a] \ No newline at end of file + rely: [a] + run: "echo a" \ No newline at end of file diff --git a/dagrs/test/test_error3.yaml b/dagrs/test/test_error3.yaml index 390da24c322f3b03afaa95c51302f9afd724f887..2ce0ba83d8853a7fcaa2f0d186dd08c40a883d49 100644 --- a/dagrs/test/test_error3.yaml +++ b/dagrs/test/test_error3.yaml @@ -1,4 +1,5 @@ dagrs: a: name: "任务1" - rely: [b] \ No newline at end of file + rely: [b] + run: "echo a" \ No newline at end of file diff --git a/dagrs/test/test_error4.yaml b/dagrs/test/test_error4.yaml new file mode 100644 index 0000000000000000000000000000000000000000..b876305359442cf8366d78d1af158b34600a1bfa --- /dev/null +++ b/dagrs/test/test_error4.yaml @@ -0,0 +1,3 @@ +dagrs: + a: + name: "任务1" \ No newline at end of file diff --git a/dagrs/test/test_loop1.yaml b/dagrs/test/test_loop1.yaml index 391afa851aedda6ed63a6e4b1d6d2291bba65994..957be6b8db782413562805b40ccc850b977fe0cd 100644 --- a/dagrs/test/test_loop1.yaml +++ b/dagrs/test/test_loop1.yaml @@ -2,15 +2,20 @@ dagrs: a: name: "任务1" rely: [b, c] + run: "1" b: name: "任务2" rely: [c] + run: "2" c: name: "任务3" rely: [d] + run: "3" d: name: "任务4" rely: [e] + run: "4" e: name: "任务5" - rely: [c] \ No newline at end of file + rely: [c] + run: "4" \ No newline at end of file diff --git a/dagrs/test/test_loop2.yaml b/dagrs/test/test_loop2.yaml index 80ad3e747899746db54ff78d8e53f52cd352dc85..697430a068f0168882781d3e0d74f7353a1aa426 100644 --- a/dagrs/test/test_loop2.yaml +++ b/dagrs/test/test_loop2.yaml @@ -2,24 +2,32 @@ dagrs: a: name: "任务1" rely: [b, c] + run: "1" b: name: "任务2" rely: [c, f, g] + run: "2" c: name: "任务3" rely: [e, g] + run: "3" d: name: "任务4" rely: [c, e] + run: "4" e: name: "任务5" rely: [h] + run: "5" f: name: "任务6" rely: [g] + run: "6" g: name: "任务7" rely: [h] + run: "7" h: name: "任务8" - rely: [f] \ No newline at end of file + rely: [f] + run: "8" \ No newline at end of file