From 6cfa5b348383cf959037e54cbf19ff35f1b0c220 Mon Sep 17 00:00:00 2001 From: Yinwhe Date: Wed, 27 Apr 2022 21:17:03 +0800 Subject: [PATCH] feat: Task return value support Phase 1 feat: task input value support feat: task input value support --- dagrs/Cargo.toml | 7 +- dagrs/README.md | 7 + dagrs/src/{main.rs => _main.rs} | 133 ++++++-------- dagrs/src/dag_engine.rs | 96 ++++++++--- dagrs/src/error_handler.rs | 100 ++++------- dagrs/src/graph.rs | 6 +- dagrs/src/lib.rs | 85 +++++++++ dagrs/src/task/mod.rs | 9 + dagrs/src/task/state.rs | 84 +++++++++ dagrs/src/task/task.rs | 178 +++++++++++++++++++ dagrs/src/{task.rs => task/yaml_task.rs} | 210 ++--------------------- 11 files changed, 538 insertions(+), 377 deletions(-) rename dagrs/src/{main.rs => _main.rs} (60%) create mode 100644 dagrs/src/lib.rs create mode 100644 dagrs/src/task/mod.rs create mode 100644 dagrs/src/task/state.rs create mode 100644 dagrs/src/task/task.rs rename dagrs/src/{task.rs => task/yaml_task.rs} (46%) diff --git a/dagrs/Cargo.toml b/dagrs/Cargo.toml index 10458104..ab8671a0 100644 --- a/dagrs/Cargo.toml +++ b/dagrs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dagrs" -version = "0.1.0" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -12,4 +12,7 @@ bimap = "0.6.1" deno_core = "0.121.0" log = "0.4.14" simplelog = "^0.10.0" -clap = { version = "3.0.14", features = ["derive"] } \ No newline at end of file +clap = { version = "3.0.14", features = ["derive"] } +anymap = "1.0.0-beta.2" +crossbeam = "0.8.1" +thiserror = "1.0.30" \ No newline at end of file diff --git a/dagrs/README.md b/dagrs/README.md index f9642183..894ad4b3 100644 --- a/dagrs/README.md +++ b/dagrs/README.md @@ -333,3 +333,10 @@ log 文件如下: 03:47:55 [INFO] Task[name: Task 3] exec done, success: true, return value: Global { data: 0x7f864600d2a0, isolate_handle: IsolateHandle(IsolateAnnex { isolate: 0x0, isolate_mutex: Mutex { data: (), poisoned: false, .. } }) } ``` + +TODO: +- [ ] 增加 Task 输入输出值功能,定义输入输出值传递的条件 +- [ ] 增加 Engine 整体环境变量功能 +- [ ] 增加 Task 和 Engine 任务执行成功和失败的功能 +- [ ] 优化错误处理 +- [ ] 优化 Log \ No newline at end of file diff --git a/dagrs/src/main.rs b/dagrs/src/_main.rs similarity index 60% rename from dagrs/src/main.rs rename to dagrs/src/_main.rs index 6273d558..d2564d6a 100644 --- a/dagrs/src/main.rs +++ b/dagrs/src/_main.rs @@ -7,6 +7,8 @@ extern crate yaml_rust; #[macro_use] extern crate log; extern crate simplelog; +extern crate anymap; +extern crate crossbeam; mod dag_engine; mod error_handler; @@ -45,43 +47,6 @@ fn main() { } } -/// Initialize a logger, and set it's path. -/// -/// # Example -/// ``` -/// init_logger(Some("test/dagrs.log")); // set path mannully -/// // or -/// init_logger(None); // use default path ($HOME/.dagrs/dagrs.log) -/// ``` -/// **Note:** path must exists. -pub fn init_logger(path: Option<&str>) { - let log_path = if let Some(s) = path { - s.to_owned() - } else { - if let Ok(home) = env::var("HOME") { - create_dir(format!("{}/.dagrs", home)).unwrap_or(()); - format!("{}/.dagrs/dagrs.log", home) - } else { - "./dagrs.log".to_owned() - } - }; - - CombinedLogger::init(vec![ - TermLogger::new( - LevelFilter::Info, - Config::default(), - TerminalMode::Mixed, - ColorChoice::Auto, - ), - WriteLogger::new( - LevelFilter::Info, - Config::default(), - File::create(log_path).unwrap(), - ), - ]) - .unwrap(); -} - #[test] fn test_runscript() { let res = DagEngine::new() @@ -160,20 +125,20 @@ fn test_no_runscript() { #[test] fn test_prom1() { - use crate::task::{Retval, TaskTrait, TaskWrapper}; + use crate::task::{Retval, TaskTrait, TaskWrapper, Inputval}; struct T1 {} impl TaskTrait for T1 { - fn run(&self) -> Option { + fn run(&self, input: Inputval) -> Retval { println!("T1!"); - None + Retval::empty() } } struct T2 {} impl TaskTrait for T2 { - fn run(&self) -> Option { + fn run(&self, input: Inputval) -> Retval { println!("T2!"); - None + Retval::empty() } } @@ -192,45 +157,45 @@ fn test_prom1() { dag.run().unwrap(); } -#[test] -fn test_prom2() { - use crate::task::{Retval, RunScript, RunType, TaskTrait, TaskWrapper}; - struct T { - run_script: RunScript, - } - - impl TaskTrait for T { - fn run(&self) -> Option { - Some(self.run_script.exec()) - } - } - - let mut t1 = TaskWrapper::new( - T { - run_script: RunScript::new("echo T1", RunType::SH), - }, - "Task 1", - ); - let mut t2 = TaskWrapper::new( - T { - run_script: RunScript::new("echo T2", RunType::SH), - }, - "Task 2", - ); - let t3 = TaskWrapper::new( - T { - run_script: RunScript::new(r#"Deno.core.print("T3\n")"#, RunType::DENO), - }, - "Task 3", - ); - - t2.add_relys(&[&t1, &t3]); - t1.add_relys(&[&t3]); - - let mut dag = DagEngine::new(); - dag.add_task(t1); - dag.add_task(t2); - dag.add_task(t3); - - dag.run().unwrap(); -} +// #[test] +// fn test_prom2() { +// use crate::task::{Retval, RunScript, RunType, TaskTrait, TaskWrapper}; +// struct T { +// run_script: RunScript, +// } + +// impl TaskTrait for T { +// fn run(&self) -> Option { +// Some(self.run_script.exec()) +// } +// } + +// let mut t1 = TaskWrapper::new( +// T { +// run_script: RunScript::new("echo T1", RunType::SH), +// }, +// "Task 1", +// ); +// let mut t2 = TaskWrapper::new( +// T { +// run_script: RunScript::new("echo T2", RunType::SH), +// }, +// "Task 2", +// ); +// let t3 = TaskWrapper::new( +// T { +// run_script: RunScript::new(r#"Deno.core.print("T3\n")"#, RunType::DENO), +// }, +// "Task 3", +// ); + +// t2.add_relys(&[&t1, &t3]); +// t1.add_relys(&[&t3]); + +// let mut dag = DagEngine::new(); +// dag.add_task(t1); +// dag.add_task(t2); +// dag.add_task(t3); + +// dag.run().unwrap(); +// } diff --git a/dagrs/src/dag_engine.rs b/dagrs/src/dag_engine.rs index d7e5fec3..39e98b54 100644 --- a/dagrs/src/dag_engine.rs +++ b/dagrs/src/dag_engine.rs @@ -1,9 +1,9 @@ //! Dag Engine is dagrs's main body use crate::{ - error_handler::{DagError, InnerError}, + error_handler::{DagError, RunningError}, graph::Graph, - task::{TaskWrapper, YamlTask}, + task::{ExecState, Inputval, Retval, TaskWrapper, YamlTask}, }; use std::collections::HashMap; @@ -13,6 +13,9 @@ pub struct DagEngine { tasks: HashMap, /// Store dependency relations rely_graph: Graph, + /// Store a task's running result + execstate_store: HashMap, + // TODO: Environment } impl DagEngine { @@ -26,6 +29,7 @@ impl DagEngine { DagEngine { tasks: HashMap::new(), rely_graph: Graph::new(), + execstate_store: HashMap::new(), } } @@ -57,11 +61,24 @@ impl DagEngine { /// /// This method is similar to `run`, but read tasks from yaml file, /// thus no need to add tasks mannually. - pub fn run_from_yaml(&mut self, filename: &str) -> Result { + fn run_from_yaml(&mut self, filename: &str) -> Result { self.read_tasks(filename)?; self.run() } + /// Read tasks into engine throuh yaml + /// + /// # Example + /// ``` + /// let yaml_tasks = dagrs.read_task("test/test.yaml"); + /// ``` + /// This operation will read all info in yaml file into `dagrs.tasks` if no error occurs. + fn read_tasks(&mut self, filename: &str) -> Result<(), DagError> { + let tasks = YamlTask::from_yaml(filename)?; + tasks.into_iter().map(|t| self.add_task(t)).count(); + Ok(()) + } + /// Add new task into dagrs /// /// # Example @@ -77,17 +94,32 @@ impl DagEngine { self.tasks.insert(task.get_id(), task); } - /// Read tasks into engine throuh yaml - /// - /// # Example - /// ``` - /// let yaml_tasks = dagrs.read_task("test/test.yaml"); - /// ``` - /// This operation will read all info in yaml file into `dagrs.tasks` if no error occurs. - fn read_tasks(&mut self, filename: &str) -> Result<(), DagError> { - let tasks = YamlTask::from_yaml(filename)?; - tasks.into_iter().map(|t| self.add_task(t)).count(); - Ok(()) + /// Push a task's [`ExecState`] into hash store + fn push_execstate(&mut self, id: usize, state: ExecState) { + assert!( + !self.execstate_store.contains_key(&id), + "[Error] Repetitive push execstate, id: {}", + id + ); + self.execstate_store.insert(id, state); + } + + /// Fetch given task's [`ExecState`], this won't delete it from the hash map. + fn pull_execstate(&self, id: &usize) -> &ExecState { + self.execstate_store + .get(id) + .expect("[Error] Pull execstate fails") + } + + /// Prepare given task's [`Inputval`]. + fn form_input(&self, id: &usize) -> Inputval { + let relys = self.tasks[id].get_rely_list(); + Inputval::new( + relys + .iter() + .map(|rely_id| self.pull_execstate(rely_id).get_dmap()) + .collect(), + ) } /// create rely map between tasks @@ -114,10 +146,10 @@ impl DagEngine { for rely_task_id in task.get_rely_list() { // Rely task existence check let rely_index = self.rely_graph.find_index_by_id(&rely_task_id).ok_or( - DagError::inner_error(InnerError::RelyTaskIllegal(task.get_name())), + DagError::running_error(RunningError::RelyTaskIllegal(task.get_name())), )?; - self.rely_graph.add_edge(index, rely_index); + self.rely_graph.add_edge(rely_index, index); } } @@ -131,19 +163,43 @@ impl DagEngine { /// dagrs.check_dag(); /// ``` /// This opeartions will judge the graph and give out a execution sequence if possible. - fn check_dag(&self) -> bool { + fn check_dag(&mut self) -> bool { if let Some(seq) = self.rely_graph.topo_sort() { let seq = seq .into_iter() .map(|index| self.rely_graph.find_id_by_index(index).unwrap()) .collect(); self.print_seq(&seq); + + // Start Executing seq.iter() .map(|id| { info!("Executing Task[name: {}]", self.tasks[id].get_name()); - if let Some(v) = self.tasks[id].run() { - info!("Task[name: {}] exec done, success: {}, return value: {}", self.tasks[id].get_name(), v.success, v.value); - } + + // Init state is empty. + let mut state = ExecState::empty(); + crossbeam::scope(|scope| { + let task_inner = self.tasks[id].get_inner(); + + let input = self.form_input(id); + let handle = scope.spawn(|_| task_inner.lock().unwrap().run(input)); + + // Recore executing state. + state = if let Ok(val) = handle.join() { + ExecState::new(true, val) + } else { + ExecState::new(false, Retval::empty()) + }; + }) + .expect("[Error] Create child task thread fails."); + + info!( + "Finish Task[name: {}], success: {}", + self.tasks[id].get_name(), + state.success() + ); + // Push executing state in to store. + self.push_execstate(*id, state); }) .count(); true diff --git a/dagrs/src/error_handler.rs b/dagrs/src/error_handler.rs index ef8456d3..99de1d24 100644 --- a/dagrs/src/error_handler.rs +++ b/dagrs/src/error_handler.rs @@ -1,93 +1,49 @@ //! A simple error handle, can output error type and info -use std::fmt::Display; +use thiserror::Error; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Error)] /// A synthesis of all possible errors. pub enum DagError { /// IO Error, like file not exist, etc. /// Here we simplefy it to a message(String). - IOError(String), + #[error("{0}")] + IOError(#[from] std::io::Error), /// YAML Error, like format error, etc. + #[error("{0}")] YamlError(YamlError), /// Error that occurs when running dagrs. - InnerError(InnerError), + #[error("{0}")] + RunningError(RunningError), } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Error)] /// Format Error, point out which part has what kinds of error. -pub enum FormatError { - /// Not start with 'dagrs'. +pub enum YamlFormatError { + #[error("Not start with 'dagrs'")] StartWordError, - /// A task have no name filed, `String` points out task's id. + #[error("Task[{0}] has no name field")] NoName(String), - /// Run field format error + #[error("Task[{0}] run script format error")] RunScriptError(String) } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Error)] /// Error that occurs when parsing YAML file. pub enum YamlError { - /// Yaml Parser defined error type. - YamlParserError(yaml_rust::ScanError), - /// Format Error type. - YamlFormatError(FormatError), + #[error("{0}")] + YamlParserError(#[from] yaml_rust::ScanError), + #[error("{0}")] + YamlFormatError(YamlFormatError), } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Error)] /// Error that occurs when running dagrs -pub enum InnerError { - /// Dependency task not exist +pub enum RunningError { + #[error("Task[{0}] dependency task not exist")] RelyTaskIllegal(String), -} - -impl Display for FormatError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::StartWordError => write!(f, "YAML file not start with 'dagrs:'"), - Self::NoName(id) => write!(f, "Task[ID:{}] name not found", id), - Self::RunScriptError(id) => write!(f, "Task[ID:{}] run script format error", id), - } - } -} - -impl Display for YamlError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::YamlFormatError(e) => e.fmt(f), - Self::YamlParserError(e) => e.fmt(f), - } - } -} - -impl Display for InnerError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::RelyTaskIllegal(name) => write!(f, "Task[Name:{}] rely tasks not exist!", name), - } - } -} - -impl Display for DagError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self { - Self::IOError(e) => e.fmt(f), - Self::YamlError(e) => e.fmt(f), - Self::InnerError(e) => e.fmt(f), - } - } -} - -impl From for DagError { - fn from(e: std::io::Error) -> Self { - Self::IOError(e.to_string()) - } -} - -impl From for DagError { - fn from(e: yaml_rust::ScanError) -> Self { - Self::YamlError(YamlError::YamlParserError(e)) - } + #[error("Task[{0}] run script fails")] + RunScriptFailure(String) } impl DagError { @@ -98,7 +54,7 @@ impl DagError { /// DagError::format_error(FormatError::NoName("a")); /// ``` /// This will throw a error that says, task 'a' has no name field. - pub fn format_error(error: FormatError) -> Self { + pub fn format_error(error: YamlFormatError) -> Self { Self::YamlError(YamlError::YamlFormatError(error)) } @@ -109,7 +65,13 @@ impl DagError { /// DagError::inner_error(InnerError::RelyTaskIllegal("task 1")) /// ``` /// This will throw a error that says, task with name "task 1" has non-exist rely tasks. - pub fn inner_error(error: InnerError) -> Self { - Self::InnerError(error) + pub fn running_error(error: RunningError) -> Self { + Self::RunningError(error) } } + +impl From for DagError { + fn from(e: yaml_rust::ScanError) -> Self { + Self::YamlError(YamlError::YamlParserError(e)) + } +} \ No newline at end of file diff --git a/dagrs/src/graph.rs b/dagrs/src/graph.rs index b651852e..2bfa107a 100644 --- a/dagrs/src/graph.rs +++ b/dagrs/src/graph.rs @@ -58,9 +58,9 @@ impl Graph { self.nodes.insert(id, index); } - /// Add an edge into the graph + /// Add an edge into the graph. /// - /// #Example + /// # Example /// ``` /// g.add_edge(0, 1); /// ``` @@ -121,7 +121,7 @@ impl Graph { .count(); while !queue.is_empty() { - let v = queue.pop().unwrap(); + let v = queue.pop().unwrap(); // This unwrap is ok since `queue` is not empty sequence.push(v); count += 1; diff --git a/dagrs/src/lib.rs b/dagrs/src/lib.rs new file mode 100644 index 00000000..7df98b40 --- /dev/null +++ b/dagrs/src/lib.rs @@ -0,0 +1,85 @@ +extern crate bimap; +extern crate clap; +extern crate deno_core; +extern crate lazy_static; +extern crate yaml_rust; +#[macro_use] +extern crate log; +extern crate anymap; +extern crate crossbeam; +extern crate simplelog; + +mod dag_engine; +mod error_handler; +mod graph; +mod task; + +pub use dag_engine::DagEngine; +pub use task::TaskTrait; + +use simplelog::*; +use std::{ + env, + fs::{create_dir, File}, +}; + +pub fn init_log(logpath: Option<&str>) { + let logpath = if let Some(s) = logpath { + s.to_owned() + } else { + if let Ok(home) = env::var("HOME") { + create_dir(format!("{}/.dagrs", home)).unwrap_or(()); + format!("{}/.dagrs/dagrs.log", home) + } else { + "./dagrs.log".to_owned() + } + }; + + CombinedLogger::init(vec![ + TermLogger::new( + LevelFilter::Info, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ), + WriteLogger::new( + LevelFilter::Info, + Config::default(), + File::create(logpath).unwrap(), + ), + ]) + .unwrap(); +} + + +#[test] +fn test_prom1() { + use crate::task::{Inputval, Retval, TaskTrait, TaskWrapper}; + struct T1 {} + impl TaskTrait for T1 { + fn run(&self, _input: Inputval) -> Retval { + println!("T1!"); + Retval::new(1i32) + } + } + + struct T2 {} + impl TaskTrait for T2 { + fn run(&self, input: Inputval) -> Retval { + let val_from_t1 = input.get::(0); + println!("T2, receive: {:?}", val_from_t1); + Retval::empty() + } + } + + let t1 = TaskWrapper::new(T1 {}, "Task 1"); + let mut t2 = TaskWrapper::new(T2 {}, "Task 2"); + + t2.rely_on(&[&t1]); + + let mut dag = DagEngine::new(); + dag.add_task(t1); + dag.add_task(t2); + + dag.run().unwrap(); +} \ No newline at end of file diff --git a/dagrs/src/task/mod.rs b/dagrs/src/task/mod.rs new file mode 100644 index 00000000..5f6fd456 --- /dev/null +++ b/dagrs/src/task/mod.rs @@ -0,0 +1,9 @@ +pub use self::task::*; +pub use self::yaml_task::YamlTask; +pub use self::state::Retval; +pub use self::state::Inputval; +pub use self::state::ExecState; + +mod task; +mod yaml_task; +mod state; \ No newline at end of file diff --git a/dagrs/src/task/state.rs b/dagrs/src/task/state.rs new file mode 100644 index 00000000..6e023a7f --- /dev/null +++ b/dagrs/src/task/state.rs @@ -0,0 +1,84 @@ +use anymap::{CloneAny, Map}; + +type DMap = Map; + +/// Describe task's running result +pub struct ExecState { + success: bool, + retval: Retval, +} + +/// Task's return value +pub struct Retval(Option); + +/// Task's input value +pub struct Inputval(Vec>); + +impl ExecState { + /// Get a new [`ExecState`]. + /// + /// `success`: task finish without panic? + /// + /// `retval`: task's return value + pub fn new(success: bool, retval: Retval) -> Self { + Self { success, retval } + } + + /// Get empty [`ExecState`]. + pub fn empty() -> Self { + Self { + success: false, + retval: Retval::empty(), + } + } + + /// Get [`ExecState`]'s return value. + /// + /// This method will clone [`DMap`] stored in [`ExecState`]'s [`Retval`]. + pub fn get_dmap(&self) -> Option { + self.retval.0.clone() + } + + /// The task execution succeed or not. + /// + /// `true` means no panic occurs. + pub fn success(&self) -> bool { + self.success + } +} + +impl Retval { + #[allow(unused)] + /// Get a new [`Retval`]. + /// + /// Since the return value may be transfered between threads, + /// [`Send`], [`Sync`], [`CloneAny`] is needed. + pub fn new(val: H) -> Self { + let mut map = DMap::new(); + assert!(map.insert(val).is_none(), "[Error] map insert fails."); + Self(Some(map)) + } + + /// Get empty [`Retval`]. + pub fn empty() -> Self { + Self(None) + } +} + +impl Inputval { + /// Get a new [`Inputval`], values stored in vector are ordered + /// by that of the given [`TaskWrapper`]'s `rely_list`. + pub fn new(vals: Vec>) -> Self { + Self(vals) + } + + #[allow(unused)] + /// This method get needed input value from [`Inputval`]. + pub fn get(&self, index: usize) -> Option<&H> { + if let Some(Some(dmap)) = self.0.get(index) { + dmap.get::() + } else { + None + } + } +} diff --git a/dagrs/src/task/task.rs b/dagrs/src/task/task.rs new file mode 100644 index 00000000..8a07085e --- /dev/null +++ b/dagrs/src/task/task.rs @@ -0,0 +1,178 @@ +use crate::error_handler::DagError; + +use super::{ExecState, Retval, Inputval}; +use deno_core::{JsRuntime, RuntimeOptions}; +use lazy_static::lazy_static; +use std::process::Command; +use std::sync::Mutex; +use std::thread; + +/// Task Trait. +/// +/// Any struct implements this trait can be added into dagrs. +pub trait TaskTrait { + fn run(&self, input: Inputval) -> Retval; +} + +/// Wrapper for task that impl [`TaskTrait`]. +/// +/// Since task will be executed in seperated threads, `send` is needed. +pub struct TaskWrapper { + id: usize, + name: String, + rely_list: Vec, + inner: Mutex>, +} + +impl TaskWrapper { + /// Allocate a new TaskWrapper. + /// + /// # Example + /// ``` + /// let t = TaskWrapper::new(Task{}, "Demo Task") + /// ``` + /// + /// `Task` is a struct that impl [`TaskTrait`]. + /// + /// **Note:** This method will take the ownership of struct that impl [`TaskTrait`]. + pub fn new(task: impl TaskTrait + 'static + Send, name: &str) -> Self { + TaskWrapper { + id: ID_ALLOCATOR.lock().unwrap().alloc(), + name: name.to_owned(), + rely_list: Vec::new(), + inner: Mutex::new(Box::new(task)), + } + } + + #[allow(unused)] + /// Tasks that shall be executed after this one. + /// + /// # Example + /// ``` + /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); + /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + /// t2.add_relys(&[&t1]); + /// ``` + /// In above code, `t2` will be executed before `t1`. + pub fn rely_on(&mut self, relys: &[&TaskWrapper]) { + self.rely_list.extend(relys.iter().map(|t| t.get_id())) + } + + /// Tasks that shall be executed after this one. + /// + /// # Example + /// ``` + /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); + /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); + /// t2.add_relys_by_ids(&[t1.get_id()]); + /// ``` + /// Similar to `add_relys`, but this method tasks `id` rather than a task. + /// + /// In above code, `t2` will be executed before `t1`. + pub fn add_relys_by_ids(&mut self, relys: &[usize]) { + self.rely_list.extend(relys.iter()) + } + + pub fn get_rely_list(&self) -> Vec { + self.rely_list.clone() + } + + pub fn get_id(&self) -> usize { + self.id + } + + pub fn get_name(&self) -> String { + self.name.to_owned() + } + + pub fn get_inner(&self) -> &Mutex> { + &self.inner + } +} + +/// IDAllocator for TaskWrapper +struct IDAllocator { + id: usize, +} + +impl IDAllocator { + pub fn alloc(&mut self) -> usize { + self.id += 1; + + // Return values + self.id - 1 + } +} + +lazy_static! { + /// Instance of IDAllocator + static ref ID_ALLOCATOR: Mutex = Mutex::new(IDAllocator { id: 0 }); +} + + +/// Can be used to run a script cmd or file. +#[derive(Debug)] +pub struct RunScript { + script: String, + executor: RunType, +} + +/// Run script type, now a script can be run in `sh` or embeded `deno` +#[derive(Debug)] +pub enum RunType { + SH, + DENO, +} + +impl RunScript { + /// Generate a new run script + /// + /// # Example + /// ``` + /// let r = RunScript::new("echo Hello!", RunType::SH); + /// r.exec(); + /// ``` + pub fn new(script: &str, executor: RunType) -> Self { + Self { + script: script.to_owned(), + executor, + } + } + + /// Execute the script. + /// + /// # Example + /// ``` + /// let r = RunScript::new("echo Hello!", RunType::SH); + /// r.exec(); + /// ``` + pub fn exec(&self) -> Result { + match self.executor { + RunType::SH => self.run_sh(), + RunType::DENO => self.run_deno(), + } + } + + fn run_sh(&self) -> Result { + let res = Command::new("sh") + .arg("-c") + .arg(&self.script) + .output() + .map(|output| format!("{}", String::from_utf8(output.stdout).unwrap())); + + res.map_err(|err| err.into()) + } + + fn run_deno(&self) -> Result { + let script = self.script.clone(); + let handle = thread::spawn(move || { + let output = JsRuntime::new(RuntimeOptions { + ..Default::default() + }) + .execute_script("", &script); + }); + + // TODO + unimplemented!() + } +} diff --git a/dagrs/src/task.rs b/dagrs/src/task/yaml_task.rs similarity index 46% rename from dagrs/src/task.rs rename to dagrs/src/task/yaml_task.rs index f1a0e0ba..23a99915 100644 --- a/dagrs/src/task.rs +++ b/dagrs/src/task/yaml_task.rs @@ -1,197 +1,8 @@ -//! Task Implementations, used to store task infos - -use crate::error_handler::{DagError, FormatError, InnerError}; -use deno_core::{JsRuntime, RuntimeOptions}; -use lazy_static::lazy_static; -use std::process::Command; -use std::sync::Mutex; +use super::{Retval, RunScript, RunType, TaskTrait, TaskWrapper, Inputval}; +use crate::error_handler::{DagError, YamlFormatError, RunningError}; use std::{collections::HashMap, fs::File, io::Read}; use yaml_rust::{Yaml, YamlLoader}; - -/// Can be used to run a script cmd or file. -#[derive(Debug)] -pub struct RunScript { - script: String, - executor: RunType, -} - -/// Run script type, now a script can be run in `sh` or embeded `deno` -#[derive(Debug)] -pub enum RunType { - SH, - DENO, -} - -/// Return value for task, not supported now. -pub struct Retval { - pub success: bool, - pub value: String, -} - -impl RunScript { - /// Generate a new run script - /// - /// # Example - /// ``` - /// let r = RunScript::new("echo Hello!", RunType::SH); - /// r.exec(); - /// ``` - pub fn new(script: &str, executor: RunType) -> Self { - Self { - script: script.to_owned(), - executor, - } - } - - /// Execute the script. - /// - /// # Example - /// ``` - /// let r = RunScript::new("echo Hello!", RunType::SH); - /// r.exec(); - /// ``` - pub fn exec(&self) -> Retval { - match self.executor { - RunType::SH => self.run_sh(), - RunType::DENO => self.run_deno(), - } - } - - fn run_sh(&self) -> Retval { - let output = Command::new("sh") - .arg("-c") - .arg(&self.script) - .output() - .unwrap(); - // Reprint result - print!("{}", String::from_utf8(output.stdout).unwrap()); - - Retval { - success: output.status.success(), - value: "".into(), - } - } - - fn run_deno(&self) -> Retval { - let output = JsRuntime::new(RuntimeOptions { - ..Default::default() - }) - .execute_script("", &self.script); - - match output { - Ok(val) => Retval { - success: true, - value: format!("{:?}", val), - }, - Err(e) => Retval { - success: false, - value: format!("{}", e), - }, - } - } -} - -/// Task Trait. -/// -/// Any struct implements this trait can be added into dagrs. -pub trait TaskTrait { - fn run(&self) -> Option; -} - -/// Wrapper for task that impl [`TaskTrait`]. -pub struct TaskWrapper { - id: usize, - name: String, - rely_list: Vec, - inner: Box, -} - -impl TaskWrapper { - /// Allocate a new TaskWrapper. - /// - /// # Example - /// ``` - /// let t = TaskWrapper::new(Task{}, "Demo Task") - /// ``` - /// - /// `Task` is a struct that impl [`TaskTrait`]. - /// - /// **Note:** This method will take the ownership of struct that impl [`TaskTrait`]. - pub fn new(task: impl TaskTrait + 'static, name: &str) -> Self { - TaskWrapper { - id: ID_ALLOCATOR.lock().unwrap().alloc(), - name: name.to_owned(), - rely_list: Vec::new(), - inner: Box::new(task), - } - } - - #[allow(unused)] - /// Tasks that shall be executed after this one. - /// - /// # Example - /// ``` - /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); - /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); - /// t2.add_relys(&[&t1]); - /// ``` - /// In above code, `t2` will be executed before `t1`. - pub fn add_relys(&mut self, relys: &[&TaskWrapper]) { - self.rely_list.extend(relys.iter().map(|t| t.get_id())) - } - - /// Tasks that shall be executed after this one. - /// - /// # Example - /// ``` - /// let mut t1 = TaskWrapper::new(T1{}, "Task 1"); - /// let mut t2 = TaskWrapper::new(T2{}, "Task 2"); - /// t2.add_relys_by_ids(&[t1.get_id()]); - /// ``` - /// Similar to `add_relys`, but this method tasks `id` rather than a task. - /// - /// In above code, `t2` will be executed before `t1`. - pub fn add_relys_by_ids(&mut self, relys: &[usize]) { - self.rely_list.extend(relys.iter()) - } - - pub fn get_rely_list(&self) -> Vec { - self.rely_list.clone() - } - - pub fn get_id(&self) -> usize { - self.id - } - - pub fn get_name(&self) -> String { - self.name.to_owned() - } - - pub fn run(&self) -> Option { - self.inner.run() - } -} - -/// IDAllocator for TaskWrapper -struct IDAllocator { - id: usize, -} - -impl IDAllocator { - pub fn alloc(&mut self) -> usize { - self.id += 1; - - // Return values - self.id - 1 - } -} - -lazy_static! { - /// Instance of IDAllocator - static ref ID_ALLOCATOR: Mutex = Mutex::new(IDAllocator { id: 0 }); -} - #[derive(Debug)] /// Task Struct for YAML file. struct YamlTaskInner { @@ -215,8 +26,9 @@ pub struct YamlTask { } impl TaskTrait for YamlTaskInner { - fn run(&self) -> Option { - Some(self.run.exec()) + fn run(&self, input: Inputval) -> Retval { + // TODO + Retval::empty() } } @@ -242,19 +54,19 @@ impl YamlTask { // Get name first let name = info["name"] .as_str() - .ok_or(DagError::format_error(FormatError::NoName(id.to_owned())))? + .ok_or(DagError::format_error(YamlFormatError::NoName(id.to_owned())))? .to_owned(); // Get run script let run = &info["run"]; let executor = match run["type"].as_str().ok_or(DagError::format_error( - FormatError::RunScriptError(id.into()), + YamlFormatError::RunScriptError(id.into()), ))? { "sh" => RunType::SH, "deno" => RunType::DENO, _ => { - return Err(DagError::format_error(FormatError::RunScriptError( + return Err(DagError::format_error(YamlFormatError::RunScriptError( id.into(), ))) } @@ -263,7 +75,7 @@ impl YamlTask { let run_script = run["script"] .as_str() - .ok_or(DagError::format_error(FormatError::RunScriptError( + .ok_or(DagError::format_error(YamlFormatError::RunScriptError( id.into(), )))?; @@ -304,7 +116,7 @@ impl YamlTask { let yaml_tasks = YamlLoader::load_from_str(&yaml_cont)?; let yaml_tasks = yaml_tasks[0]["dagrs"] .as_hash() - .ok_or(DagError::format_error(FormatError::StartWordError))?; + .ok_or(DagError::format_error(YamlFormatError::StartWordError))?; let mut tasks = Vec::new(); // Read tasks @@ -349,7 +161,7 @@ impl YamlTask { for rely in &temp_hash_id2rely[&task.get_id()] { // Rely task existence check if !temp_hash_yaml2id.contains_key(rely) { - return Err(DagError::inner_error(InnerError::RelyTaskIllegal( + return Err(DagError::running_error(RunningError::RelyTaskIllegal( task.get_name(), ))); } -- Gitee