diff --git a/dagrs/Cargo.toml b/dagrs/Cargo.toml index ab8671a09c1ff3fef46d7c84e7d2400518a11482..1fc25a4514630ea41a1d37c8b81dc3707c51ed01 100644 --- a/dagrs/Cargo.toml +++ b/dagrs/Cargo.toml @@ -15,4 +15,5 @@ simplelog = "^0.10.0" clap = { version = "3.0.14", features = ["derive"] } anymap = "1.0.0-beta.2" crossbeam = "0.8.1" -thiserror = "1.0.30" \ No newline at end of file +thiserror = "1.0.30" +tokio = { version = "1.18", features = ["rt", "rt-multi-thread"] } \ No newline at end of file diff --git a/dagrs/src/dag_engine.rs b/dagrs/src/engine/dag_engine.rs similarity index 69% rename from dagrs/src/dag_engine.rs rename to dagrs/src/engine/dag_engine.rs index 39e98b549f73366006e50d2291398e01b84c1723..619f69ad41f28020a609860c99fafb3f9ed65ee6 100644 --- a/dagrs/src/dag_engine.rs +++ b/dagrs/src/engine/dag_engine.rs @@ -1,21 +1,27 @@ //! Dag Engine is dagrs's main body -use crate::{ +use super::{ + env_variables::EnvVar, error_handler::{DagError, RunningError}, graph::Graph, - task::{ExecState, Inputval, Retval, TaskWrapper, YamlTask}, }; -use std::collections::HashMap; +use crate::task::{ExecState, Inputval, Retval, TaskWrapper, YamlTask}; +use log::*; +use std::{collections::HashMap, sync::Arc}; /// dagrs's function is wrapped in DagEngine struct pub struct DagEngine { - /// Store all tasks' infos - tasks: HashMap, + /// Store all tasks' infos. + /// + /// Arc but no mutex, because only one thread will change [`TaskWrapper`] + /// at a time. And no modification to [`TaskWrapper`] happens during the execution of it. + tasks: HashMap>, /// Store dependency relations rely_graph: Graph, /// Store a task's running result execstate_store: HashMap, - // TODO: Environment + // Environment Variables + env: EnvVar, } impl DagEngine { @@ -30,6 +36,7 @@ impl DagEngine { tasks: HashMap::new(), rely_graph: Graph::new(), execstate_store: HashMap::new(), + env: EnvVar::new(), } } @@ -48,7 +55,8 @@ impl DagEngine { /// **Note:** This method must be called after all tasks have been added into dagrs. pub fn run(&mut self) -> Result { self.create_graph()?; - Ok(self.check_dag()) + let rt = tokio::runtime::Runtime::new().unwrap(); + Ok(rt.block_on(async { self.check_dag().await })) } /// Do dagrs's job from yaml file. @@ -61,7 +69,7 @@ impl DagEngine { /// /// This method is similar to `run`, but read tasks from yaml file, /// thus no need to add tasks mannually. - fn run_from_yaml(&mut self, filename: &str) -> Result { + pub fn run_from_yaml(mut self, filename: &str) -> Result { self.read_tasks(filename)?; self.run() } @@ -75,23 +83,24 @@ impl DagEngine { /// This operation will read all info in yaml file into `dagrs.tasks` if no error occurs. fn read_tasks(&mut self, filename: &str) -> Result<(), DagError> { let tasks = YamlTask::from_yaml(filename)?; - tasks.into_iter().map(|t| self.add_task(t)).count(); + tasks.into_iter().map(|t| self.add_tasks(vec![t])).count(); Ok(()) } - /// Add new task into dagrs + /// Add new tasks into dagrs /// /// # Example /// ``` /// let dagrs = DagEngine::new(); - /// dagrs.add_task(task1); - /// dagrs.add_task(task2); - /// dagrs.run("test/test_dag.yaml"); + /// dagrs.add_tasks(vec![task1, task2]); + /// dagrs.run(); /// ``` /// /// Here `task1` and `task2` are user defined task wrapped in [`TaskWrapper`]. - pub fn add_task(&mut self, task: TaskWrapper) { - self.tasks.insert(task.get_id(), task); + pub fn add_tasks(&mut self, tasks: Vec) { + for task in tasks { + self.tasks.insert(task.get_id(), Arc::new(task)); + } } /// Push a task's [`ExecState`] into hash store @@ -113,11 +122,11 @@ impl DagEngine { /// Prepare given task's [`Inputval`]. fn form_input(&self, id: &usize) -> Inputval { - let relys = self.tasks[id].get_rely_list(); + let froms = self.tasks[id].get_input_from_list(); Inputval::new( - relys + froms .iter() - .map(|rely_id| self.pull_execstate(rely_id).get_dmap()) + .map(|from| self.pull_execstate(from).get_dmap()) .collect(), ) } @@ -143,7 +152,7 @@ impl DagEngine { for (&id, task) in self.tasks.iter() { let index = self.rely_graph.find_index_by_id(&id).unwrap(); - for rely_task_id in task.get_rely_list() { + for rely_task_id in task.get_exec_after_list() { // Rely task existence check let rely_index = self.rely_graph.find_index_by_id(&rely_task_id).ok_or( DagError::running_error(RunningError::RelyTaskIllegal(task.get_name())), @@ -163,7 +172,7 @@ impl DagEngine { /// dagrs.check_dag(); /// ``` /// This opeartions will judge the graph and give out a execution sequence if possible. - fn check_dag(&mut self) -> bool { + async fn check_dag(&mut self) -> bool { if let Some(seq) = self.rely_graph.topo_sort() { let seq = seq .into_iter() @@ -172,36 +181,31 @@ impl DagEngine { self.print_seq(&seq); // Start Executing - seq.iter() - .map(|id| { - info!("Executing Task[name: {}]", self.tasks[id].get_name()); - - // Init state is empty. - let mut state = ExecState::empty(); - crossbeam::scope(|scope| { - let task_inner = self.tasks[id].get_inner(); - - let input = self.form_input(id); - let handle = scope.spawn(|_| task_inner.lock().unwrap().run(input)); - - // Recore executing state. - state = if let Ok(val) = handle.join() { - ExecState::new(true, val) - } else { - ExecState::new(false, Retval::empty()) - }; - }) - .expect("[Error] Create child task thread fails."); - - info!( - "Finish Task[name: {}], success: {}", - self.tasks[id].get_name(), - state.success() - ); - // Push executing state in to store. - self.push_execstate(*id, state); - }) - .count(); + for id in seq { + info!("Executing Task[name: {}]", self.tasks[&id].get_name()); + + let input = self.form_input(&id); + let env = self.env.clone(); + + let task = self.tasks[&id].clone(); + let handle = tokio::spawn(async move { task.run(input, env) }); + + // Recore executing state. + let state = if let Ok(val) = handle.await { + ExecState::new(true, val) + } else { + ExecState::new(false, Retval::empty()) + }; + + info!( + "Finish Task[name: {}], success: {}", + self.tasks[&id].get_name(), + state.success() + ); + // Push executing state in to store. + self.push_execstate(id, state); + } + true } else { error!("Loop Detect"); diff --git a/dagrs/src/engine/env_variables.rs b/dagrs/src/engine/env_variables.rs new file mode 100644 index 0000000000000000000000000000000000000000..5007d9ff6046455799b466fa7417a11bf2084d10 --- /dev/null +++ b/dagrs/src/engine/env_variables.rs @@ -0,0 +1,38 @@ +use crate::task::DMap; +use anymap::CloneAny; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +// Global environment variables +pub struct EnvVar(Arc>>); + +impl EnvVar { + pub fn new() -> Self { + Self(Arc::new(Mutex::new(HashMap::new()))) + } + + #[allow(unused)] + pub fn set(&mut self, name: &str, var: H) { + let mut v = DMap::new(); + v.insert(var); + self.0.lock().unwrap().insert(name.to_owned(), v); + } + + #[allow(unused)] + /// This method get needed input value from [`Inputval`]. + pub fn get(&self, name: &str) -> Option { + if let Some(dmap) = self.0.lock().unwrap().get(name) { + dmap.clone().remove() + } else { + None + } + } +} + +impl Clone for EnvVar { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} \ No newline at end of file diff --git a/dagrs/src/error_handler.rs b/dagrs/src/engine/error_handler.rs similarity index 95% rename from dagrs/src/error_handler.rs rename to dagrs/src/engine/error_handler.rs index 99de1d248ed39d45eae9199c4264db5cd11de49c..c0d4a3d16b6435bdd0e58321878f9c3204b99998 100644 --- a/dagrs/src/error_handler.rs +++ b/dagrs/src/engine/error_handler.rs @@ -42,8 +42,8 @@ pub enum YamlError { pub enum RunningError { #[error("Task[{0}] dependency task not exist")] RelyTaskIllegal(String), - #[error("Task[{0}] run script fails")] - RunScriptFailure(String) + #[error("Task[{0}] run script fails, details: {1}")] + RunScriptFailure(String, String) } impl DagError { diff --git a/dagrs/src/graph.rs b/dagrs/src/engine/graph.rs similarity index 100% rename from dagrs/src/graph.rs rename to dagrs/src/engine/graph.rs diff --git a/dagrs/src/engine/mod.rs b/dagrs/src/engine/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..ffc5efc342951613ed811dc44c7141a65147419c --- /dev/null +++ b/dagrs/src/engine/mod.rs @@ -0,0 +1,8 @@ +mod dag_engine; +mod error_handler; +mod graph; +mod env_variables; + +pub use error_handler::*; +pub use dag_engine::DagEngine; +pub use env_variables::EnvVar; \ No newline at end of file diff --git a/dagrs/src/lib.rs b/dagrs/src/lib.rs index 7df98b40349f62acca9a23eb571fb490198c7a8f..542bbcaa2301a329e91053fa13e3c4f2c1a90cbb 100644 --- a/dagrs/src/lib.rs +++ b/dagrs/src/lib.rs @@ -1,20 +1,17 @@ +extern crate anymap; extern crate bimap; extern crate clap; +extern crate crossbeam; extern crate deno_core; extern crate lazy_static; -extern crate yaml_rust; -#[macro_use] extern crate log; -extern crate anymap; -extern crate crossbeam; extern crate simplelog; +extern crate yaml_rust; -mod dag_engine; -mod error_handler; -mod graph; +mod engine; mod task; -pub use dag_engine::DagEngine; +pub use engine::{DagEngine, DagError, EnvVar, RunningError, YamlError, YamlFormatError}; pub use task::TaskTrait; use simplelog::*; @@ -23,7 +20,7 @@ use std::{ fs::{create_dir, File}, }; -pub fn init_log(logpath: Option<&str>) { +pub fn init_logger(logpath: Option<&str>) { let logpath = if let Some(s) = logpath { s.to_owned() } else { @@ -51,21 +48,20 @@ pub fn init_log(logpath: Option<&str>) { .unwrap(); } - #[test] -fn test_prom1() { +fn test_value_pass1() { use crate::task::{Inputval, Retval, TaskTrait, TaskWrapper}; struct T1 {} impl TaskTrait for T1 { - fn run(&self, _input: Inputval) -> Retval { - println!("T1!"); + fn run(&self, _input: Inputval, _env: EnvVar) -> Retval { + println!("T1, return 1"); Retval::new(1i32) } } struct T2 {} impl TaskTrait for T2 { - fn run(&self, input: Inputval) -> Retval { + fn run(&self, mut input: Inputval, _env: EnvVar) -> Retval { let val_from_t1 = input.get::(0); println!("T2, receive: {:?}", val_from_t1); Retval::empty() @@ -75,11 +71,66 @@ fn test_prom1() { let t1 = TaskWrapper::new(T1 {}, "Task 1"); let mut t2 = TaskWrapper::new(T2 {}, "Task 2"); - t2.rely_on(&[&t1]); + t2.exec_after(&[&t1]); + t2.input_from(&[&t1]); let mut dag = DagEngine::new(); - dag.add_task(t1); - dag.add_task(t2); + dag.add_tasks(vec![t1, t2]); dag.run().unwrap(); -} \ No newline at end of file +} + +#[test] +fn test_value_pass2() { + use crate::task::{Inputval, Retval, TaskTrait, TaskWrapper}; + struct T1 {} + impl TaskTrait for T1 { + fn run(&self, _input: Inputval, mut env: EnvVar) -> Retval { + println!("T1, return 1, set env [Hello: World]"); + env.set("Hello", "World".to_string()); + Retval::new(1i32) + } + } + + struct T2 {} + impl TaskTrait for T2 { + fn run(&self, mut input: Inputval, _env: EnvVar) -> Retval { + let val_from_t1 = input.get::(0); + println!("T2, receive from T1: {:?}, return '123'", val_from_t1); + Retval::new("123".to_string()) + } + } + + struct T3 {} + impl TaskTrait for T3 { + fn run(&self, mut input: Inputval, env: EnvVar) -> Retval { + // Order of input value is the same as the order of tasks + // passed in `input_from`. + let val_from_t1 = input.get::(0); + let val_from_t2 = input.get::(1); + let eval = env.get::("Hello"); + + println!( + "T3, receive from T1: {:?}, T2: {:?}, env: {:?}", + val_from_t1, val_from_t2, eval + ); + + Retval::empty() + } + } + + let t1 = TaskWrapper::new(T1 {}, "Task 1"); + let mut t2 = TaskWrapper::new(T2 {}, "Task 2"); + let mut t3 = TaskWrapper::new(T3 {}, "Task 3"); + + t2.exec_after(&[&t1]); + t2.input_from(&[&t1]); + + t3.exec_after(&[&t1, &t2]); + t3.input_from(&[&t1, &t2]); + + let mut dag = DagEngine::new(); + dag.add_tasks(vec![t1, t2, t3]); + + dag.run().unwrap(); +} diff --git a/dagrs/src/_main.rs b/dagrs/src/main.rs similarity index 36% rename from dagrs/src/_main.rs rename to dagrs/src/main.rs index d2564d6a552de76874bdda5dc182162010f831f3..3a162f57c82d8e35f8ad7713951d4a622c9d13e7 100644 --- a/dagrs/src/_main.rs +++ b/dagrs/src/main.rs @@ -1,28 +1,8 @@ #![doc = include_str!("../README.md")] -extern crate bimap; -extern crate clap; -extern crate deno_core; -extern crate lazy_static; -extern crate yaml_rust; -#[macro_use] -extern crate log; -extern crate simplelog; -extern crate anymap; -extern crate crossbeam; - -mod dag_engine; -mod error_handler; -mod graph; -mod task; - -use std::{ - env, - fs::{create_dir, File}, -}; use clap::Parser; -use dag_engine::DagEngine; -use simplelog::*; +use dagrs::{init_logger, DagEngine}; +use log::*; #[derive(Parser)] #[clap(version)] @@ -38,7 +18,7 @@ struct Args { fn main() { let args = Args::parse(); - let mut dagrs: DagEngine = DagEngine::new(); + let dagrs: DagEngine = DagEngine::new(); init_logger(args.logpath.as_deref()); @@ -47,6 +27,14 @@ fn main() { } } +#[test] +fn test_dag() { + let res = DagEngine::new() + .run_from_yaml("test/test_dag2.yaml") + .unwrap(); + assert_eq!(res, true) +} + #[test] fn test_runscript() { let res = DagEngine::new() @@ -56,13 +44,50 @@ fn test_runscript() { } #[test] -fn test_dag() { +fn test_value_pass1() { + use std::fs::File; + use std::io::Read; + let res = DagEngine::new() - .run_from_yaml("test/test_dag2.yaml") + .run_from_yaml("test/test_value_pass1.yaml") .unwrap(); - assert_eq!(res, true) + assert_eq!(res, true); + + let mut buf = String::new(); + File::open("./test/test_value_pass1.txt") + .expect("Test Fails, File not exist.") + .read_to_string(&mut buf) + .expect("Test Fails, Read file fails."); + + assert_eq!(buf, "10\n"); } +#[test] +fn test_value_pass2() { + use std::fs::File; + use std::io::Read; + + let res = DagEngine::new() + .run_from_yaml("test/test_value_pass2.yaml") + .unwrap(); + assert_eq!(res, true); + + let mut buf1 = String::new(); + File::open("./test/test_value_pass2.txt") + .expect("Test Fails, File not exist.") + .read_to_string(&mut buf1) + .expect("Test Fails, Read file fails."); + + let mut buf2 = String::new(); + File::open("./README.md") + .expect("Test Fails, File not exist.") + .read_to_string(&mut buf2) + .expect("Test Fails, Read file fails."); + + assert_eq!(buf1, buf2); +} + + #[test] fn test_loop() { let res = DagEngine::new() @@ -81,121 +106,50 @@ fn test_complex_loop() { #[test] fn test_format_error1() { - use crate::error_handler::{DagError, FormatError}; + use dagrs::{DagError, YamlError, YamlFormatError}; let res = DagEngine::new().run_from_yaml("test/test_error1.yaml"); - assert_eq!( + + assert!(matches!( res, - Err(DagError::format_error(FormatError::NoName("a".into()))) - ); + Err(DagError::YamlError(YamlError::YamlFormatError( + YamlFormatError::NoName(_) + ))) + )); } #[test] fn test_format_error2() { - use error_handler::{DagError, FormatError}; + use dagrs::{DagError, YamlError, YamlFormatError}; let res = DagEngine::new().run_from_yaml("test/test_error2.yaml"); - assert_eq!( + + assert!(matches!( res, - Err(DagError::format_error(FormatError::StartWordError)) - ); + Err(DagError::YamlError(YamlError::YamlFormatError( + YamlFormatError::StartWordError + ))) + )); } #[test] fn test_rely_error() { - use error_handler::{DagError, InnerError}; + use dagrs::{DagError, RunningError}; let res = DagEngine::new().run_from_yaml("test/test_error3.yaml"); - assert_eq!( + + assert!(matches!( res, - Err(DagError::inner_error(InnerError::RelyTaskIllegal( - "任务1".into() - ))) - ); + Err(DagError::RunningError(RunningError::RelyTaskIllegal(_))) + )); } #[test] fn test_no_runscript() { - use error_handler::{DagError, FormatError}; + use dagrs::{DagError, YamlError, YamlFormatError}; let res = DagEngine::new().run_from_yaml("test/test_error4.yaml"); - assert_eq!( + + assert!(matches!( res, - Err(DagError::format_error(FormatError::RunScriptError( - "a".into() + Err(DagError::YamlError(YamlError::YamlFormatError( + YamlFormatError::RunScriptError(_) ))) - ); -} - -#[test] -fn test_prom1() { - use crate::task::{Retval, TaskTrait, TaskWrapper, Inputval}; - struct T1 {} - impl TaskTrait for T1 { - fn run(&self, input: Inputval) -> Retval { - println!("T1!"); - Retval::empty() - } - } - - struct T2 {} - impl TaskTrait for T2 { - fn run(&self, input: Inputval) -> Retval { - println!("T2!"); - Retval::empty() - } - } - - let mut t1 = TaskWrapper::new(T1 {}, "Task 1"); - let mut t2 = TaskWrapper::new(T2 {}, "Task 2"); - let t3 = TaskWrapper::new(T1 {}, "Task 3"); - - t2.add_relys(&[&t1, &t3]); - t1.add_relys(&[&t3]); - - let mut dag = DagEngine::new(); - dag.add_task(t1); - dag.add_task(t2); - dag.add_task(t3); - - dag.run().unwrap(); + )); } - -// #[test] -// fn test_prom2() { -// use crate::task::{Retval, RunScript, RunType, TaskTrait, TaskWrapper}; -// struct T { -// run_script: RunScript, -// } - -// impl TaskTrait for T { -// fn run(&self) -> Option { -// Some(self.run_script.exec()) -// } -// } - -// let mut t1 = TaskWrapper::new( -// T { -// run_script: RunScript::new("echo T1", RunType::SH), -// }, -// "Task 1", -// ); -// let mut t2 = TaskWrapper::new( -// T { -// run_script: RunScript::new("echo T2", RunType::SH), -// }, -// "Task 2", -// ); -// let t3 = TaskWrapper::new( -// T { -// run_script: RunScript::new(r#"Deno.core.print("T3\n")"#, RunType::DENO), -// }, -// "Task 3", -// ); - -// t2.add_relys(&[&t1, &t3]); -// t1.add_relys(&[&t3]); - -// let mut dag = DagEngine::new(); -// dag.add_task(t1); -// dag.add_task(t2); -// dag.add_task(t3); - -// dag.run().unwrap(); -// } diff --git a/dagrs/src/task/mod.rs b/dagrs/src/task/mod.rs index 5f6fd4562930346570cfcece5c1160cb4f35696a..9727315b6c6280edd18ea7b5c031fdcc5770294f 100644 --- a/dagrs/src/task/mod.rs +++ b/dagrs/src/task/mod.rs @@ -1,8 +1,7 @@ pub use self::task::*; pub use self::yaml_task::YamlTask; pub use self::state::Retval; -pub use self::state::Inputval; -pub use self::state::ExecState; +pub use self::state::{Inputval, ExecState, DMap}; mod task; mod yaml_task; diff --git a/dagrs/src/task/state.rs b/dagrs/src/task/state.rs index 6e023a7fc4722a7851d5c882705cb848b463d5ee..f7d6bc03ee6db29dd6f09f0fa928da3611b5ae67 100644 --- a/dagrs/src/task/state.rs +++ b/dagrs/src/task/state.rs @@ -1,6 +1,8 @@ +use std::slice::Iter; + use anymap::{CloneAny, Map}; -type DMap = Map; +pub type DMap = Map; /// Describe task's running result pub struct ExecState { @@ -16,31 +18,23 @@ pub struct Inputval(Vec>); impl ExecState { /// Get a new [`ExecState`]. - /// + /// /// `success`: task finish without panic? - /// + /// /// `retval`: task's return value pub fn new(success: bool, retval: Retval) -> Self { Self { success, retval } } - /// Get empty [`ExecState`]. - pub fn empty() -> Self { - Self { - success: false, - retval: Retval::empty(), - } - } - /// Get [`ExecState`]'s return value. - /// + /// /// This method will clone [`DMap`] stored in [`ExecState`]'s [`Retval`]. pub fn get_dmap(&self) -> Option { self.retval.0.clone() } /// The task execution succeed or not. - /// + /// /// `true` means no panic occurs. pub fn success(&self) -> bool { self.success @@ -50,7 +44,7 @@ impl ExecState { impl Retval { #[allow(unused)] /// Get a new [`Retval`]. - /// + /// /// Since the return value may be transfered between threads, /// [`Send`], [`Sync`], [`CloneAny`] is needed. pub fn new(val: H) -> Self { @@ -66,7 +60,7 @@ impl Retval { } impl Inputval { - /// Get a new [`Inputval`], values stored in vector are ordered + /// Get a new [`Inputval`], values stored in vector are ordered /// by that of the given [`TaskWrapper`]'s `rely_list`. pub fn new(vals: Vec>) -> Self { Self(vals) @@ -74,11 +68,16 @@ impl Inputval { #[allow(unused)] /// This method get needed input value from [`Inputval`]. - pub fn get(&self, index: usize) -> Option<&H> { - if let Some(Some(dmap)) = self.0.get(index) { - dmap.get::() + pub fn get(&mut self, index: usize) -> Option { + if let Some(Some(dmap)) = self.0.get_mut(index) { + dmap.remove() } else { None } } + + pub fn get_iter(&self) -> Iter>> + { + self.0.iter() + } } diff --git a/dagrs/src/task/task.rs b/dagrs/src/task/task.rs index 8a07085ed36cc04697cf7a218cbdbf081deb2d25..d72ed34ec9ecf4289a494d4641f06c25769f6946 100644 --- a/dagrs/src/task/task.rs +++ b/dagrs/src/task/task.rs @@ -1,27 +1,27 @@ -use crate::error_handler::DagError; +use crate::engine::{DagError, RunningError, EnvVar}; -use super::{ExecState, Retval, Inputval}; -use deno_core::{JsRuntime, RuntimeOptions}; +use super::{Inputval, Retval}; +use deno_core::{serde_json, serde_v8, v8, JsRuntime, RuntimeOptions}; use lazy_static::lazy_static; use std::process::Command; -use std::sync::Mutex; -use std::thread; +use std::sync::{Mutex}; /// Task Trait. /// /// Any struct implements this trait can be added into dagrs. pub trait TaskTrait { - fn run(&self, input: Inputval) -> Retval; + fn run(&self, input: Inputval, env: EnvVar) -> Retval; } /// Wrapper for task that impl [`TaskTrait`]. -/// +/// /// Since task will be executed in seperated threads, `send` is needed. pub struct TaskWrapper { id: usize, name: String, - rely_list: Vec, - inner: Mutex>, + exec_after: Vec, + input_from: Vec, + inner: Box, } impl TaskWrapper { @@ -35,46 +35,53 @@ impl TaskWrapper { /// `Task` is a struct that impl [`TaskTrait`]. /// /// **Note:** This method will take the ownership of struct that impl [`TaskTrait`]. - pub fn new(task: impl TaskTrait + 'static + Send, name: &str) -> Self { + pub fn new(task: impl TaskTrait + 'static + Send + Sync, name: &str) -> Self { TaskWrapper { id: ID_ALLOCATOR.lock().unwrap().alloc(), name: name.to_owned(), - rely_list: Vec::new(), - inner: Mutex::new(Box::new(task)), + exec_after: Vec::new(), + input_from: Vec::new(), + inner: Box::new(task), } } #[allow(unused)] - /// Tasks that shall be executed after this one. + /// Tasks that shall be executed before this one. /// /// # Example /// ``` /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); - /// t2.add_relys(&[&t1]); + /// t2.exec_after(&[&t1]); /// ``` - /// In above code, `t2` will be executed before `t1`. - pub fn rely_on(&mut self, relys: &[&TaskWrapper]) { - self.rely_list.extend(relys.iter().map(|t| t.get_id())) + /// In above code, `t1` will be executed before `t2`. + pub fn exec_after(&mut self, relys: &[&TaskWrapper]) { + self.exec_after.extend(relys.iter().map(|t| t.get_id())) } - /// Tasks that shall be executed after this one. - /// - /// # Example - /// ``` - /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); - /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); - /// t2.add_relys_by_ids(&[t1.get_id()]); - /// ``` - /// Similar to `add_relys`, but this method tasks `id` rather than a task. - /// - /// In above code, `t2` will be executed before `t1`. - pub fn add_relys_by_ids(&mut self, relys: &[usize]) { - self.rely_list.extend(relys.iter()) + /// Input will come from the given tasks' exec result. + pub fn input_from(&mut self, needed: &[&TaskWrapper]) { + self.input_from.extend(needed.iter().map(|t| t.get_id())) + } + + /// The same as `exec_after`, but input are tasks' ids + /// rather than reference to [`TaskWrapper`]. + pub fn exec_after_id(&mut self, relys: &[usize]) { + self.exec_after.extend(relys) + } + + /// The same as `input_from`, but input are tasks' ids + /// rather than reference to [`TaskWrapper`]. + pub fn input_from_id(&mut self, needed: &[usize]) { + self.input_from.extend(needed) } - pub fn get_rely_list(&self) -> Vec { - self.rely_list.clone() + pub fn get_exec_after_list(&self) -> Vec { + self.exec_after.clone() + } + + pub fn get_input_from_list(&self) -> Vec { + self.input_from.clone() } pub fn get_id(&self) -> usize { @@ -85,8 +92,8 @@ impl TaskWrapper { self.name.to_owned() } - pub fn get_inner(&self) -> &Mutex> { - &self.inner + pub fn run(&self, input: Inputval, env: EnvVar) -> Retval { + self.inner.run(input, env) } } @@ -106,10 +113,9 @@ impl IDAllocator { lazy_static! { /// Instance of IDAllocator - static ref ID_ALLOCATOR: Mutex = Mutex::new(IDAllocator { id: 0 }); + static ref ID_ALLOCATOR: Mutex = Mutex::new(IDAllocator { id: 1 }); } - /// Can be used to run a script cmd or file. #[derive(Debug)] pub struct RunScript { @@ -146,33 +152,66 @@ impl RunScript { /// let r = RunScript::new("echo Hello!", RunType::SH); /// r.exec(); /// ``` - pub fn exec(&self) -> Result { - match self.executor { - RunType::SH => self.run_sh(), - RunType::DENO => self.run_deno(), - } + pub fn exec(&self, input: Inputval) -> Result { + let res = match self.executor { + RunType::SH => self.run_sh(input), + RunType::DENO => self.run_deno(input), + }; + + res } - fn run_sh(&self) -> Result { + fn run_sh(&self, input: Inputval) -> Result { + let mut cmd = format!("{} ", self.script); + input + .get_iter() + .map(|input| { + cmd.push_str( + if let Some(dmap) = input { + if let Some(str) = dmap.get::() { + str + } else { + "" + } + } else { + "" + }) + }).count(); + + let res = Command::new("sh") .arg("-c") - .arg(&self.script) + .arg(&cmd) .output() .map(|output| format!("{}", String::from_utf8(output.stdout).unwrap())); - + res.map_err(|err| err.into()) } - fn run_deno(&self) -> Result { + fn run_deno(&self, _input: Inputval) -> Result { let script = self.script.clone(); - let handle = thread::spawn(move || { - let output = JsRuntime::new(RuntimeOptions { - ..Default::default() - }) - .execute_script("", &script); + let mut context = JsRuntime::new(RuntimeOptions { + ..Default::default() }); - - // TODO - unimplemented!() + match context.execute_script("", &script) { + Ok(global) => { + let scope = &mut context.handle_scope(); + let local = v8::Local::new(scope, global); + + let deserialized_value = serde_v8::from_v8::(scope, local); + + match deserialized_value { + Ok(value) => Ok(value.to_string()), + Err(err) => Err(DagError::running_error(RunningError::RunScriptFailure( + "?".into(), + format!("Cannot deserialize value: {:?}", err), + ))), + } + } + Err(err) => Err(DagError::running_error(RunningError::RunScriptFailure( + "?".into(), + format!("{:?}", err), + ))), + } } } diff --git a/dagrs/src/task/yaml_task.rs b/dagrs/src/task/yaml_task.rs index 23a99915904c86cd8f7db2d9c52161efe7c9ef3f..46f5d403ea3ebbd0070c910c3324370f05729e7a 100644 --- a/dagrs/src/task/yaml_task.rs +++ b/dagrs/src/task/yaml_task.rs @@ -1,6 +1,6 @@ -use super::{Retval, RunScript, RunType, TaskTrait, TaskWrapper, Inputval}; -use crate::error_handler::{DagError, YamlFormatError, RunningError}; -use std::{collections::HashMap, fs::File, io::Read}; +use super::{Inputval, Retval, RunScript, RunType, TaskTrait, TaskWrapper}; +use crate::engine::{DagError, YamlFormatError, EnvVar}; +use std::{cell::Cell, collections::HashMap, fs::File, io::Read}; use yaml_rust::{Yaml, YamlLoader}; #[derive(Debug)] @@ -11,7 +11,6 @@ struct YamlTaskInner { } /// Task struct for YAML file. -#[derive(Debug)] pub struct YamlTask { /// Task's id in yaml file. /// @@ -19,16 +18,23 @@ pub struct YamlTask { yaml_id: String, /// Task's name. name: String, - /// Record tasks' `yaml_id` that shall be executed after this task. - relys: Vec, + /// Record tasks' `yaml_id` that shall be executed before this task. + afters: Vec, + /// Record tasks' `yaml_id` that shall give their execution results to this task. + froms: Vec, /// A field shall be wrapper into [`TaskWrapper`] later. - inner: YamlTaskInner, + /// + /// Why [`Cell`] and [`Option`]? Useful in funtion `from_yaml`. + inner: Cell>, } impl TaskTrait for YamlTaskInner { - fn run(&self, input: Inputval) -> Retval { - // TODO - Retval::empty() + fn run(&self, input: Inputval, _env: EnvVar) -> Retval { + if let Ok(res) = self.run.exec(input) { + Retval::new(res) + } else { + Retval::empty() + } } } @@ -54,7 +60,9 @@ impl YamlTask { // Get name first let name = info["name"] .as_str() - .ok_or(DagError::format_error(YamlFormatError::NoName(id.to_owned())))? + .ok_or(DagError::format_error(YamlFormatError::NoName( + id.to_owned(), + )))? .to_owned(); // Get run script @@ -72,30 +80,37 @@ impl YamlTask { } }; - let run_script = - run["script"] - .as_str() - .ok_or(DagError::format_error(YamlFormatError::RunScriptError( - id.into(), - )))?; + let run_script = run["script"].as_str().ok_or(DagError::format_error( + YamlFormatError::RunScriptError(id.into()), + ))?; - // relys can be empty - let mut relys = Vec::new(); - if let Some(rely_tasks) = info["rely"].as_vec() { - rely_tasks + // afters can be empty + let mut afters = Vec::new(); + if let Some(after_tasks) = info["after"].as_vec() { + after_tasks .iter() - .map(|rely_task_id| relys.push(rely_task_id.as_str().unwrap().to_owned())) + .map(|task_id| afters.push(task_id.as_str().unwrap().to_owned())) .count(); } - let inner = YamlTaskInner { + // froms can be empty, too + let mut froms = Vec::new(); + if let Some(from_tasks) = info["from"].as_vec() { + from_tasks + .iter() + .map(|task_id| froms.push(task_id.as_str().unwrap().to_owned())) + .count(); + } + + let inner = Cell::new(Some(YamlTaskInner { run: RunScript::new(run_script, executor), - }; + })); Ok(YamlTask { yaml_id: id.to_string(), name, - relys, + afters, + froms, inner, }) } @@ -139,37 +154,41 @@ impl YamlTask { /// /// Used in [`crate::DagEngine`]. pub fn from_yaml(filename: &str) -> Result, DagError> { - let tasks = YamlTask::parse_tasks(filename)?; - let mut res = Vec::new(); - let mut temp_hash_yaml2id = HashMap::new(); - let mut temp_hash_id2rely = HashMap::new(); - - // Wrap tasks - tasks - .into_iter() - .map(|t| { - let task = TaskWrapper::new(t.inner, &t.name); - temp_hash_id2rely.insert(task.get_id(), t.relys); - temp_hash_yaml2id.insert(t.yaml_id, task.get_id()); - res.push(task); - }) - .count(); - - // Add Dependency - for task in &mut res { - let mut relys = Vec::new(); - for rely in &temp_hash_id2rely[&task.get_id()] { - // Rely task existence check - if !temp_hash_yaml2id.contains_key(rely) { - return Err(DagError::running_error(RunningError::RelyTaskIllegal( - task.get_name(), - ))); - } - relys.push(temp_hash_yaml2id[rely]) - } - task.add_relys_by_ids(&relys) + let yaml_tasks = YamlTask::parse_tasks(filename)?; + let mut tasks = Vec::new(); + let mut yid2id = HashMap::new(); + + // Form tasks + for ytask in &yaml_tasks { + let task = TaskWrapper::new( + ytask + .inner + .replace(None) + .expect("[Fatal] Abnormal error occurs."), + &ytask.name, + ); + yid2id.insert(ytask.yaml_id.clone(), task.get_id()); + tasks.push(task); + } + + for (index, ytask) in yaml_tasks.iter().enumerate() { + let afters: Vec = ytask + .afters + .iter() + .map(|after| yid2id.get(after).unwrap_or(&0).to_owned()) + .collect(); + // Task 0 won't exist in normal state, thus this will trigger an RelyTaskIllegal Error later. + + let froms: Vec = ytask + .froms + .iter() + .map(|from| yid2id.get(from).unwrap_or(&0).to_owned()) + .collect(); + + tasks[index].exec_after_id(&afters); + tasks[index].input_from_id(&froms); } - Ok(res) + Ok(tasks) } } diff --git a/dagrs/test/test_dag1.yaml b/dagrs/test/test_dag1.yaml index ee6568ff0b7d695ff6a00c186010ee94953eafa9..d6861dcc2f24dc4e532c83fdda691d23e8289b95 100644 --- a/dagrs/test/test_dag1.yaml +++ b/dagrs/test/test_dag1.yaml @@ -1,7 +1,7 @@ dagrs: a: name: "任务1" - rely: [b] + after: [b] run: type: sh script: ./test/test.sh @@ -9,4 +9,4 @@ dagrs: name: "任务2" run: type: deno - script: print("Hello!") \ No newline at end of file + script: Deno.core.print("Hello!") \ No newline at end of file diff --git a/dagrs/test/test_dag2.yaml b/dagrs/test/test_dag2.yaml index e46a70faaad7d0d1ded1570c221a73b2c890ecec..093de8f45393342edb5c25d168dfb68a7bbc07c4 100644 --- a/dagrs/test/test_dag2.yaml +++ b/dagrs/test/test_dag2.yaml @@ -1,43 +1,43 @@ dagrs: a: name: "任务1" - rely: [b, c] + after: [b, c] run: type: sh script: echo a b: name: "任务2" - rely: [c, f, g] + after: [c, f, g] run: type: sh script: echo b c: name: "任务3" - rely: [e, g] + after: [e, g] run: type: sh script: echo c d: name: "任务4" - rely: [c, e] + after: [c, e] run: type: sh script: echo d e: name: "任务5" - rely: [h] + after: [h] run: type: sh script: echo e f: name: "任务6" - rely: [g] + after: [g] run: type: deno script: Deno.core.print("f\n") g: name: "任务7" - rely: [h] + after: [h] run: type: deno script: Deno.core.print("g\n") diff --git a/dagrs/test/test_error1.yaml b/dagrs/test/test_error1.yaml index f442cc7ca07eb9d3188448e5a31ec322993309d9..d133fc75ea88432ba000fc4ae9a9f4d3d99170b0 100644 --- a/dagrs/test/test_error1.yaml +++ b/dagrs/test/test_error1.yaml @@ -1,7 +1,7 @@ dagrs: a: # no name - rely: [b] + after: [b] run: type: sh script: echo x diff --git a/dagrs/test/test_error2.yaml b/dagrs/test/test_error2.yaml index 5d22582edd980f264a7e6e61406bd8a5725bfc5d..c43d523621427268357e4a73d6de14cf951cd97e 100644 --- a/dagrs/test/test_error2.yaml +++ b/dagrs/test/test_error2.yaml @@ -1,6 +1,6 @@ a: name: "任务1" - rely: [a] + after: [a] run: type: sh script: echo x \ No newline at end of file diff --git a/dagrs/test/test_error3.yaml b/dagrs/test/test_error3.yaml index 0e0328a5d0fe0f129b66ed6354f322e4522ae9bc..6efab9b11892a2062b360c205278f302f4ae75f8 100644 --- a/dagrs/test/test_error3.yaml +++ b/dagrs/test/test_error3.yaml @@ -1,7 +1,7 @@ dagrs: a: name: "任务1" - rely: [b] + after: [b] run: type: sh script: echo x \ No newline at end of file diff --git a/dagrs/test/test_loop1.yaml b/dagrs/test/test_loop1.yaml index cdbdaf3b618e26c5b5c11634fdbb3ef974d76747..d5a331263d6ec5597366208c0cbb609351de1927 100644 --- a/dagrs/test/test_loop1.yaml +++ b/dagrs/test/test_loop1.yaml @@ -1,31 +1,31 @@ dagrs: a: name: "任务1" - rely: [b, c] + after: [b, c] run: type: sh script: echo x b: name: "任务2" - rely: [c] + after: [c] run: type: sh script: echo x c: name: "任务3" - rely: [d] + after: [d] run: type: sh script: echo x d: name: "任务4" - rely: [e] + after: [e] run: type: sh script: echo x e: name: "任务5" - rely: [c] + after: [c] run: type: sh script: echo x \ No newline at end of file diff --git a/dagrs/test/test_loop2.yaml b/dagrs/test/test_loop2.yaml index b87c79d4ce22eb6f010f4e4c40854d33f28539c0..69c09868c7c4fdc22b46207f968adebb2aa73acf 100644 --- a/dagrs/test/test_loop2.yaml +++ b/dagrs/test/test_loop2.yaml @@ -1,49 +1,49 @@ dagrs: a: name: "任务1" - rely: [b, c] + after: [b, c] run: type: sh script: echo x b: name: "任务2" - rely: [c, f, g] + after: [c, f, g] run: type: sh script: echo x c: name: "任务3" - rely: [e, g] + after: [e, g] run: type: sh script: echo x d: name: "任务4" - rely: [c, e] + after: [c, e] run: type: sh script: echo x e: name: "任务5" - rely: [h] + after: [h] run: type: sh script: echo x f: name: "任务6" - rely: [g] + after: [g] run: type: sh script: echo x g: name: "任务7" - rely: [h] + after: [h] run: type: sh script: echo x h: name: "任务8" - rely: [f] + after: [f] run: type: sh script: echo x \ No newline at end of file diff --git a/dagrs/test/test_value_pass1.yaml b/dagrs/test/test_value_pass1.yaml new file mode 100644 index 0000000000000000000000000000000000000000..f227d3db464bc191314cf45ab3985a44b50589c9 --- /dev/null +++ b/dagrs/test/test_value_pass1.yaml @@ -0,0 +1,13 @@ +dagrs: + a: + name: "任务1" + after: [b] + from: [b] + run: + type: sh + script: echo > ./test/test_value_pass1.txt + b: + name: "任务2" + run: + type: deno + script: let a = 1+4; a*2 \ No newline at end of file diff --git a/dagrs/test/test_value_pass2.yaml b/dagrs/test/test_value_pass2.yaml new file mode 100644 index 0000000000000000000000000000000000000000..b4ba7d9910961c435c064ef355dd5b58b78ad7df --- /dev/null +++ b/dagrs/test/test_value_pass2.yaml @@ -0,0 +1,13 @@ +dagrs: + a: + name: "任务1" + run: + type: sh + script: ls README.md + b: + name: "任务2" + after: [a] + from: [a] + run: + type: sh + script: cat > ./test/test_value_pass2.txt