From 6ba06f8cd9d0c447adcc288da03f716ae65da410 Mon Sep 17 00:00:00 2001 From: Yinwhe Date: Mon, 7 Feb 2022 18:44:29 +0800 Subject: [PATCH] Intern dagrs --- dagrs/Cargo.toml | 3 +- dagrs/README.md | 16 ++- dagrs/src/dag_engine.rs | 73 +++++++---- dagrs/src/error_handler.rs | 88 +++++++++---- dagrs/src/graph.rs | 64 ++++++++-- dagrs/src/main.rs | 133 ++++++++++++++++---- dagrs/src/task.rs | 65 +++++++--- dagrs/test/{test2.yaml => test_dag.yaml} | 0 dagrs/test/test_error1.yaml | 6 + dagrs/test/test_error2.yaml | 3 + dagrs/test/{test4.yaml => test_error3.yaml} | 2 +- dagrs/test/{test1.yaml => test_loop1.yaml} | 0 dagrs/test/{test3.yaml => test_loop2.yaml} | 0 13 files changed, 352 insertions(+), 101 deletions(-) rename dagrs/test/{test2.yaml => test_dag.yaml} (100%) create mode 100644 dagrs/test/test_error1.yaml create mode 100644 dagrs/test/test_error2.yaml rename dagrs/test/{test4.yaml => test_error3.yaml} (71%) rename dagrs/test/{test1.yaml => test_loop1.yaml} (100%) rename dagrs/test/{test3.yaml => test_loop2.yaml} (100%) diff --git a/dagrs/Cargo.toml b/dagrs/Cargo.toml index c06222a6..d38d983f 100644 --- a/dagrs/Cargo.toml +++ b/dagrs/Cargo.toml @@ -7,4 +7,5 @@ edition = "2021" [dependencies] yaml-rust = "0.4.5" -bimap = "0.6.1" \ No newline at end of file +bimap = "0.6.1" +clap = { version = "3.0.14", features = ["derive"] } \ No newline at end of file diff --git a/dagrs/README.md b/dagrs/README.md index 2d1c7ec3..4ab8d0cd 100644 --- a/dagrs/README.md +++ b/dagrs/README.md @@ -11,14 +11,23 @@ 命令行使用方式为: ```bash -$ dagrs +$ dagrs -h +dagrs 0.1.0 + +USAGE: + dagrs --filepath + +OPTIONS: + -f, --filepath YAML file path + -h, --help Print help information + -V, --version Print version information ``` 例如: ```bash -$ dagrs test/test2.yaml -[Start] -> a -> b -> f -> d -> c -> g -> e -> h -> [End] +$ dagrs -f test/test_dag.yaml +[Start] -> d -> a -> b -> f -> c -> g -> e -> h -> [End] ``` @@ -42,6 +51,7 @@ dagrs: - 这里有三个任务:a,b 和 c。这里的 a,b,c 并不是 `name` ,而是标识任务的标识,可以认为是 ID。 - 例如 a 指向 b 和 c,表示 a 在 b,c 之前执行,写做 `rely: [b, c]` 。 +- **注意,重复的 ID 会覆盖之前的任务定义。** 最终形成的图如下,那么一个可行的执行顺序是: `A->B->C->D->E` diff --git a/dagrs/src/dag_engine.rs b/dagrs/src/dag_engine.rs index 2f257f6c..6a3f5540 100644 --- a/dagrs/src/dag_engine.rs +++ b/dagrs/src/dag_engine.rs @@ -1,23 +1,29 @@ -/* - * @Author: Yinwhe - * @Date: 2022-01-25 18:26:14 - * @LastEditors: Yinwhe - * @LastEditTime: 2022-01-26 23:23:07 - * @Description: DAG Engine implementation - * @Copyright: Copyright (c) 2022 - */ - -use crate::{error_handler::DagError, graph::Graph, task::Task}; +//! Dag Engine is dagrs's main body + +use crate::{ + error_handler::{DagError, FormatErrorMark}, + graph::Graph, + task::Task, +}; use std::{collections::HashMap, fs::File, io::Read}; use yaml_rust::YamlLoader; +/// dagrs's function is wrapped in DagEngine struct pub struct DagEngine { + /// Store all tasks' infos tasks: HashMap, + /// Store dependency relations rely_graph: Graph, } impl DagEngine { /// Allocate a new DagEngine + /// + /// # Example + /// ``` + /// let dagrs = DagEngine::new(); + /// ``` + /// This function is usually used with `run`. pub fn new() -> DagEngine { DagEngine { tasks: HashMap::new(), @@ -25,7 +31,13 @@ impl DagEngine { } } - /// Do its job + /// Do dagrs's job + /// + /// # Example + /// ``` + /// let dagrs = DagEngine::new(); + /// dagrs.run("test/test_dag.yaml"); + /// ``` pub fn run(&mut self, tasks_list: &str) -> Result { self.read_tasks(tasks_list)?; self.create_graph()?; @@ -33,6 +45,12 @@ impl DagEngine { } /// Read tasks into engine throuh yaml + /// + /// # Example + /// ``` + /// let yaml_tasks = dagrs.read_task("test/test.yaml"); + /// ``` + /// This operation will read all info in yaml file into `dagrs.tasks` if no error occurs. fn read_tasks(&mut self, filename: &str) -> Result<(), DagError> { let mut yaml_cont = String::new(); @@ -43,12 +61,12 @@ impl DagEngine { let yaml_tasks = YamlLoader::load_from_str(&yaml_cont)?; let yaml_tasks = yaml_tasks[0]["dagrs"] .as_hash() - .ok_or(DagError::error("format error"))?; + .ok_or(DagError::format_error("", FormatErrorMark::StartWordError))?; // Read tasks for (v, w) in yaml_tasks { - let id = v.as_str().ok_or(DagError::error("task id error"))?; - let task = Task::from_yaml(w)?; + let id = v.as_str().unwrap(); // .ok_or(DagError::form("task id error"))?; + let task = Task::from_yaml(id, w)?; self.tasks.insert(id.to_owned(), task); } @@ -57,6 +75,12 @@ impl DagEngine { } /// create rely map between tasks + /// + /// # Example + /// ``` + /// dagrs.create_graph(); + /// ``` + /// This operation will initialize `dagrs.rely_graph` if no error occurs. fn create_graph(&mut self) -> Result<(), DagError> { let size = self.tasks.len(); self.rely_graph.set_graph_size(size); @@ -71,19 +95,26 @@ impl DagEngine { for (id, task) in &self.tasks { let index = self.rely_graph.find_index_by_id(id).unwrap(); - task.get_rely_list() - .iter() - .map(|t| { - self.rely_graph - .add_edge(index, self.rely_graph.find_index_by_id(t).unwrap()) - }) - .count(); + for rely_task_id in task.get_rely_list() { + let rely_index = self + .rely_graph + .find_index_by_id(&rely_task_id) + .ok_or(DagError::format_error(id, FormatErrorMark::RelyIDIllegal))?; + + self.rely_graph.add_edge(index, rely_index); + } } Ok(()) } /// Check whether it's DAG or not + /// + /// # Example + /// ``` + /// dagrs.check_dag(); + /// ``` + /// This opeartions will judge the graph and give out a execution sequence if possible. fn check_dag(&self) -> bool { self.rely_graph.topo_sort() } diff --git a/dagrs/src/error_handler.rs b/dagrs/src/error_handler.rs index 97f6c4d1..0e77daed 100644 --- a/dagrs/src/error_handler.rs +++ b/dagrs/src/error_handler.rs @@ -1,38 +1,84 @@ -/* - * @Author: Yinwhe - * @Date: 2022-01-21 11:14:18 - * @LastEditors: Yinwhe - * @LastEditTime: 2022-01-26 23:17:51 - * @Description: Simple error handler - * @Copyright: Copyright (c) 2021 - */ +//! A simple error handle, can output error type and info -use std::fmt::{Debug, Display}; -use yaml_rust::ScanError; +use std::fmt::Display; -#[derive(Debug)] -pub struct DagError(String); + +#[derive(Debug, PartialEq, Eq)] +/// Format Error may occur in yaml file +pub enum FormatErrorMark { + /// Not start with 'dagrs' + StartWordError, + /// A task have no name filed + NoName, + /// The task ID dependent on the task does not exist + RelyIDIllegal, +} + +#[derive(Debug, PartialEq, Eq)] +/// Format Error, point out which part has what error type. +pub struct FormatError { + /// which task definition has errors. + id: String, + /// Has what kinds of errors. + mark: FormatErrorMark, +} + +impl Display for FormatError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.mark { + FormatErrorMark::StartWordError => write!(f, "Not start with 'dagrs:'"), + FormatErrorMark::NoName => write!(f, "Task[ID:{}] name not found", self.id), + FormatErrorMark::RelyIDIllegal => write!(f, "Task[ID:{}] rely ID not found", self.id), + } + } +} + +#[derive(Debug, PartialEq, Eq)] +/// A synthesis of all possible errors. +pub enum DagError { + /// IO Error, like file not exist, etc. + /// Here we simplefy it to a message(String). + IOError(String), + /// Yaml Parser defined error type. + YamlParserError(yaml_rust::ScanError), + /// Format Error type. + YamlFormatError(FormatError), +} impl Display for DagError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Display::fmt(&self.0, f) + match &self { + Self::IOError(e) => e.fmt(f), + Self::YamlParserError(e) => e.fmt(f), + Self::YamlFormatError(e) => e.fmt(f), + } } } impl From for DagError { fn from(e: std::io::Error) -> Self { - DagError(e.to_string()) + Self::IOError(e.to_string()) } } -impl From for DagError { - fn from(e: ScanError) -> Self { - DagError(e.to_string()) +impl From for DagError { + fn from(e: yaml_rust::ScanError) -> Self { + Self::YamlParserError(e) } } -impl DagError{ - pub fn error(message: &str) -> DagError { - DagError(message.to_string()) +impl DagError { + /// Throw a format error + /// + /// # Example + /// ``` + /// DagError::format_error("a", FormatErrorMark::NoName); + /// ``` + /// This will throw a error that says, task 'a' has no name field. + pub fn format_error(id: &str, mark: FormatErrorMark) -> Self { + Self::YamlFormatError(FormatError { + id: id.into(), + mark, + }) } -} \ No newline at end of file +} diff --git a/dagrs/src/graph.rs b/dagrs/src/graph.rs index 0f6cdcab..bc2225d1 100644 --- a/dagrs/src/graph.rs +++ b/dagrs/src/graph.rs @@ -1,23 +1,25 @@ -/* - * @Author: Yinwhe - * @Date: 2022-01-25 17:46:18 - * @LastEditors: Yinwhe - * @LastEditTime: 2022-01-26 23:26:05 - * @Description: Graph records dependency relations between tasks - * @Copyright: Copyright (c) 2022 - */ +//! Graph stores dependency relations use bimap::BiMap; #[derive(Debug)] +/// Graph Struct pub struct Graph { - nodes: BiMap, // Record node id and it's index - adj: Vec>, // Adjacency list - indegree: Vec, // Node's indegree, used for topo sort + /// Record node id and it's index + nodes: BiMap, + /// Adjacency list + adj: Vec>, + /// Node's indegree, used for topo sort + indegree: Vec, } impl Graph { /// Allocate an empty graph + /// + /// # Example + /// ``` + /// let g = Grapg::new(); + /// ``` pub fn new() -> Graph { Graph { nodes: BiMap::new(), @@ -27,36 +29,72 @@ impl Graph { } /// Set graph size, size is the number of tasks + /// + /// # Example + /// ``` + /// let size = 10; // 10 nodes + /// g.set_graph_size(size); + /// ``` pub fn set_graph_size(&mut self, size: usize) { self.adj.resize(size, Vec::new()); self.indegree.resize(size, 0) } - /// Add a node into the graph, repetitive add can cause errors + /// Add a node into the graph + /// + /// This operation will create a mapping between ID and its index. + /// + /// # Example + /// ``` + /// g.add_node("Node1"); + /// ``` + /// **Note:** `id` won't get repeated in dagrs, + /// since yaml parser will overwrite its info if a task's ID repeats. pub fn add_node(&mut self, id: &str) { let index = self.nodes.len(); self.nodes.insert(id.to_owned(), index); } /// Add an edge into the graph + /// + /// ```Example + /// g.add_edge(0, 1); + /// ``` + /// Above operation adds a arrow from node 0 to node 1, + /// which means task 0 shall be executed before task 1. pub fn add_edge(&mut self, v: usize, w: usize) { self.adj[v].push(w); self.indegree[w] += 1; } + /// Find a task's index by its ID pub fn find_index_by_id(&self, id: &str) -> Option { self.nodes.get_by_left(id).map(|i| i.to_owned()) } + /// Find a task's ID by its index pub fn find_id_by_index(&self, index: usize) -> Option { self.nodes.get_by_right(&index).map(|n| n.to_string()) } + /// Get number of nodes in grapg pub fn get_node_num(&self) -> usize { self.nodes.len() } /// Do topo sort in graph, returns true if DAG + /// + /// # Example + /// ``` + /// g.topo_sort(); + /// ``` + /// This operation will judge whether graph is a DAG or not, + /// returns true if yes, and false if no. + /// + /// This function has output, if graph is a DAG, it will print a possible execution sequence, + /// or it will print `Loop Detected`. + /// + /// **Note**: this function can only be called after graph's initialization (add nodes and edges, etc.) is done. pub fn topo_sort(&self) -> bool { let mut queue = Vec::new(); let mut indegree = self.indegree.clone(); @@ -98,4 +136,4 @@ impl Graph { true } } -} \ No newline at end of file +} diff --git a/dagrs/src/main.rs b/dagrs/src/main.rs index fbe888f0..6bd582bc 100644 --- a/dagrs/src/main.rs +++ b/dagrs/src/main.rs @@ -1,52 +1,137 @@ -/* - * @Author: Yinwhe - * @Date: 2022-01-19 14:14:28 - * @LastEditors: Yinwhe - * @LastEditTime: 2022-02-06 13:48:41 - * @Description: dagrs +/*! +本项目是用 Rust 写的 DAG 执行引擎,开发文档请参考:[使用 Rust 编写 DAG 执行引擎](https://openeuler.feishu.cn/docs/doccnVLprAY6vIMv6W1vgfLnfrf)。 + +## 用法 + +确保 Rust 编译环境可用(`cargo build`),然后在此文件夹中运行`cargo build --release`,在`target/release/`中获取可执行文件,并将其放入PATH。 + +命令行使用方式为: + +```bash +$ dagrs -h +dagrs 0.1.0 + +USAGE: + dagrs --filepath + +OPTIONS: + -f, --filepath YAML file path + -h, --help Print help information + -V, --version Print version information +``` + +例如: + +```bash +$ dagrs -f test/test_dag.yaml +[Start] -> d -> a -> b -> f -> c -> g -> e -> h -> [End] +``` + + + +## YAML 定义 + +YAML 定义 DAG 通过如下一个例子来说明: + +```YAML +dagrs: + a: + name: "任务1" + rely: [b, c] + b: + name: "任务2" + rely: [c] + c: + name: "任务3" + # empty if no rely +``` + +- 这里有三个任务:a,b 和 c。这里的 a,b,c 并不是 `name` ,而是标识任务的标识,可以认为是 ID。 +- 例如 a 指向 b 和 c,表示 a 在 b,c 之前执行,写做 `rely: [b, c]` 。 +- **注意,重复的 ID 会覆盖之前的任务定义。** + +最终形成的图如下,那么一个可行的执行顺序是: `A->B->C->D->E` + +img + +具体的任务定义可以后续随需要进行扩展。 */ extern crate bimap; +extern crate clap; extern crate yaml_rust; mod dag_engine; mod error_handler; mod graph; -mod task; +pub mod task; +use clap::Parser; use dag_engine::DagEngine; +#[derive(Parser)] +#[clap(version)] +/// Command Line input +struct Args { + /// YAML file path + #[clap(short, long)] + filepath: String, +} + fn main() { + let args = Args::parse(); let mut dagrs = DagEngine::new(); - if let Some(filename) = std::env::args().nth(1) { - if let Err(e) = dagrs.run(&filename) { - println!("[Error] {}", e); - } - } else { - println!("Usage: dargs ") + + if let Err(e) = dagrs.run(&args.filepath) { + println!("[Error] {}", e); } } #[test] -fn test1() { - let res = DagEngine::new().run("test/test1.yaml").unwrap(); - assert_eq!(res, false); +fn test_dag() { + let res = DagEngine::new().run("test/test_dag.yaml").unwrap(); + assert_eq!(res, true) } #[test] -fn test2() { - let res = DagEngine::new().run("test/test2.yaml").unwrap(); - assert_eq!(res, true) +fn test_loop() { + let res = DagEngine::new().run("test/test_loop1.yaml").unwrap(); + assert_eq!(res, false) } #[test] -fn test3() { - let res = DagEngine::new().run("test/test3.yaml").unwrap(); +fn test_complex_loop() { + let res = DagEngine::new().run("test/test_loop2.yaml").unwrap(); assert_eq!(res, false) } #[test] -fn test4() { - let res = DagEngine::new().run("test/test4.yaml").unwrap(); - assert_eq!(res, false) +fn test_format_error1() { + use error_handler::{DagError, FormatErrorMark}; + let res = DagEngine::new().run("test/test_error1.yaml"); + assert_eq!( + res, + Err(DagError::format_error("a".into(), FormatErrorMark::NoName)) + ); +} + +#[test] +fn test_format_error2() { + use error_handler::{DagError, FormatErrorMark}; + let res = DagEngine::new().run("test/test_error2.yaml"); + assert_eq!( + res, + Err(DagError::format_error("".into(), FormatErrorMark::StartWordError)) + ); +} + + +#[test] +fn test_rely_error() { + use error_handler::{DagError, FormatErrorMark}; + let res = DagEngine::new().run("test/test_error3.yaml"); + assert_eq!( + res, + Err(DagError::format_error("a".into(), FormatErrorMark::RelyIDIllegal)) + ); } \ No newline at end of file diff --git a/dagrs/src/task.rs b/dagrs/src/task.rs index b5900dfe..892418a9 100644 --- a/dagrs/src/task.rs +++ b/dagrs/src/task.rs @@ -1,44 +1,75 @@ -/* - * @Author: Yinwhe - * @Date: 2022-01-25 17:49:19 - * @LastEditors: Yinwhe - * @LastEditTime: 2022-02-06 13:40:58 - * @Description: Task implementation - */ - -use crate::error_handler::DagError; +//! Task Implementations, used to store task infos + +use crate::error_handler::{DagError, FormatErrorMark}; use yaml_rust::Yaml; +/// Task Struct #[derive(Debug)] pub struct Task { - name: String, - relys: Vec, + /// Task ID, must be unique + pub ID: String, + /// Task Name, can repeat + pub name: String, + /// Dependency relations, store relied tasks' ID + pub relys: Vec, } impl Task { /// Parse Task from Yaml - pub fn from_yaml(info: &Yaml) -> Result { + /// + /// # Example + /// ``` + /// let task = Task::from_yaml(id, yaml); + /// ``` + /// Here `id` and `yaml` comes from: + /// ``` + /// let yaml_tasks = YamlLoader::load_from_str(&yaml_cont)?; + /// let yaml_tasks = yaml_tasks[0]["dagrs"] + /// .as_hash() + /// .ok_or(DagError::format_error("", FormatErrorMark::StartWordError))?; + /// + /// for(id, yaml) in yaml_tasks{ + /// ... + /// } + /// ``` + pub fn from_yaml(id: &str, info: &Yaml) -> Result { // Get name first + let name = info["name"] .as_str() - .ok_or(DagError::error("Task name not found"))? + .ok_or(DagError::format_error(id, FormatErrorMark::NoName))? .to_owned(); // relys can be empty let mut relys = Vec::new(); if let Some(rely_tasks) = info["rely"].as_vec() { for rely_task_id in rely_tasks { - let id = rely_task_id + let rely_task_id = rely_task_id .as_str() - .ok_or(DagError::error("Rely tasks id error"))? + .ok_or(DagError::format_error(id, FormatErrorMark::RelyIDIllegal))? .to_owned(); - relys.push(id) + relys.push(rely_task_id) } } - Ok(Task { name, relys }) + Ok(Task { + ID: id.into(), + name, + relys, + }) } + + /// Get dependency tasks list + /// + /// # Example + /// Usually used like: + /// ``` + /// let relys = tasks.get_rely_list(); + /// for rely_task in relys{ + /// ... + /// } + /// ``` pub fn get_rely_list(&self) -> &Vec { &self.relys } diff --git a/dagrs/test/test2.yaml b/dagrs/test/test_dag.yaml similarity index 100% rename from dagrs/test/test2.yaml rename to dagrs/test/test_dag.yaml diff --git a/dagrs/test/test_error1.yaml b/dagrs/test/test_error1.yaml new file mode 100644 index 00000000..3a43f677 --- /dev/null +++ b/dagrs/test/test_error1.yaml @@ -0,0 +1,6 @@ +dagrs: + a: + # no name + rely: [b] + b: + name: "任务2" \ No newline at end of file diff --git a/dagrs/test/test_error2.yaml b/dagrs/test/test_error2.yaml new file mode 100644 index 00000000..d2bfdb1a --- /dev/null +++ b/dagrs/test/test_error2.yaml @@ -0,0 +1,3 @@ +a: + name: "任务1" + rely: [a] \ No newline at end of file diff --git a/dagrs/test/test4.yaml b/dagrs/test/test_error3.yaml similarity index 71% rename from dagrs/test/test4.yaml rename to dagrs/test/test_error3.yaml index b0852cfe..390da24c 100644 --- a/dagrs/test/test4.yaml +++ b/dagrs/test/test_error3.yaml @@ -1,4 +1,4 @@ dagrs: a: name: "任务1" - rely: [a] \ No newline at end of file + rely: [b] \ No newline at end of file diff --git a/dagrs/test/test1.yaml b/dagrs/test/test_loop1.yaml similarity index 100% rename from dagrs/test/test1.yaml rename to dagrs/test/test_loop1.yaml diff --git a/dagrs/test/test3.yaml b/dagrs/test/test_loop2.yaml similarity index 100% rename from dagrs/test/test3.yaml rename to dagrs/test/test_loop2.yaml -- Gitee