# AsyncSchedule **Repository Path**: it168man/AsyncSchedule ## Basic Information - **Project Name**: AsyncSchedule - **Description**: github地址:https://github.com/Demon-mark/AsyncSchedule,借鉴京东异步任务编排,利用三色标记法设计的异步任务编排工具 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2024-09-08 - **Last Updated**: 2024-09-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # asyncSchedule 事件驱动型异步任务编排框架 ## 相关概念 + AWorker:最小工作单元,提供具体的异步任务 + AWorkerWrapper:最小执行单元,为异步任务提供回调能力,传递异步任务参数 + Scheduler:与AWorkerNode配合通过三色标记法实现不同任务之间的依赖关系 + Async:线程池包装,与Scheduler配合完成异步任务及其编排 ## 特点 + 灵活的事件回调 + 相互依赖的任务之间数据传递 + 支持并行、一对多、多对一、多对多及复杂流程编排 + 支持强弱依赖关系,分别对应allOf、anyOf + 支持任务的快速失败 + 循环数据传递的预检及运行时检查 + 任务超时的快速失败 ## 相关接口 + AWorker:覆写这个接口的run方法来定义自己的业务逻辑,param参数代表了未来会传递给这个接口的参数,results则代表了可能的其他任务为该任务提供的结果 + Scheduler.newWorker(String name, AWorker worker):通过这个接口来告知调度器需要调度的worker,不要忘记为你的任务提供参数(.setParam()) + Scheduler.resultTransfer(String from, String to):通过这个接口,调度器将知道数据需要从名字为from的worker传递到名字为to的worker,当调用这个接口时,需要保证from,to所代表的worker已经被正确实例化(未来可能提供预配置来使调度器能够调度未来的worker) + Async.execute(Scheduler scheduler):运行你的任务 ## 使用建议 + 框架的核心为Scheduler,通过Scheduler对象,能够快速地进行容器编排及参数传递,Scheduler是全局的调度器,通过它可以来注册你的任务并为其命名,在运行结束后,还可以通过它和你所指定的任务名来获取你的任务执行结果 + 如何注册一个任务?通过Scheduler对象的newWorker()方法,通过这个方法,你可以在注册一个对象的同时通过链式调用来自定义你的操作,比如,你可以使用callback来指定你自己的回调方法,但是要注意,所有链式调用中的重复调用会只保留最后一次调用的设置,也就是说,如果调用了多次callback(),那么最终的回调的执行只是最后一次callback()时所指定的回调。 + 如何建立一个数据流向?工具支持单向的数据流通,即只支持数据从一个方法流入另一个方法,而不能反过来,如果不能理解,只需要想一下,工具的数据流通核心是将上一步的返回值汇总并传递到下一个方法。很快就可以在示例代码中看到,通过数据流通,可以很轻易的编排出整个异步任务的全貌。 + 什么是快速失败?对于强依赖任务,即一个任务的执行依赖于之前全部任务的正常完成,如果有一个或几个之前的任务失败,那么这个任务就不应该被执行,对于弱依赖任务,即一个任务的执行只需要前面的一个或几个任务结束即可,不要求全部任务的正常返回,这时只要有一个之前的任务正常结束,这个任务就应当继续执行,其快速失败只发生在他的强依赖任务失败或其弱依赖任务全部失败 + 回调能做什么?对于回调定义如下: ```java public interface ACallback { /** * 这个回调将在任务即将开始时调用,在这里可以有机会对参数进行最后一次检查 * @param param 传入的参数 */ void onBegin(@Nullable P param); /** * 当任务有了执行结果时,这个回调会被触发,其触发时机为:当次任务正常执行之后,下个任务onBegin之前 * @param result 当次任务的执行结果 */ void onResult(@Nullable R result); /** * 在当次任务出错时将会被调用 * @param throwable 错误原因 * @param result 执行结果的引用 * @return 是否继续执行,返回为true将会使用result引用继续下一次任务,返回为false则会抛出异常信息终止此次任务 */ boolean onError(@NotNull Throwable throwable, @Nullable R result); /** * 当任务完成时调用,无论是否正常完成都会被调用 * @param param 执行参数 * @param result 执行结果 * @param throwable 异常信息 */ void onComplete(@Nullable P param, @Nullable R result, @Nullable Throwable throwable); /** * 当任务呗快速失败时调用 * @param throwable 被快速失败的原因 * @param results 快速失败前的结果 */ void onFail(Throwable throwable, Map results); } ``` 使用回调函数将使你有可能介入任务执行的各个阶段,并更改一些状态信息,不用担心,所有的数据都是线程私有的,因此不会有并发安全隐患。 此外,尤其要注意的是,一些操作可能影响任务的行为,例如onError方法,当你返回false时,程序将不会忽略异常信息,并对依赖该任务的其他任务执行快速失败,而当你返回true时,程序会忽略异常信息并继续将result进行传递 总之,工具希望能够通过回调函数来让你知道到底发生了什么,并竭尽可能地为你提供所有需要的信息。 其实,对于Scheduler是如何实现的,实际上就是利用回调函数来完成的。 ## 示例代码 所有的实例代码都可以在test包下找到 + ### 并行执行 通过Scheduler对象创建一系列的worker同时执行! ```java public class parallel { @Test public void parallel() { Scheduler scheduler = new Scheduler(); for (int i = 0; i < 50; i++) { scheduler.newWorker("runner" + i, new Runner()) .param("runner" + i) .callback(new DefaultCallbackAdapter() { @Override public void onResult(String result) { System.out.println("result" + result); } @Override public boolean onError(Throwable throwable, String result) { throwable.printStackTrace(); return true; } }).build(); } scheduler.run(); Map results = scheduler.results(); results.forEach(new BiConsumer() { @Override public void accept(String s, AWorkerResult result) { System.out.println(s + ": " + result.getResult()); } }); } } ``` + ### 一对多数据传递 通过must,将数据从任意一个地方传递到另外任意一个地方! 以下代码展示了这样的场景: ![alt 一对多数据传递]() ```java public class Param { @Test public void oneToMany() { Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new Runner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new Runner()) .param("runner3 param").build(); // 将runner1的返回值传递到runner2进行处理 scheduler.must("runner1", "runner2"); scheduler.must("runner1", "runner3"); scheduler.run(); Map results = scheduler.results(); } } ``` + ### 多对一数据传递 通过must不仅可以将一个数据源绑定到其他多个worker,也可以将多个worker作为数据源绑定到其他worker 以下代码展示了这样的场景: ![alt 多对一数据传递]() ```java public class Param { @Test public void manyToOne() { Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new Runner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new Runner()) .param("runner3 param").build(); scheduler.must("runner1", "runner3"); scheduler.must("runner2", "runner3"); scheduler.run(); Map results = scheduler.results(); } } ``` + ### 多对多数据传递 不要太局限,你可以在任意多个worker间传递数据 以下代码的场景如图所示 ![alt 多对多数据传递]() ```java public class Param { @Test public void manyToMany() { Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new Runner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new Runner()) .param("runner3 param").build(); scheduler.newWorker("runner4", new Runner()) .param("runner4 param").build(); scheduler.must("runner1", "runner3"); scheduler.must("runner2", "runner3"); scheduler.must("runner1", "runner4"); scheduler.must("runner2", "runner4"); scheduler.run(); Map results = scheduler.results(); results.forEach((s, result) -> System.out.println(s + ": " +result.getResult())); } } ``` + ### 复杂流程设计 再复杂一点: ![alt 复杂流程设计]() ```java public class Multi { @Test public void multi() { Scheduler scheduler = new Scheduler(); for (int i = 0; i < 7; i++) { String name = "runner" + (i + 1); scheduler.newWorker(name, new Runner()).param(name + " param").build(); } scheduler.must("runner1", "runner2"); scheduler.must("runner1", "runner3"); scheduler.must("runner2", "runner3"); scheduler.must("runner3", "runner4"); scheduler.must("runner4", "runner5"); scheduler.must("runner6", "runner7"); scheduler.must("runner7", "runner5"); scheduler.run(); Map results = scheduler.results(); results.forEach((s, result) -> System.out.println(s + ": " +result.getResult())); } } ``` + ### 循环依赖任务预检及运行时检测 工具提供了一些能力,来避免一些可能的错误,例如以下代码,是不能够正常执行的,因为数据从1到2到3,最后又流向了1 如果确实有让1重复处理的需求,可以多实例化一个4,让其保持和1同样的功能,然后将3的数据流指向4即可 ```java public class check { @Test public void circleCheck() { Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new Runner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new Runner()) .param("runner3 param").build(); scheduler.must("runner1", "runner2"); scheduler.must("runner2", "runner3"); scheduler.must("runner3", "runner1"); // crack! scheduler.run(); } } ``` + ### 强依赖设置 ```java public class MustDependency { @Test public void mustTest() { Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new Runner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new Runner()) .param("runner3 param").build(); scheduler.must("runner1", "runner3"); scheduler.must("runner2", "runner3"); scheduler.run(); // blocking scheduler.results(); } } ``` + ### 弱依赖设置 除了must方法外,还可以使用need方法来实现和must相同的功能,唯一的不同是其在快速失败方面和must表现出不同的行为 也就是说,must为数据流通提供了强依赖性,类似于allOf,而need为数据流通提供了弱依赖性,类似于anyOf ```java public class NeedDependency { @Test public void needTest() { Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new Runner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new Runner()) .param("runner3 param").build(); scheduler.need("runner1", "runner3"); scheduler.need("runner2", "runner3"); scheduler.run(); // blocking scheduler.results(); } } ``` + ### 快速失败 1. #### 强依赖任务的快速失败 执行以下代码,你会发现runner3的方法不会被调用,而控制台会告诉你相关的警告信息 ```java public class FastFail { @Test public void mustFastFailTest() { System.out.println(System.currentTimeMillis()); Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new ErrorRunner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new LongTimeRunner()) .param("runner3 param").build(); scheduler.must("runner1", "runner3"); scheduler.must("runner2", "runner3"); scheduler.run(); // blocking Map results = scheduler.results(); System.out.println(System.currentTimeMillis()); } } ``` 2. #### 弱依赖任务的快速失败 相较于强依赖任务的不可挽回,执行以下代码你会发现虽然控制台发出了警告,但是runner3依旧被执行了,这是因为runner1的正常执行 ```java public class FastFail { @Test public void needFastFailTest() { Scheduler scheduler = new Scheduler(); scheduler.newWorker("runner1", new Runner()) .param("runner1 param").build(); scheduler.newWorker("runner2", new ErrorRunner()) .param("runner2 param").build(); scheduler.newWorker("runner3", new Runner()) .param("runner3 param").build(); scheduler.need("runner1", "runner3"); scheduler.need("runner2", "runner3"); scheduler.run(); // blocking Map results = scheduler.results(); } } ```