From 5f90faa08b4246922a63d1e136dce6a721e0ded9 Mon Sep 17 00:00:00 2001 From: Yinwhe Date: Wed, 9 Feb 2022 17:43:07 +0800 Subject: [PATCH 1/2] dagrs add TaskTrait dagrs: Rewrite its functions dagrs: support yaml file (phase 1) dagrs: support yaml file (phase 2) dagrs: Task Trait for user and programmer --- dagrs/Cargo.toml | 2 +- dagrs/src/dag_engine.rs | 141 ++++++++++++------- dagrs/src/error_handler.rs | 105 +++++++++----- dagrs/src/graph.rs | 69 ++++------ dagrs/src/main.rs | 145 ++++++++++---------- dagrs/src/task.rs | 263 +++++++++++++++++++++++++++++++----- dagrs/test/test.yaml | 4 + dagrs/test/test_dag.yaml | 10 +- dagrs/test/test_error1.yaml | 4 +- dagrs/test/test_error2.yaml | 3 +- dagrs/test/test_error3.yaml | 3 +- dagrs/test/test_error4.yaml | 3 + dagrs/test/test_loop1.yaml | 7 +- dagrs/test/test_loop2.yaml | 10 +- 14 files changed, 517 insertions(+), 252 deletions(-) create mode 100644 dagrs/test/test.yaml create mode 100644 dagrs/test/test_error4.yaml diff --git a/dagrs/Cargo.toml b/dagrs/Cargo.toml index d38d983f..638a4812 100644 --- a/dagrs/Cargo.toml +++ b/dagrs/Cargo.toml @@ -7,5 +7,5 @@ edition = "2021" [dependencies] yaml-rust = "0.4.5" -bimap = "0.6.1" +lazy_static = "1.4.0" clap = { version = "3.0.14", features = ["derive"] } \ No newline at end of file diff --git a/dagrs/src/dag_engine.rs b/dagrs/src/dag_engine.rs index 6a3f5540..fd7add3b 100644 --- a/dagrs/src/dag_engine.rs +++ b/dagrs/src/dag_engine.rs @@ -1,29 +1,27 @@ //! Dag Engine is dagrs's main body use crate::{ - error_handler::{DagError, FormatErrorMark}, + error_handler::{DagError, InnerError}, graph::Graph, - task::Task, + task::{TaskWrapper, YamlTask}, }; -use std::{collections::HashMap, fs::File, io::Read}; -use yaml_rust::YamlLoader; +use std::collections::HashMap; /// dagrs's function is wrapped in DagEngine struct pub struct DagEngine { /// Store all tasks' infos - tasks: HashMap, + tasks: HashMap, /// Store dependency relations rely_graph: Graph, } impl DagEngine { - /// Allocate a new DagEngine - /// + /// Allocate a new DagEngine. + /// /// # Example /// ``` /// let dagrs = DagEngine::new(); /// ``` - /// This function is usually used with `run`. pub fn new() -> DagEngine { DagEngine { tasks: HashMap::new(), @@ -31,51 +29,69 @@ impl DagEngine { } } - /// Do dagrs's job - /// + /// Do dagrs's job. + /// /// # Example /// ``` /// let dagrs = DagEngine::new(); + /// dagrs.add_task(task1); + /// dagrs.add_task(task2); /// dagrs.run("test/test_dag.yaml"); /// ``` - pub fn run(&mut self, tasks_list: &str) -> Result { - self.read_tasks(tasks_list)?; + /// + /// Here `task1` and `task2` are user defined task wrapped in [`TaskWrapper`]. + /// + /// **Note:** This method must be called after all tasks have been added into dagrs. + pub fn run(&mut self) -> Result { self.create_graph()?; Ok(self.check_dag()) } + /// Do dagrs's job from yaml file. + /// + /// # Example + /// ``` + /// let dagrs = DagEngine::new(); + /// dagrs.run_from_yaml("test/test_dag.yaml"); + /// ``` + /// + /// This method is similar to `run`, but read tasks from yaml file, + /// thus no need to add tasks mannually. + pub fn run_from_yaml(&mut self, filename: &str) -> Result { + self.read_tasks(filename)?; + self.run() + } + + /// Add new task into dagrs + /// + /// # Example + /// ``` + /// let dagrs = DagEngine::new(); + /// dagrs.add_task(task1); + /// dagrs.add_task(task2); + /// dagrs.run("test/test_dag.yaml"); + /// ``` + /// + /// Here `task1` and `task2` are user defined task wrapped in [`TaskWrapper`]. + pub fn add_task(&mut self, task: TaskWrapper) { + self.tasks.insert(task.get_id(), task); + } + /// Read tasks into engine throuh yaml - /// + /// /// # Example /// ``` /// let yaml_tasks = dagrs.read_task("test/test.yaml"); /// ``` /// This operation will read all info in yaml file into `dagrs.tasks` if no error occurs. fn read_tasks(&mut self, filename: &str) -> Result<(), DagError> { - let mut yaml_cont = String::new(); - - let mut yaml_file = File::open(filename)?; - yaml_file.read_to_string(&mut yaml_cont)?; - - // Parse Yaml - let yaml_tasks = YamlLoader::load_from_str(&yaml_cont)?; - let yaml_tasks = yaml_tasks[0]["dagrs"] - .as_hash() - .ok_or(DagError::format_error("", FormatErrorMark::StartWordError))?; - - // Read tasks - for (v, w) in yaml_tasks { - let id = v.as_str().unwrap(); // .ok_or(DagError::form("task id error"))?; - let task = Task::from_yaml(id, w)?; - - self.tasks.insert(id.to_owned(), task); - } - + let tasks = YamlTask::from_yaml(filename)?; + tasks.into_iter().map(|t| self.add_task(t)).count(); Ok(()) } /// create rely map between tasks - /// + /// /// # Example /// ``` /// dagrs.create_graph(); @@ -85,23 +101,17 @@ impl DagEngine { let size = self.tasks.len(); self.rely_graph.set_graph_size(size); - // Add Node (create id - index mapping) - self.tasks - .iter() - .map(|(n, _)| self.rely_graph.add_node(n)) - .count(); - // Form Graph - for (id, task) in &self.tasks { - let index = self.rely_graph.find_index_by_id(id).unwrap(); - + for (&id, task) in self.tasks.iter() { + assert!(id < size); for rely_task_id in task.get_rely_list() { - let rely_index = self - .rely_graph - .find_index_by_id(&rely_task_id) - .ok_or(DagError::format_error(id, FormatErrorMark::RelyIDIllegal))?; - - self.rely_graph.add_edge(index, rely_index); + // Rely task existence check + if !self.tasks.contains_key(&rely_task_id) { + return Err(DagError::inner_error(InnerError::RelyTaskIllegal( + self.tasks[&id].get_name(), + ))); + } + self.rely_graph.add_edge(id, rely_task_id); } } @@ -109,13 +119,42 @@ impl DagEngine { } /// Check whether it's DAG or not - /// + /// /// # Example /// ``` /// dagrs.check_dag(); /// ``` /// This opeartions will judge the graph and give out a execution sequence if possible. fn check_dag(&self) -> bool { - self.rely_graph.topo_sort() + if let Some(seq) = self.rely_graph.topo_sort() { + self.print_seq(&seq); + seq.iter() + .map(|id| { + println!("Executing Task[name: {}]", self.tasks[id].get_name()); + self.tasks[id].run(); + }) + .count(); + true + } else { + println!("Loop Detect"); + false + } } -} \ No newline at end of file + + /// Print possible execution sequnces. + /// + /// # Example + /// ``` + /// if let Some(seq) = self.rely_graph.topo_sort() { + /// self.print_seq(&seq); + /// ... + /// } + /// ``` + fn print_seq(&self, seq: &Vec) { + print!("[Start]"); + seq.iter() + .map(|id| print!(" -> {}", self.tasks[id].get_name())) + .count(); + println!(" -> [End]"); + } +} diff --git a/dagrs/src/error_handler.rs b/dagrs/src/error_handler.rs index 0e77daed..051f02d6 100644 --- a/dagrs/src/error_handler.rs +++ b/dagrs/src/error_handler.rs @@ -2,55 +2,78 @@ use std::fmt::Display; +#[derive(Debug, PartialEq, Eq)] +/// A synthesis of all possible errors. +pub enum DagError { + /// IO Error, like file not exist, etc. + /// Here we simplefy it to a message(String). + IOError(String), + /// YAML Error, like format error, etc. + YamlError(YamlError), + /// Error that occurs when running dagrs. + InnerError(InnerError), +} #[derive(Debug, PartialEq, Eq)] -/// Format Error may occur in yaml file -pub enum FormatErrorMark { - /// Not start with 'dagrs' +/// Format Error, point out which part has what kinds of error. +pub enum FormatError { + /// Not start with 'dagrs'. StartWordError, - /// A task have no name filed - NoName, - /// The task ID dependent on the task does not exist - RelyIDIllegal, + /// A task have no name filed, `String` points out task's id. + NoName(String), + /// No run scripts + NoRunScript(String), } #[derive(Debug, PartialEq, Eq)] -/// Format Error, point out which part has what error type. -pub struct FormatError { - /// which task definition has errors. - id: String, - /// Has what kinds of errors. - mark: FormatErrorMark, +/// Error that occurs when parsing YAML file. +pub enum YamlError { + /// Yaml Parser defined error type. + YamlParserError(yaml_rust::ScanError), + /// Format Error type. + YamlFormatError(FormatError), +} + +#[derive(Debug, PartialEq, Eq)] +/// Error that occurs when running dagrs +pub enum InnerError { + /// Dependency task not exist + RelyTaskIllegal(String), } impl Display for FormatError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.mark { - FormatErrorMark::StartWordError => write!(f, "Not start with 'dagrs:'"), - FormatErrorMark::NoName => write!(f, "Task[ID:{}] name not found", self.id), - FormatErrorMark::RelyIDIllegal => write!(f, "Task[ID:{}] rely ID not found", self.id), + match self { + Self::StartWordError => write!(f, "YAML file not start with 'dagrs:'"), + Self::NoName(id) => write!(f, "Task[ID:{}] name not found", id), + Self::NoRunScript(id) => write!(f, "Task[ID:{}] run script not found", id), } } } -#[derive(Debug, PartialEq, Eq)] -/// A synthesis of all possible errors. -pub enum DagError { - /// IO Error, like file not exist, etc. - /// Here we simplefy it to a message(String). - IOError(String), - /// Yaml Parser defined error type. - YamlParserError(yaml_rust::ScanError), - /// Format Error type. - YamlFormatError(FormatError), +impl Display for YamlError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::YamlFormatError(e) => e.fmt(f), + Self::YamlParserError(e) => e.fmt(f), + } + } +} + +impl Display for InnerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::RelyTaskIllegal(name) => write!(f, "Task[Name:{}] rely tasks not exist!", name), + } + } } impl Display for DagError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self { Self::IOError(e) => e.fmt(f), - Self::YamlParserError(e) => e.fmt(f), - Self::YamlFormatError(e) => e.fmt(f), + Self::YamlError(e) => e.fmt(f), + Self::InnerError(e) => e.fmt(f), } } } @@ -63,22 +86,30 @@ impl From for DagError { impl From for DagError { fn from(e: yaml_rust::ScanError) -> Self { - Self::YamlParserError(e) + Self::YamlError(YamlError::YamlParserError(e)) } } impl DagError { /// Throw a format error - /// + /// /// # Example /// ``` - /// DagError::format_error("a", FormatErrorMark::NoName); + /// DagError::format_error(FormatError::NoName("a")); /// ``` /// This will throw a error that says, task 'a' has no name field. - pub fn format_error(id: &str, mark: FormatErrorMark) -> Self { - Self::YamlFormatError(FormatError { - id: id.into(), - mark, - }) + pub fn format_error(error: FormatError) -> Self { + Self::YamlError(YamlError::YamlFormatError(error)) + } + + /// Throw a inner error + /// + /// # Example + /// ``` + /// DagError::inner_error(InnerError::RelyTaskIllegal("task 1")) + /// ``` + /// This will throw a error that says, task with name "task 1" has non-exist rely tasks. + pub fn inner_error(error: InnerError) -> Self { + Self::InnerError(error) } } diff --git a/dagrs/src/graph.rs b/dagrs/src/graph.rs index bc2225d1..b913a068 100644 --- a/dagrs/src/graph.rs +++ b/dagrs/src/graph.rs @@ -1,12 +1,9 @@ //! Graph stores dependency relations -use bimap::BiMap; - #[derive(Debug)] /// Graph Struct pub struct Graph { - /// Record node id and it's index - nodes: BiMap, + size: usize, /// Adjacency list adj: Vec>, /// Node's indegree, used for topo sort @@ -22,7 +19,7 @@ impl Graph { /// ``` pub fn new() -> Graph { Graph { - nodes: BiMap::new(), + size: 0, adj: Vec::new(), indegree: Vec::new(), } @@ -36,25 +33,11 @@ impl Graph { /// g.set_graph_size(size); /// ``` pub fn set_graph_size(&mut self, size: usize) { + self.size = size; self.adj.resize(size, Vec::new()); self.indegree.resize(size, 0) } - /// Add a node into the graph - /// - /// This operation will create a mapping between ID and its index. - /// - /// # Example - /// ``` - /// g.add_node("Node1"); - /// ``` - /// **Note:** `id` won't get repeated in dagrs, - /// since yaml parser will overwrite its info if a task's ID repeats. - pub fn add_node(&mut self, id: &str) { - let index = self.nodes.len(); - self.nodes.insert(id.to_owned(), index); - } - /// Add an edge into the graph /// /// ```Example @@ -67,39 +50,35 @@ impl Graph { self.indegree[w] += 1; } - /// Find a task's index by its ID - pub fn find_index_by_id(&self, id: &str) -> Option { - self.nodes.get_by_left(id).map(|i| i.to_owned()) - } - - /// Find a task's ID by its index - pub fn find_id_by_index(&self, index: usize) -> Option { - self.nodes.get_by_right(&index).map(|n| n.to_string()) - } - /// Get number of nodes in grapg - pub fn get_node_num(&self) -> usize { - self.nodes.len() - } - - /// Do topo sort in graph, returns true if DAG + /// Do topo sort in graph, returns a possible execution sequnce if DAG /// /// # Example /// ``` /// g.topo_sort(); /// ``` /// This operation will judge whether graph is a DAG or not, - /// returns true if yes, and false if no. + /// returns Some(Possible Sequence) if yes, and None if no. /// - /// This function has output, if graph is a DAG, it will print a possible execution sequence, - /// or it will print `Loop Detected`. /// /// **Note**: this function can only be called after graph's initialization (add nodes and edges, etc.) is done. - pub fn topo_sort(&self) -> bool { + /// + /// # Principle + /// Reference: [Topological Sorting](https://www.jianshu.com/p/b59db381561a) + /// + /// 1. For a grapg g, we record the indgree of every node. + /// + /// 2. Each time we start from a node with zero indegree, name it N0, and N0 can be executed since it has no dependency. + /// + /// 3. And then we decrease the indegree of N0's children (those tasks depend on N0), this would create some new zero indegree nodes. + /// + /// 4. Just repeat step 2, 3 until no more zero degree nodes can be generated. + /// If all tasks have been executed, then it's a DAG, or there must be a loop in the graph. + pub fn topo_sort(&self) -> Option> { let mut queue = Vec::new(); let mut indegree = self.indegree.clone(); let mut count = 0; - let mut sequence = String::new(); + let mut sequence = vec![]; indegree .iter() @@ -114,7 +93,7 @@ impl Graph { while !queue.is_empty() { let v = queue.pop().unwrap(); - sequence.push_str(&format!(" -> {}", self.find_id_by_index(v).unwrap())); + sequence.push(v); count += 1; self.adj[v] @@ -128,12 +107,10 @@ impl Graph { .count(); } - if count < self.get_node_num() { - println!("LOOP Detected"); - false + if count < self.size { + None } else { - println!("[Start]{} -> [End]", sequence); - true + Some(sequence) } } } diff --git a/dagrs/src/main.rs b/dagrs/src/main.rs index 6bd582bc..5c63b918 100644 --- a/dagrs/src/main.rs +++ b/dagrs/src/main.rs @@ -1,74 +1,17 @@ -/*! -本项目是用 Rust 写的 DAG 执行引擎,开发文档请参考:[使用 Rust 编写 DAG 执行引擎](https://openeuler.feishu.cn/docs/doccnVLprAY6vIMv6W1vgfLnfrf)。 - -## 用法 - -确保 Rust 编译环境可用(`cargo build`),然后在此文件夹中运行`cargo build --release`,在`target/release/`中获取可执行文件,并将其放入PATH。 - -命令行使用方式为: - -```bash -$ dagrs -h -dagrs 0.1.0 - -USAGE: - dagrs --filepath - -OPTIONS: - -f, --filepath YAML file path - -h, --help Print help information - -V, --version Print version information -``` - -例如: - -```bash -$ dagrs -f test/test_dag.yaml -[Start] -> d -> a -> b -> f -> c -> g -> e -> h -> [End] -``` - - - -## YAML 定义 - -YAML 定义 DAG 通过如下一个例子来说明: - -```YAML -dagrs: - a: - name: "任务1" - rely: [b, c] - b: - name: "任务2" - rely: [c] - c: - name: "任务3" - # empty if no rely -``` - -- 这里有三个任务:a,b 和 c。这里的 a,b,c 并不是 `name` ,而是标识任务的标识,可以认为是 ID。 -- 例如 a 指向 b 和 c,表示 a 在 b,c 之前执行,写做 `rely: [b, c]` 。 -- **注意,重复的 ID 会覆盖之前的任务定义。** - -最终形成的图如下,那么一个可行的执行顺序是: `A->B->C->D->E` - -img - -具体的任务定义可以后续随需要进行扩展。 - */ - -extern crate bimap; extern crate clap; +extern crate lazy_static; extern crate yaml_rust; mod dag_engine; mod error_handler; mod graph; -pub mod task; +mod task; use clap::Parser; use dag_engine::DagEngine; + + #[derive(Parser)] #[clap(version)] /// Command Line input @@ -78,60 +21,112 @@ struct Args { filepath: String, } + fn main() { let args = Args::parse(); - let mut dagrs = DagEngine::new(); + let mut dagrs: DagEngine = DagEngine::new(); - if let Err(e) = dagrs.run(&args.filepath) { + if let Err(e) = dagrs.run_from_yaml(&args.filepath) { println!("[Error] {}", e); } } +#[test] +fn test_yaml() { + let mut res = DagEngine::new(); + res.run_from_yaml("test/test.yaml").unwrap(); +} + + #[test] fn test_dag() { - let res = DagEngine::new().run("test/test_dag.yaml").unwrap(); + let res = DagEngine::new().run_from_yaml("test/test_dag.yaml").unwrap(); assert_eq!(res, true) } #[test] fn test_loop() { - let res = DagEngine::new().run("test/test_loop1.yaml").unwrap(); + let res = DagEngine::new().run_from_yaml("test/test_loop1.yaml").unwrap(); assert_eq!(res, false) } #[test] fn test_complex_loop() { - let res = DagEngine::new().run("test/test_loop2.yaml").unwrap(); + let res = DagEngine::new().run_from_yaml("test/test_loop2.yaml").unwrap(); assert_eq!(res, false) } #[test] fn test_format_error1() { - use error_handler::{DagError, FormatErrorMark}; - let res = DagEngine::new().run("test/test_error1.yaml"); + use crate::error_handler::{DagError, FormatError}; + let res = DagEngine::new().run_from_yaml("test/test_error1.yaml"); assert_eq!( res, - Err(DagError::format_error("a".into(), FormatErrorMark::NoName)) + Err(DagError::format_error(FormatError::NoName("a".into()))) ); } #[test] fn test_format_error2() { - use error_handler::{DagError, FormatErrorMark}; - let res = DagEngine::new().run("test/test_error2.yaml"); + use error_handler::{DagError, FormatError}; + let res = DagEngine::new().run_from_yaml("test/test_error2.yaml"); assert_eq!( res, - Err(DagError::format_error("".into(), FormatErrorMark::StartWordError)) + Err(DagError::format_error(FormatError::StartWordError)) ); } #[test] fn test_rely_error() { - use error_handler::{DagError, FormatErrorMark}; - let res = DagEngine::new().run("test/test_error3.yaml"); + use error_handler::{DagError, InnerError}; + let res = DagEngine::new().run_from_yaml("test/test_error3.yaml"); assert_eq!( res, - Err(DagError::format_error("a".into(), FormatErrorMark::RelyIDIllegal)) + Err(DagError::inner_error(InnerError::RelyTaskIllegal("任务1".into()))) ); +} + +#[test] +fn test_no_runscript() { + use error_handler::{DagError, FormatError}; + let res = DagEngine::new().run_from_yaml("test/test_error4.yaml"); + assert_eq!( + res, + Err(DagError::format_error(FormatError::NoRunScript("a".into()))) + ); +} + +#[test] +fn test_prom() { + use crate::task::{TaskTrait, TaskWrapper}; + + struct T1 {} + impl TaskTrait for T1 { + fn run(&self) { + println!("T1!"); + } + } + + struct T2 {} + impl TaskTrait for T2 { + fn run(&self) { + println!("T2!"); + } + } + + let mut t1 = TaskWrapper::new(T1{}, "Task 1"); + let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + let mut t3 = TaskWrapper::new(T1{}, "Task 3"); + + t2.add_relys(&[&t1, &t3]); + t1.add_relys(&[&t3]); + + + let mut dag = DagEngine::new(); + dag.add_task(t1); + dag.add_task(t2); + dag.add_task(t3); + + dag.run(); } \ No newline at end of file diff --git a/dagrs/src/task.rs b/dagrs/src/task.rs index 892418a9..0b6955c3 100644 --- a/dagrs/src/task.rs +++ b/dagrs/src/task.rs @@ -1,25 +1,151 @@ //! Task Implementations, used to store task infos -use crate::error_handler::{DagError, FormatErrorMark}; -use yaml_rust::Yaml; +use crate::error_handler::{DagError, FormatError, InnerError}; +use lazy_static::lazy_static; +use std::process::Command; +use std::sync::Mutex; +use std::{collections::HashMap, fs::File, io::Read}; +use yaml_rust::{Yaml, YamlLoader}; + +/// Task Trait. +/// +/// Any struct implements this trait can be added into dagrs. +pub trait TaskTrait { + fn run(&self); +} + +/// Wrapper for task that impl [`TaskTrait`]. +pub struct TaskWrapper { + id: usize, + name: String, + rely_list: Vec, + inner: Box, +} + +impl TaskWrapper { + /// Allocate a new TaskWrapper. + /// + /// # Example + /// ``` + /// let t = TaskWrapper::new(Task{}, "Demo Task") + /// ``` + /// + /// `Task` is a struct that impl [`TaskTrait`]. + /// + /// **Note:** This method will take the ownership of struct that impl [`TaskTrait`]. + pub fn new(task: impl TaskTrait + 'static, name: &str) -> Self { + TaskWrapper { + id: ID_ALLOCATOR.lock().unwrap().alloc(), + name: name.to_owned(), + rely_list: Vec::new(), + inner: Box::new(task), + } + } + + #[allow(unused)] + /// Tasks that shall be executed after this one. + /// + /// # Example + /// ``` + /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); + /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + /// t2.add_relys(&[&t1]); + /// ``` + /// In above code, `t2` will be executed before `t1`. + pub fn add_relys(&mut self, relys: &[&TaskWrapper]) { + self.rely_list.extend(relys.iter().map(|t| t.get_id())) + } + + /// Tasks that shall be executed after this one. + /// + /// # Example + /// ``` + /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); + /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + /// t2.add_relys_by_ids(&[t1.get_id()]); + /// ``` + /// Similar to `add_relys`, but this method tasks `id` rather than a task. + /// + /// In above code, `t2` will be executed before `t1`. + pub fn add_relys_by_ids(&mut self, relys: &[usize]) { + self.rely_list.extend(relys.iter()) + } + + pub fn get_rely_list(&self) -> Vec { + self.rely_list.clone() + } + + pub fn get_id(&self) -> usize { + self.id + } + + pub fn get_name(&self) -> String { + self.name.to_owned() + } + + pub fn run(&self) { + self.inner.run() + } +} + +/// IDAllocator for TaskWrapper +struct IDAllocator { + id: usize, +} + +impl IDAllocator { + pub fn alloc(&mut self) -> usize { + self.id += 1; + + // Return values + self.id - 1 + } +} + +lazy_static! { + static ref ID_ALLOCATOR: Mutex = Mutex::new(IDAllocator { id: 0 }); +} -/// Task Struct #[derive(Debug)] -pub struct Task { - /// Task ID, must be unique - pub ID: String, - /// Task Name, can repeat - pub name: String, - /// Dependency relations, store relied tasks' ID - pub relys: Vec, +/// Task Struct for YAML file. +struct YamlTaskInner { + /// Running Script + run: String, } -impl Task { - /// Parse Task from Yaml - /// +/// Task struct for YAML file. +#[derive(Debug)] +pub struct YamlTask { + /// Task's id in yaml file. + /// + /// Be careful that `yaml_id` is different from [`TaskWrapper`]'s id. + yaml_id: String, + /// Task's name. + name: String, + /// Record tasks' `yaml_id` that shall be executed after this task. + relys: Vec, + /// A field shall be wrapper into [`TaskWrapper`] later. + inner: YamlTaskInner, +} + +impl TaskTrait for YamlTaskInner { + fn run(&self) { + let output = Command::new("bash") + .arg("-c") + .arg(self.run.clone()) + .output() + .unwrap(); + println!("Exec stdout: {}", String::from_utf8(output.stdout).unwrap()); + println!("Exec stderr: {}", String::from_utf8(output.stderr).unwrap()); + } +} + +impl YamlTask { + /// Parse a task from yaml. + /// /// # Example /// ``` - /// let task = Task::from_yaml(id, yaml); + /// let task = Task::parse_one(id, yaml); /// ``` /// Here `id` and `yaml` comes from: /// ``` @@ -27,50 +153,115 @@ impl Task { /// let yaml_tasks = yaml_tasks[0]["dagrs"] /// .as_hash() /// .ok_or(DagError::format_error("", FormatErrorMark::StartWordError))?; - /// + /// /// for(id, yaml) in yaml_tasks{ /// ... /// } /// ``` - pub fn from_yaml(id: &str, info: &Yaml) -> Result { + fn parse_one(id: &str, info: &Yaml) -> Result { // Get name first - let name = info["name"] .as_str() - .ok_or(DagError::format_error(id, FormatErrorMark::NoName))? + .ok_or(DagError::format_error(FormatError::NoName(id.to_owned())))? + .to_owned(); + + // Get run script + let run = info["run"] + .as_str() + .ok_or(DagError::format_error(FormatError::NoRunScript( + id.to_owned(), + )))? .to_owned(); // relys can be empty let mut relys = Vec::new(); if let Some(rely_tasks) = info["rely"].as_vec() { - for rely_task_id in rely_tasks { - let rely_task_id = rely_task_id - .as_str() - .ok_or(DagError::format_error(id, FormatErrorMark::RelyIDIllegal))? - .to_owned(); - relys.push(rely_task_id) - } + rely_tasks + .iter() + .map(|rely_task_id| relys.push(rely_task_id.as_str().unwrap().to_owned())) + .count(); } - Ok(Task { - ID: id.into(), + let inner = YamlTaskInner { run }; + + Ok(YamlTask { + yaml_id: id.to_string(), name, relys, + inner, }) } + /// Parse all tasks from yaml file. + /// + /// # Example + /// ``` + /// let tasks = YamlTask::parse_tasks("test/test_dag.yaml")?; + /// ``` + fn parse_tasks(filename: &str) -> Result, DagError> { + let mut yaml_cont = String::new(); + + let mut yaml_file = File::open(filename)?; + yaml_file.read_to_string(&mut yaml_cont)?; + + // Parse Yaml + let yaml_tasks = YamlLoader::load_from_str(&yaml_cont)?; + let yaml_tasks = yaml_tasks[0]["dagrs"] + .as_hash() + .ok_or(DagError::format_error(FormatError::StartWordError))?; + + let mut tasks = Vec::new(); + // Read tasks + for (v, w) in yaml_tasks { + let id = v.as_str().unwrap(); + let task = YamlTask::parse_one(id, w)?; + + tasks.push(task); + } + + Ok(tasks) + } - /// Get dependency tasks list - /// + /// Parse all tasks from yaml file into format recognized by dagrs. + /// /// # Example - /// Usually used like: /// ``` - /// let relys = tasks.get_rely_list(); - /// for rely_task in relys{ - /// ... - /// } + /// let tasks = YamlTask::from_yaml(filename)?; /// ``` - pub fn get_rely_list(&self) -> &Vec { - &self.relys + /// + /// Used in [`crate::DagEngine`]. + pub fn from_yaml(filename: &str) -> Result, DagError> { + let tasks = YamlTask::parse_tasks(filename)?; + let mut res = Vec::new(); + let mut temp_hash_yaml2id = HashMap::new(); + let mut temp_hash_id2rely = HashMap::new(); + + // Wrap tasks + tasks + .into_iter() + .map(|t| { + let task = TaskWrapper::new(t.inner, &t.name); + temp_hash_id2rely.insert(task.get_id(), t.relys); + temp_hash_yaml2id.insert(t.yaml_id, task.get_id()); + res.push(task); + }) + .count(); + + // Add Dependency + for task in &mut res { + let mut relys = Vec::new(); + for rely in &temp_hash_id2rely[&task.get_id()] { + // Rely task existence check + if !temp_hash_yaml2id.contains_key(rely) { + return Err(DagError::inner_error(InnerError::RelyTaskIllegal( + task.get_name(), + ))); + } + relys.push(temp_hash_yaml2id[rely]) + } + task.add_relys_by_ids(&relys) + } + + Ok(res) } } diff --git a/dagrs/test/test.yaml b/dagrs/test/test.yaml new file mode 100644 index 00000000..e3c4888e --- /dev/null +++ b/dagrs/test/test.yaml @@ -0,0 +1,4 @@ +dagrs: + a: + name: "Task 1" + run: "ls" \ No newline at end of file diff --git a/dagrs/test/test_dag.yaml b/dagrs/test/test_dag.yaml index 62ebc582..c5f8f706 100644 --- a/dagrs/test/test_dag.yaml +++ b/dagrs/test/test_dag.yaml @@ -2,23 +2,31 @@ dagrs: a: name: "任务1" rely: [b, c] + run: "1" b: name: "任务2" rely: [c, f, g] + run: "2" c: name: "任务3" rely: [e, g] + run: "3" d: name: "任务4" rely: [c, e] + run: "4" e: name: "任务5" rely: [h] + run: "5" f: name: "任务6" rely: [g] + run: "6" g: name: "任务7" rely: [h] + run: "7" h: - name: "任务8" \ No newline at end of file + name: "任务8" + run: "8" \ No newline at end of file diff --git a/dagrs/test/test_error1.yaml b/dagrs/test/test_error1.yaml index 3a43f677..34e3ea5e 100644 --- a/dagrs/test/test_error1.yaml +++ b/dagrs/test/test_error1.yaml @@ -2,5 +2,7 @@ dagrs: a: # no name rely: [b] + run: "echo 123" b: - name: "任务2" \ No newline at end of file + name: "任务2" + run: "echo 456" \ No newline at end of file diff --git a/dagrs/test/test_error2.yaml b/dagrs/test/test_error2.yaml index d2bfdb1a..6488564b 100644 --- a/dagrs/test/test_error2.yaml +++ b/dagrs/test/test_error2.yaml @@ -1,3 +1,4 @@ a: name: "任务1" - rely: [a] \ No newline at end of file + rely: [a] + run: "echo a" \ No newline at end of file diff --git a/dagrs/test/test_error3.yaml b/dagrs/test/test_error3.yaml index 390da24c..2ce0ba83 100644 --- a/dagrs/test/test_error3.yaml +++ b/dagrs/test/test_error3.yaml @@ -1,4 +1,5 @@ dagrs: a: name: "任务1" - rely: [b] \ No newline at end of file + rely: [b] + run: "echo a" \ No newline at end of file diff --git a/dagrs/test/test_error4.yaml b/dagrs/test/test_error4.yaml new file mode 100644 index 00000000..b8763053 --- /dev/null +++ b/dagrs/test/test_error4.yaml @@ -0,0 +1,3 @@ +dagrs: + a: + name: "任务1" \ No newline at end of file diff --git a/dagrs/test/test_loop1.yaml b/dagrs/test/test_loop1.yaml index 391afa85..957be6b8 100644 --- a/dagrs/test/test_loop1.yaml +++ b/dagrs/test/test_loop1.yaml @@ -2,15 +2,20 @@ dagrs: a: name: "任务1" rely: [b, c] + run: "1" b: name: "任务2" rely: [c] + run: "2" c: name: "任务3" rely: [d] + run: "3" d: name: "任务4" rely: [e] + run: "4" e: name: "任务5" - rely: [c] \ No newline at end of file + rely: [c] + run: "4" \ No newline at end of file diff --git a/dagrs/test/test_loop2.yaml b/dagrs/test/test_loop2.yaml index 80ad3e74..697430a0 100644 --- a/dagrs/test/test_loop2.yaml +++ b/dagrs/test/test_loop2.yaml @@ -2,24 +2,32 @@ dagrs: a: name: "任务1" rely: [b, c] + run: "1" b: name: "任务2" rely: [c, f, g] + run: "2" c: name: "任务3" rely: [e, g] + run: "3" d: name: "任务4" rely: [c, e] + run: "4" e: name: "任务5" rely: [h] + run: "5" f: name: "任务6" rely: [g] + run: "6" g: name: "任务7" rely: [h] + run: "7" h: name: "任务8" - rely: [f] \ No newline at end of file + rely: [f] + run: "8" \ No newline at end of file -- Gitee From 51ab40327ff218c6fa8d77fab9dc54a2cc8f687b Mon Sep 17 00:00:00 2001 From: Yinwhe Date: Sat, 26 Feb 2022 11:17:21 +0000 Subject: [PATCH 2/2] Merge branch 'master' of gitee.com:openeuler/opensource-intern into master -- Gitee