From 5a5cac4fe7970b9dad4a23622f9623ce642883ca Mon Sep 17 00:00:00 2001 From: Yinwhe Date: Wed, 18 May 2022 20:54:18 +0800 Subject: [PATCH] doc: Update docs docs: Update docs --- dagrs/Cargo.toml | 2 +- dagrs/README.md | 324 ++++++++++++------------------ dagrs/examples/hello.rs | 38 ++++ dagrs/examples/hello_env.rs | 39 ++++ dagrs/examples/hello_script.rs | 26 +++ dagrs/src/engine/dag_engine.rs | 41 +--- dagrs/src/engine/env_variables.rs | 23 ++- dagrs/src/engine/error_handler.rs | 11 +- dagrs/src/lib.rs | 17 +- dagrs/src/main.rs | 7 +- dagrs/src/task/state.rs | 36 +++- dagrs/src/task/task.rs | 69 ++++--- dagrs/src/task/yaml_task.rs | 2 +- dagrs/test/test_value_pass1.txt | 1 + 14 files changed, 366 insertions(+), 270 deletions(-) create mode 100644 dagrs/examples/hello.rs create mode 100644 dagrs/examples/hello_env.rs create mode 100644 dagrs/examples/hello_script.rs create mode 100644 dagrs/test/test_value_pass1.txt diff --git a/dagrs/Cargo.toml b/dagrs/Cargo.toml index 1fc25a45..3c3ddab1 100644 --- a/dagrs/Cargo.toml +++ b/dagrs/Cargo.toml @@ -11,7 +11,7 @@ lazy_static = "1.4.0" bimap = "0.6.1" deno_core = "0.121.0" log = "0.4.14" -simplelog = "^0.10.0" +simplelog = "0.12.0" clap = { version = "3.0.14", features = ["derive"] } anymap = "1.0.0-beta.2" crossbeam = "0.8.1" diff --git a/dagrs/README.md b/dagrs/README.md index 894ad4b3..4a2c064b 100644 --- a/dagrs/README.md +++ b/dagrs/README.md @@ -2,8 +2,6 @@ 本项目是用 Rust 写的 DAG 执行引擎,开发文档请参考:[使用 Rust 编写 DAG 执行引擎](https://openeuler.feishu.cn/docs/doccnVLprAY6vIMv6W1vgfLnfrf)。 - - ## 用法 确保 Rust 编译环境可用(`cargo build`),然后在此文件夹中运行`cargo build --release`,在`target/release/`中获取可执行文件,并将其放入PATH。 @@ -14,7 +12,6 @@ - 程序员 - 通过实现 `Task Trait` 进行任务的定义和调度运行。 - ## YAML 此部分是面向普通用户的,即用户并不通过 rust 编程,而是利用 YAML 文件对任务进行描述并调度运行。YAML 文件的一个示例如下: @@ -23,7 +20,8 @@ dagrs: a: name: 任务1 - rely: [b] + after: [b] + from: [b] run: type: sh script: ./test/test.sh @@ -38,206 +36,178 @@ dagrs: - `a,b` 是任务的标识符(也可理解为 
ID),主要作为标识使用,无具体含义。该字段必须存在且不能重复(否则会覆盖早先定义)。 - `name` 是任务的名称,在后续调度时会输出到 log 中以便用户查看。该字段必须存在,可以重复。 -- `rely` 是任务的依赖关系,如 `rely: [b]` 表示 `a` 应该在 `b` **之前**执行(注意不要弄反)。该字段可以省略。 +- `after` 是任务的执行顺序,如 `after: [b]` 就表明 `a` 要在 `b` 之后执行。 +- `from` 是任务输入值的来源,`from: [b]` 表示 `a` 在开始执行时,会得到 `b` 的执行结果,以字符串的形式输入。 - `run` 是任务的内容定义,包括 `type` 和 `script` 两个子字段。该字段及其子字段必须存在。 - `type` 是任务的执行方式,当前支持 shell 执行(sh),和 deno 执行(deno)。 - `script` 是任务的执行内容,可以是具体的命令,也可以是一个文件。 -一个稍复杂的例子: +另一个实际涉及输入输出的例子: ```yaml dagrs: a: name: "任务1" - rely: [b, c] + after: [b] + from: [b] run: type: sh - script: echo a + script: echo > ./test/test_value_pass1.txt b: name: "任务2" - rely: [c, f, g] - run: - type: sh - script: echo b - c: - name: "任务3" - rely: [e, g] - run: - type: sh - script: echo c - d: - name: "任务4" - rely: [c, e] - run: - type: sh - script: echo d - e: - name: "任务5" - rely: [h] - run: - type: sh - script: echo e - f: - name: "任务6" - rely: [g] - run: - type: deno - script: Deno.core.print("f\n") - g: - name: "任务7" - rely: [h] run: type: deno - script: Deno.core.print("g\n") - h: - name: "任务8" - run: - type: sh - script: ./test/test.sh + script: let a = 1+4; a*2 ``` +在上面的描述中: +- 任务 `b` 是一个用内置 `deno` 来执行的任务,其返回值显然是 `10` +- 随后 `a` 会被执行,输入值将以字符串的形式拼接到 `script` 的最后面,即以下指令被执行: + `echo > ./test/test_value_pass1.txt 10` +- 执行结束后,会得到一个文件 `test/test_value_pass1.txt`,其中的内容就会是 `10` 。 - +**Notice:** 当前 deno 执行只支持有返回值,但输入值并未实现(`deno_core` 的一些接口问题导致)。 **如何运行?** 在编写好 YAML 文件后,可以通过 cli 进行运行: ```bash -$ ./target/debug/dagrs --help -dagrs 0.1.0 +$ ./target/release/dagrs --help +dagrs 0.2.0 Command Line input USAGE: - dagrs [OPTIONS] --filepath + dagrs [OPTIONS] + +ARGS: + YAML file path OPTIONS: - -f, --filepath YAML file path - -h, --help Print help information - -l, --logpath Log file path - -V, --version Print version information + -h, --help Print help information + -l, --logpath Log file path + -V, --version Print version information ``` -例如运行上述第一个 YAML 的情况: +例如运行上述带输入输出的 YAML 的情况: ```bash -$ 
./target/debug/dagrs -f test/test_dag1.yaml -02:50:59 [INFO] [Start] -> 任务1 -> 任务2 -> [End] -02:50:59 [INFO] Executing Task[name: 任务1] -exec sh file success -02:50:59 [INFO] Task[name: 任务1] exec done, success: true, return value: -02:50:59 [INFO] Executing Task[name: 任务2] +$ ./target/release/dagrs test/test_value_pass1.yaml +08:31:43 [INFO] [Start] -> 任务2 -> 任务1 -> [End] +08:31:43 [INFO] Executing Task[name: 任务2] cargo:rerun-if-changed=/Users/wyffeiwhe/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/deno_core-0.121.0/00_primordials.js cargo:rerun-if-changed=/Users/wyffeiwhe/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/deno_core-0.121.0/01_core.js cargo:rerun-if-changed=/Users/wyffeiwhe/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/deno_core-0.121.0/02_error.js -02:50:59 [INFO] Task[name: 任务2] exec done, success: false, return value: ReferenceError: print is not defined - at :1:1 +08:31:43 [INFO] Finish Task[name: 任务2], success: true +08:31:43 [INFO] Executing Task[name: 任务1] +08:31:43 [INFO] Finish Task[name: 任务1], success: true ``` -可以看到详细的运行情况输出,同时 log 文件可在 `$HOME/.dagrs/dagrs.log$` 下找到(这是默认地址,可以通过 `-l` 选项来自定义。 +可以看到详细的运行情况输出,同时 log 文件可在 `$HOME/.dagrs/dagrs.log` 下找到(这是默认地址,可以通过 `-l` 选项来自定义。 log 文件记录任务的执行顺序以及执行结果,其内容如下: ```log -02:50:59 [INFO] [Start] -> 任务1 -> 任务2 -> [End] -02:50:59 [INFO] Executing Task[name: 任务1] -02:50:59 [INFO] Task[name: 任务1] exec done, success: true, return value: -02:50:59 [INFO] Executing Task[name: 任务2] -02:50:59 [INFO] Task[name: 任务2] exec done, success: false, return value: ReferenceError: print is not defined - at :1:1 +$ cat ~/.dagrs/dagrs.log +08:31:43 [INFO] [Start] -> 任务2 -> 任务1 -> [End] +08:31:43 [INFO] Executing Task[name: 任务2] +08:31:43 [INFO] Finish Task[name: 任务2], success: true +08:31:43 [INFO] Executing Task[name: 任务1] +08:31:43 [INFO] Finish Task[name: 任务1], success: true ``` - - ## TaskTrait -程序员可以通过实现 `TaskTrait` 来更灵活的定义自己的任务。 `TaskTrait` 的定义如下: +Rust Programmer 可以通过实现 `TaskTrait` 
来更灵活的定义自己的任务。 `TaskTrait` 的定义如下: ```rust /// Task Trait. /// /// Any struct implements this trait can be added into dagrs. pub trait TaskTrait { - fn run(&self) -> Option; + fn run(&self, input: Inputval, env: EnvVar) -> Retval; } ``` - `run` 是任务的执行内容,在被调度执行时由 dagrs 调用。 -- `Retval` 是任务的返回值,现在还不支持,**请设置为 None**。 +- `input` 是任务的输入,由 `dagrs` 来管理。 +- `env` 是整个 `dagrs` 的全局变量,所有任务可见。 +- `Retval` 是任务的返回值。 - -程序员实现的 task struct 需要放到 `TaskWrapper` 中进行使用,并通过其提供的 `add_rely` 函数进行依赖设置,具体可见下方的例子。 - +程序员实现的 task struct 需要放到 `TaskWrapper` 中进行使用,并通过其提供的 `exec_after` 和 `input_from` 函数进行依赖设置,具体可见下方的例子。 **如何使用?** -一个例子如下: +一个[例子](./examples/hello.rs)如下: ```rust -use crate::task::{TaskTrait, TaskWrapper, Retval}; -// Set log file path -init_logger(Some("./dagrs.log")); +extern crate dagrs; + +use dagrs::{DagEngine, EnvVar, Inputval, Retval, TaskTrait, TaskWrapper, init_logger}; -// Define a task struct struct T1 {} + impl TaskTrait for T1 { - fn run(&self) -> Option { - println!("T1!"); - None - } + fn run(&self, _input: Inputval, _env: EnvVar) -> Retval { + let hello_dagrs = String::from("Hello Dagrs!"); + Retval::new(hello_dagrs) + } } struct T2 {} + impl TaskTrait for T2 { - fn run(&self) -> Option { - println!("T2!"); - None - } + fn run(&self, mut input: Inputval, _env: EnvVar) -> Retval { + let val = input.get::(0).unwrap(); + println!("{}", val); + Retval::empty() + } } -// Instance -let mut t1 = TaskWrapper::new(T1 {}, "Task 1"); -let mut t2 = TaskWrapper::new(T2 {}, "Task 2"); -let t3 = TaskWrapper::new(T1 {}, "Task 3"); +fn main() { + // Use dagrs provided logger + init_logger(None); + + let t1 = TaskWrapper::new(T1{}, "Task 1"); + let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + let mut dagrs = DagEngine::new(); -// Set up rely -t2.add_relys(&[&t1, &t3]); -t1.add_relys(&[&t3]); + // Set up dependencies + t2.exec_after(&[&t1]); + t2.input_from(&[&t1]); -let mut dag = DagEngine::new(); -dag.add_task(t1); -dag.add_task(t2); -dag.add_task(t3); + dagrs.add_tasks(vec![t1, t2]); + 
assert!(dagrs.run().unwrap()) +} -dag.run().unwrap(); ``` -执行的输出如下: +运行的输出如下: ```bash -03:41:40 [INFO] [Start] -> Task 2 -> Task 1 -> Task 3 -> [End] -03:41:40 [INFO] Executing Task[name: Task 2] -T2! -03:41:40 [INFO] Executing Task[name: Task 1] -T1! -03:41:40 [INFO] Executing Task[name: Task 3] -T1! +08:45:24 [INFO] [Start] -> Task 1 -> Task 2 -> [End] +08:45:24 [INFO] Executing Task[name: Task 1] +08:45:24 [INFO] Finish Task[name: Task 1], success: true +08:45:24 [INFO] Executing Task[name: Task 2] +Hello Dagrs! +08:45:24 [INFO] Finish Task[name: Task 2], success: true ``` -log 文件如下: - -```log -03:41:40 [INFO] [Start] -> Task 2 -> Task 1 -> Task 3 -> [End] -03:41:40 [INFO] Executing Task[name: Task 2] -03:41:40 [INFO] Executing Task[name: Task 1] -03:41:40 [INFO] Executing Task[name: Task 3] -``` +一些解释: +- `input` 提供一个 `get` 方法,用来获取任务的输入值,其接受一个输入值存放的 `index`。 + - 当定义只有一个输入值来源时(如例子中 `t2` 的输入只来自 `t1`),那么将 `index` 设置为 0 即可。 + - 如果有多个来源,假设 `t3.input_from(&[&t2, &t1])`,那么 index 就是定义任务输入时,任务的排列顺序(`&[&t2, &t1]`,如 `get(0)` 就是拿 `t2` 的返回值,`get(1)` 就是拿 `t1` 的返回值。 +- `env` 提供 `get` 和 `set`,[例子参考](./examples/hello_env.rs)。 + - `set` 即设置环境变量,其名称必须是一个字符串。 + - `get` 即获取一个环境变量的值。 +- `Retval` 是任务的返回值,提供 `new` 和 `empty` 两个方法。 +**Notice:** 整个自定义的任务都应该是 `Sync` 和 `Send` 的,原因是:任务是被放到一个线程中执行调度的。 **如何运行脚本?** @@ -250,93 +220,59 @@ pub struct RunScript { executor: RunType, } -pub enum RunType { - SH, - DENO, -} ``` -`RunScript` 本身提供了 `exec` 函数,故可以将 `TaskTrait` 中的 `run` 函数实现为 `RunScript` 的 `exec` 来实现运行脚本,一个例子如下: +这里: +- `script` 即脚本,可以是命令本身("echo hello!"),也可以是脚本的路径("./test/test.sh")。 +- `executor` 是任务的执行方式,`RunType` 是一个 enum 类型: + ```rust + pub enum RunType { + SH, + DENO, + } + ``` +`RunScript` 提供了 `exec` 函数: ```rust -use crate::task::{RunScript, RunType, TaskTrait, TaskWrapper, Retval}; -// Set log file path -init_logger(Some("./dagrs.log")); +pub fn exec(&self, input: Inputval) -> Result {} +``` +如果执行正常,则将结果以 `String` 的形式返回,否则返回一个 `DagError` 的错误类型。 + 
+一个简单的[例子](./examples/hello_script.rs): +```rust +extern crate dagrs; + +use dagrs::{DagEngine, EnvVar, Inputval, Retval, TaskTrait, TaskWrapper, init_logger, RunScript, RunType}; + +struct T {} -struct T { - run_script: RunScript, -} -// Wrap exec in run impl TaskTrait for T { - fn run(&self) -> Option { - Some(self.run_script.exec()) - } + fn run(&self, _input: Inputval, _env: EnvVar) -> Retval { + let script = RunScript::new("echo 'Hello Dagrs!'", RunType::SH); + + let res = script.exec(None); + println!("{:?}", res); + Retval::empty() + } } -let mut t1 = TaskWrapper::new( - T { - run_script: RunScript::new("echo T1", RunType::SH), - }, - "Task 1", -); -let mut t2 = TaskWrapper::new( - T { - run_script: RunScript::new("echo T2", RunType::SH), - }, - "Task 2", -); -let t3 = TaskWrapper::new( - T { - run_script: RunScript::new(r#"Deno.core.print("T3\n")"#, RunType::DENO), - }, - "Task 3", -); - -t2.add_relys(&[&t1, &t3]); -t1.add_relys(&[&t3]); - -let mut dag = DagEngine::new(); -dag.add_task(t1); -dag.add_task(t2); -dag.add_task(t3); - -dag.run().unwrap(); -``` +fn main() { + // Use dagrs provided logger + init_logger(None); -运行结果为: + let t = TaskWrapper::new(T{}, "Task"); + let mut dagrs = DagEngine::new(); -```bash -03:47:55 [INFO] [Start] -> Task 2 -> Task 1 -> Task 3 -> [End] -03:47:55 [INFO] Executing Task[name: Task 2] -T2 -03:47:55 [INFO] Task[name: Task 2] exec done, success: true, return value: -03:47:55 [INFO] Executing Task[name: Task 1] -T1 -03:47:55 [INFO] Task[name: Task 1] exec done, success: true, return value: -03:47:55 [INFO] Executing Task[name: Task 3] -cargo:rerun-if-changed=/Users/wyffeiwhe/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/deno_core-0.121.0/00_primordials.js -cargo:rerun-if-changed=/Users/wyffeiwhe/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/deno_core-0.121.0/01_core.js -cargo:rerun-if-changed=/Users/wyffeiwhe/.cargo/registry/src/mirrors.ustc.edu.cn-61ef6e0cd06fb9b8/deno_core-0.121.0/02_error.js -T3 
-03:47:55 [INFO] Task[name: Task 3] exec done, success: true, return value: Global { data: 0x7f864600d2a0, isolate_handle: IsolateHandle(IsolateAnnex { isolate: 0x0, isolate_mutex: Mutex { data: (), poisoned: false, .. } }) } + dagrs.add_tasks(vec![t]); + assert!(dagrs.run().unwrap()) +} ``` -log 文件如下: - -```log -03:47:55 [INFO] [Start] -> Task 2 -> Task 1 -> Task 3 -> [End] -03:47:55 [INFO] Executing Task[name: Task 2] -03:47:55 [INFO] Task[name: Task 2] exec done, success: true, return value: -03:47:55 [INFO] Executing Task[name: Task 1] -03:47:55 [INFO] Task[name: Task 1] exec done, success: true, return value: -03:47:55 [INFO] Executing Task[name: Task 3] -03:47:55 [INFO] Task[name: Task 3] exec done, success: true, return value: Global { data: 0x7f864600d2a0, isolate_handle: IsolateHandle(IsolateAnnex { isolate: 0x0, isolate_mutex: Mutex { data: (), poisoned: false, .. } }) } +其执行结果为: +```bash +09:12:48 [INFO] [Start] -> Task -> [End] +09:12:48 [INFO] Executing Task[name: Task] +Ok("Hello Dagrs!\n") +09:12:48 [INFO] Finish Task[name: Task], success: true ``` - -TODO: -- [ ] 增加 Task 输入输出值功能,定义输入输出值传递的条件 -- [ ] 增加 Engine 整体环境变量功能 -- [ ] 增加 Task 和 Engine 任务执行成功和失败的功能 -- [ ] 优化错误处理 -- [ ] 优化 Log \ No newline at end of file diff --git a/dagrs/examples/hello.rs b/dagrs/examples/hello.rs new file mode 100644 index 00000000..f25fd4e5 --- /dev/null +++ b/dagrs/examples/hello.rs @@ -0,0 +1,38 @@ +extern crate dagrs; + +use dagrs::{DagEngine, EnvVar, Inputval, Retval, TaskTrait, TaskWrapper, init_logger}; + +struct T1 {} + +impl TaskTrait for T1 { + fn run(&self, _input: Inputval, _env: EnvVar) -> Retval { + let hello_dagrs = String::from("Hello Dagrs!"); + Retval::new(hello_dagrs) + } +} + +struct T2 {} + +impl TaskTrait for T2 { + fn run(&self, mut input: Inputval, _env: EnvVar) -> Retval { + let val = input.get::(0).unwrap(); + println!("{}", val); + Retval::empty() + } +} + +fn main() { + // Use dagrs provided logger + init_logger(None); + + let t1 = 
TaskWrapper::new(T1{}, "Task 1"); + let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + let mut dagrs = DagEngine::new(); + + // Set up dependencies + t2.exec_after(&[&t1]); + t2.input_from(&[&t1]); + + dagrs.add_tasks(vec![t1, t2]); + assert!(dagrs.run().unwrap()) +} diff --git a/dagrs/examples/hello_env.rs b/dagrs/examples/hello_env.rs new file mode 100644 index 00000000..94470f5f --- /dev/null +++ b/dagrs/examples/hello_env.rs @@ -0,0 +1,39 @@ +extern crate dagrs; + +use dagrs::{DagEngine, EnvVar, Inputval, Retval, TaskTrait, TaskWrapper, init_logger}; + +struct T1 {} + +impl TaskTrait for T1 { + fn run(&self, _input: Inputval, mut env: EnvVar) -> Retval { + let hello_dagrs = String::from("Hello Dagrs!"); + env.set("you_need_it", hello_dagrs); + Retval::empty() + } +} + +struct T2 {} + +impl TaskTrait for T2 { + fn run(&self, _input: Inputval, env: EnvVar) -> Retval { + let val = env.get::("you_need_it").unwrap(); + println!("{}", val); + Retval::empty() + } +} + +fn main() { + // Use dagrs provided logger + init_logger(None); + + let t1 = TaskWrapper::new(T1{}, "Task 1"); + let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + let mut dagrs = DagEngine::new(); + + // Set up dependencies + t2.exec_after(&[&t1]); + t2.input_from(&[&t1]); + + dagrs.add_tasks(vec![t1, t2]); + assert!(dagrs.run().unwrap()) +} diff --git a/dagrs/examples/hello_script.rs b/dagrs/examples/hello_script.rs new file mode 100644 index 00000000..191f887e --- /dev/null +++ b/dagrs/examples/hello_script.rs @@ -0,0 +1,26 @@ +extern crate dagrs; + +use dagrs::{DagEngine, EnvVar, Inputval, Retval, TaskTrait, TaskWrapper, init_logger, RunScript, RunType}; + +struct T {} + +impl TaskTrait for T { + fn run(&self, _input: Inputval, _env: EnvVar) -> Retval { + let script = RunScript::new("echo 'Hello Dagrs!'", RunType::SH); + + let res = script.exec(None); + println!("{:?}", res); + Retval::empty() + } +} + +fn main() { + // Use dagrs provided logger + init_logger(None); + + let t = TaskWrapper::new(T{}, 
"Task"); + let mut dagrs = DagEngine::new(); + + dagrs.add_tasks(vec![t]); + assert!(dagrs.run().unwrap()) +} diff --git a/dagrs/src/engine/dag_engine.rs b/dagrs/src/engine/dag_engine.rs index 619f69ad..bfeb2b70 100644 --- a/dagrs/src/engine/dag_engine.rs +++ b/dagrs/src/engine/dag_engine.rs @@ -45,9 +45,7 @@ impl DagEngine { /// # Example /// ``` /// let dagrs = DagEngine::new(); - /// dagrs.add_task(task1); - /// dagrs.add_task(task2); - /// dagrs.run("test/test_dag.yaml"); + /// dagrs.add_tasks(vec![task1, task2]); /// ``` /// /// Here `task1` and `task2` are user defined task wrapped in [`TaskWrapper`]. @@ -64,7 +62,7 @@ impl DagEngine { /// # Example /// ``` /// let dagrs = DagEngine::new(); - /// dagrs.run_from_yaml("test/test_dag.yaml"); + /// dagrs.run_from_yaml("test/test_dag1.yaml"); /// ``` /// /// This method is similar to `run`, but read tasks from yaml file, @@ -74,12 +72,8 @@ impl DagEngine { self.run() } - /// Read tasks into engine throuh yaml - /// - /// # Example - /// ``` - /// let yaml_tasks = dagrs.read_task("test/test.yaml"); - /// ``` + /// Read tasks into engine through yaml. + /// /// This operation will read all info in yaml file into `dagrs.tasks` if no error occurs. fn read_tasks(&mut self, filename: &str) -> Result<(), DagError> { let tasks = YamlTask::from_yaml(filename)?; @@ -113,14 +107,14 @@ impl DagEngine { self.execstate_store.insert(id, state); } - /// Fetch given task's [`ExecState`], this won't delete it from the hash map. + /// Fetch a task's [`ExecState`], this won't delete it from the hash map. fn pull_execstate(&self, id: &usize) -> &ExecState { self.execstate_store .get(id) .expect("[Error] Pull execstate fails") } - /// Prepare given task's [`Inputval`]. + /// Prepare a task's [`Inputval`]. fn form_input(&self, id: &usize) -> Inputval { let froms = self.tasks[id].get_input_from_list(); Inputval::new( @@ -131,12 +125,8 @@ impl DagEngine { ) } - /// create rely map between tasks + /// create rely map between tasks. 
/// - /// # Example - /// ``` - /// dagrs.create_graph(); - /// ``` /// This operation will initialize `dagrs.rely_graph` if no error occurs. fn create_graph(&mut self) -> Result<(), DagError> { let size = self.tasks.len(); @@ -165,13 +155,10 @@ impl DagEngine { Ok(()) } - /// Check whether it's DAG or not + /// Check whether it's DAG or not. /// - /// # Example - /// ``` - /// dagrs.check_dag(); - /// ``` - /// This opeartions will judge the graph and give out a execution sequence if possible. + /// If it is a DAG, dagrs will start executing tasks in a feasible order and + /// return true when execution is done; otherwise it returns false. async fn check_dag(&mut self) -> bool { if let Some(seq) = self.rely_graph.topo_sort() { let seq = seq @@ -214,14 +201,6 @@ impl DagEngine { } /// Print possible execution sequnces. - /// - /// # Example - /// ``` - /// if let Some(seq) = self.rely_graph.topo_sort() { - /// self.print_seq(&seq); - /// ... - /// } - /// ``` fn print_seq(&self, seq: &Vec) { let mut res = String::from("[Start]"); seq.iter() diff --git a/dagrs/src/engine/env_variables.rs b/dagrs/src/engine/env_variables.rs index 5007d9ff..0ebd81cb 100644 --- a/dagrs/src/engine/env_variables.rs +++ b/dagrs/src/engine/env_variables.rs @@ -1,3 +1,5 @@ +//! Implementation for global environment variables. + use crate::task::DMap; use anymap::CloneAny; use std::{ @@ -5,15 +7,27 @@ use std::{ sync::{Arc, Mutex}, }; -// Global environment variables +/// Global environment variables. +/// +/// Since it will be shared between tasks, [`Arc`] and [`Mutex`] +/// are needed. pub struct EnvVar(Arc>>); impl EnvVar { + /// Allocate a new [`EnvVar`]. pub fn new() -> Self { Self(Arc::new(Mutex::new(HashMap::new()))) } #[allow(unused)] + /// Set a global variable. + /// + /// # Example + /// ```rust + /// env.set("Hello", "World".to_string()); + /// ``` + /// + /// Lock operations are wrapped inside, so no need to worry.
pub fn set(&mut self, name: &str, var: H) { let mut v = DMap::new(); v.insert(var); @@ -22,6 +36,13 @@ impl EnvVar { #[allow(unused)] /// This method get needed input value from [`Inputval`]. + /// + /// # Example + /// ```rust + /// env.set("Hello", "World".to_string()); + /// let res = env.get("Hello").unwrap(); + /// assert_eq!(res, "World".to_string()); + /// ``` pub fn get(&self, name: &str) -> Option { if let Some(dmap) = self.0.lock().unwrap().get(name) { dmap.clone().remove() diff --git a/dagrs/src/engine/error_handler.rs b/dagrs/src/engine/error_handler.rs index c0d4a3d1..d47c3157 100644 --- a/dagrs/src/engine/error_handler.rs +++ b/dagrs/src/engine/error_handler.rs @@ -1,4 +1,4 @@ -//! A simple error handle, can output error type and info +//! A simple error handler implementation. use thiserror::Error; @@ -6,7 +6,6 @@ use thiserror::Error; /// A synthesis of all possible errors. pub enum DagError { /// IO Error, like file not exist, etc. - /// Here we simplefy it to a message(String). #[error("{0}")] IOError(#[from] std::io::Error), /// YAML Error, like format error, etc. @@ -51,18 +50,18 @@ impl DagError { /// /// # Example /// ``` - /// DagError::format_error(FormatError::NoName("a")); + /// DagError::format_error(YamlFormatError::NoName("a".to_string())); /// ``` - /// This will throw a error that says, task 'a' has no name field. + /// This will throw a error that says, yaml task 'a' has no name field. pub fn format_error(error: YamlFormatError) -> Self { Self::YamlError(YamlError::YamlFormatError(error)) } - /// Throw a inner error + /// Throw a running error /// /// # Example /// ``` - /// DagError::inner_error(InnerError::RelyTaskIllegal("task 1")) + /// DagError::running_error(RunningError::RelyTaskIllegal("task 1".to_string())) /// ``` /// This will throw a error that says, task with name "task 1" has non-exist rely tasks. 
pub fn running_error(error: RunningError) -> Self { diff --git a/dagrs/src/lib.rs b/dagrs/src/lib.rs index 542bbcaa..1b430f13 100644 --- a/dagrs/src/lib.rs +++ b/dagrs/src/lib.rs @@ -12,7 +12,7 @@ mod engine; mod task; pub use engine::{DagEngine, DagError, EnvVar, RunningError, YamlError, YamlFormatError}; -pub use task::TaskTrait; +pub use task::{Inputval, Retval, RunScript, RunType, TaskTrait, TaskWrapper}; use simplelog::*; use std::{ @@ -20,6 +20,21 @@ use std::{ fs::{create_dir, File}, }; +/// Init a logger. +/// +/// # Example +/// ```rust +/// // Default path (HOME/.dagrs/dagrs.log) +/// init_logger(None); +/// // or +/// init_logger(Some("./dagrs.log")); +/// ``` +/// +/// **Note**, this function shall only be called once. +/// +/// Default logger is [Simplelog](https://crates.io/crates/simplelog), you can +/// also use other log implementations. Just remember to initialize them before +/// running dagrs. pub fn init_logger(logpath: Option<&str>) { let logpath = if let Some(s) = logpath { s.to_owned() diff --git a/dagrs/src/main.rs b/dagrs/src/main.rs index 3a162f57..eef2f19e 100644 --- a/dagrs/src/main.rs +++ b/dagrs/src/main.rs @@ -1,5 +1,3 @@ -#![doc = include_str!("../README.md")] - use clap::Parser; use dagrs::{init_logger, DagEngine}; use log::*; @@ -9,8 +7,7 @@ use log::*; /// Command Line input struct Args { /// YAML file path - #[clap(short, long)] - filepath: String, + file: String, /// Log file path #[clap(short, long)] logpath: Option, @@ -22,7 +19,7 @@ fn main() { init_logger(args.logpath.as_deref()); - if let Err(e) = dagrs.run_from_yaml(&args.filepath) { + if let Err(e) = dagrs.run_from_yaml(&args.file) { error!("[Error] {}", e); } } diff --git a/dagrs/src/task/state.rs b/dagrs/src/task/state.rs index f7d6bc03..5bc1b474 100644 --- a/dagrs/src/task/state.rs +++ b/dagrs/src/task/state.rs @@ -6,7 +6,9 @@ pub type DMap = Map; /// Describe task's running result pub struct ExecState { + /// The execution succeed or not success: bool, + /// Return 
value of the execution. retval: Retval, } @@ -28,7 +30,7 @@ impl ExecState { /// Get [`ExecState`]'s return value. /// - /// This method will clone [`DMap`] stored in [`ExecState`]'s [`Retval`]. + /// This method will clone the [`DMap`] that is stored in [`ExecState`]'s [`Retval`]. pub fn get_dmap(&self) -> Option { self.retval.0.clone() } @@ -47,6 +49,11 @@ impl Retval { /// /// Since the return value may be transfered between threads, /// [`Send`], [`Sync`], [`CloneAny`] is needed. + /// + /// # Example + /// ```rust + /// let retval = Retval::new(123); + /// ``` pub fn new(val: H) -> Self { let mut map = DMap::new(); assert!(map.insert(val).is_none(), "[Error] map insert fails."); @@ -54,6 +61,11 @@ impl Retval { } /// Get empty [`Retval`]. + /// + /// # Example + /// ```rust + /// let retval = Retval::empty(); + /// ``` pub fn empty() -> Self { Self(None) } @@ -67,7 +79,22 @@ impl Inputval { } #[allow(unused)] - /// This method get needed input value from [`Inputval`]. + /// This method gets the needed input value from [`Inputval`]; + /// it takes an index as input. + /// + /// When the input comes from only one task's return value, + /// just set the index to `0`. If it comes from multiple tasks' return values, the index depends on + /// which return value you want. The order of the return values is the same + /// as the one you defined in the [`input_from`] function. + /// + /// # Example + /// ```rust + /// // previous definition of `t3` + /// t3.input_from(&[&t1, &t2]); + /// // then get the inputs + /// let input_from_t1 = input.get(0); + /// let input_from_t2 = input.get(1); + /// ``` pub fn get(&mut self, index: usize) -> Option { if let Some(Some(dmap)) = self.0.get_mut(index) { dmap.remove() @@ -76,8 +103,9 @@ impl Inputval { } } - pub fn get_iter(&self) -> Iter>> - { + /// Since [`Inputval`] can contain multiple input values, and it's implemented + /// by [`Vec`] actually, of course it can be turned into an iterator.
+ pub fn get_iter(&self) -> Iter>> { self.0.iter() } } diff --git a/dagrs/src/task/task.rs b/dagrs/src/task/task.rs index d72ed34e..dd0c2c6c 100644 --- a/dagrs/src/task/task.rs +++ b/dagrs/src/task/task.rs @@ -1,10 +1,10 @@ -use crate::engine::{DagError, RunningError, EnvVar}; +use crate::engine::{DagError, EnvVar, RunningError}; use super::{Inputval, Retval}; use deno_core::{serde_json, serde_v8, v8, JsRuntime, RuntimeOptions}; use lazy_static::lazy_static; use std::process::Command; -use std::sync::{Mutex}; +use std::sync::Mutex; /// Task Trait. /// @@ -14,8 +14,6 @@ pub trait TaskTrait { } /// Wrapper for task that impl [`TaskTrait`]. -/// -/// Since task will be executed in seperated threads, `send` is needed. pub struct TaskWrapper { id: usize, name: String, @@ -32,7 +30,8 @@ impl TaskWrapper { /// let t = TaskWrapper::new(Task{}, "Demo Task") /// ``` /// - /// `Task` is a struct that impl [`TaskTrait`]. + /// `Task` is a struct that impl [`TaskTrait`]. Since task will be + /// executed in separate threads, [`Send`] and [`Sync`] are needed. /// /// **Note:** This method will take the ownership of struct that impl [`TaskTrait`]. pub fn new(task: impl TaskTrait + 'static + Send + Sync, name: &str) -> Self { @@ -49,7 +48,7 @@ impl TaskWrapper { /// Tasks that shall be executed before this one. /// /// # Example - /// ``` + /// ```rust /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); /// t2.exec_after(&[&t1]); @@ -60,6 +59,14 @@ impl TaskWrapper { } /// Input will come from the given tasks' exec result. + /// + /// # Example + /// ```rust + /// t3.exec_after(&[&t1, &t2, &t4]); + /// t3.input_from(&[&t1, &t2]); + /// ``` + /// + /// In the code above, `t3` will take its input from the return values of `t1` and `t2`.
pub fn input_from(&mut self, needed: &[&TaskWrapper]) { self.input_from.extend(needed.iter().map(|t| t.get_id())) } @@ -123,7 +130,9 @@ pub struct RunScript { executor: RunType, } -/// Run script type, now a script can be run in `sh` or embeded `deno` +/// Run script type, now a script can be run in `sh` or embedded `deno`. +/// +/// **Note:** this feature is not quite complete and still needs a lot of improvement. #[derive(Debug)] pub enum RunType { SH, @@ -131,11 +140,16 @@ pub enum RunType { } impl RunScript { - /// Generate a new run script + /// Generate a new run script. /// /// # Example /// ``` - /// let r = RunScript::new("echo Hello!", RunType::SH); + /// // `script` can be a command + /// let r = RunScript::new("echo Hello", RunType::SH); + /// r.exec(); + /// + /// // or a script path + /// let r = RunScript::new("test/test.sh", RunType::SH); /// r.exec(); /// ``` pub fn new(script: &str, executor: RunType) -> Self { @@ -149,35 +163,38 @@ impl RunScript { /// /// # Example /// ``` - /// let r = RunScript::new("echo Hello!", RunType::SH); + /// let r = RunScript::new("echo Hello", RunType::SH); /// r.exec(); /// ``` - pub fn exec(&self, input: Inputval) -> Result { + /// If execution succeeds, it returns the result in [`String`] type, or it + /// returns a [`DagError`].
+ pub fn exec(&self, input: Option) -> Result { let res = match self.executor { RunType::SH => self.run_sh(input), RunType::DENO => self.run_deno(input), }; - + res } - fn run_sh(&self, input: Inputval) -> Result { + fn run_sh(&self, input: Option) -> Result { let mut cmd = format!("{} ", self.script); - input - .get_iter() - .map(|input| { - cmd.push_str( - if let Some(dmap) = input { - if let Some(str) = dmap.get::() { - str + if let Some(input) = input { + input + .get_iter() + .map(|input| { + cmd.push_str(if let Some(dmap) = input { + if let Some(str) = dmap.get::() { + str + } else { + "" + } } else { "" - } - } else { - "" + }) }) - }).count(); - + .count(); + } let res = Command::new("sh") .arg("-c") @@ -188,7 +205,7 @@ impl RunScript { res.map_err(|err| err.into()) } - fn run_deno(&self, _input: Inputval) -> Result { + fn run_deno(&self, _input: Option) -> Result { let script = self.script.clone(); let mut context = JsRuntime::new(RuntimeOptions { ..Default::default() diff --git a/dagrs/src/task/yaml_task.rs b/dagrs/src/task/yaml_task.rs index 46f5d403..22b990f0 100644 --- a/dagrs/src/task/yaml_task.rs +++ b/dagrs/src/task/yaml_task.rs @@ -30,7 +30,7 @@ pub struct YamlTask { impl TaskTrait for YamlTaskInner { fn run(&self, input: Inputval, _env: EnvVar) -> Retval { - if let Ok(res) = self.run.exec(input) { + if let Ok(res) = self.run.exec(Some(input)) { Retval::new(res) } else { Retval::empty() diff --git a/dagrs/test/test_value_pass1.txt b/dagrs/test/test_value_pass1.txt new file mode 100644 index 00000000..f599e28b --- /dev/null +++ b/dagrs/test/test_value_pass1.txt @@ -0,0 +1 @@ +10 -- Gitee