# AsyncSchedule
**Repository Path**: it168man/AsyncSchedule
## Basic Information
- **Project Name**: AsyncSchedule
- **Description**: github地址:https://github.com/Demon-mark/AsyncSchedule,借鉴京东异步任务编排,利用三色标记法设计的异步任务编排工具
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2024-09-08
- **Last Updated**: 2024-09-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# asyncSchedule 事件驱动型异步任务编排框架
## 相关概念
+ AWorker:最小工作单元,提供具体的异步任务
+ AWorkerWrapper:最小执行单元,为异步任务提供回调能力,传递异步任务参数
+ Scheduler:与AWorkerNode配合通过三色标记法实现不同任务之间的依赖关系
+ Async:线程池包装,与Scheduler配合完成异步任务及其编排
## 特点
+ 灵活的事件回调
+ 相互依赖的任务之间数据传递
+ 支持并行、一对多、多对一、多对多及复杂流程编排
+ 支持强弱依赖关系,分别对应allOf、anyOf
+ 支持任务的快速失败
+ 循环数据传递的预检及运行时检查
+ 任务超时的快速失败
## 相关接口
+ AWorker:覆写这个接口的run方法来定义自己的业务逻辑,param参数代表了未来会传递给这个接口的参数,results则代表了可能的其他任务为该任务提供的结果
+ Scheduler.newWorker(String name, AWorker worker):通过这个接口来告知调度器需要调度的worker,不要忘记为你的任务提供参数(.setParam())
+ Scheduler.resultTransfer(String from, String to):通过这个接口,调度器将知道数据需要从名字为from的worker传递到名字为to的worker,当调用这个接口时,需要保证from,to所代表的worker已经被正确实例化(未来可能提供预配置来使调度器能够调度未来的worker)
+ Async.execute(Scheduler scheduler):运行你的任务
## 使用建议
+ 框架的核心为Scheduler,通过Scheduler对象,能够快速地进行容器编排及参数传递,Scheduler是全局的调度器,通过它可以来注册你的任务并为其命名,在运行结束后,还可以通过它和你所指定的任务名来获取你的任务执行结果
+ 如何注册一个任务?通过Scheduler对象的newWorker()方法,通过这个方法,你可以在注册一个对象的同时通过链式调用来自定义你的操作,比如,你可以使用callback来指定你自己的回调方法,但是要注意,所有链式调用中的重复调用会只保留最后一次调用的设置,也就是说,如果调用了多次callback(),那么最终的回调的执行只是最后一次callback()时所指定的回调。
+ 如何建立一个数据流向?工具支持单向的数据流通,即只支持数据从一个方法流入另一个方法,而不能反过来,如果不能理解,只需要想一下,工具的数据流通核心是将上一步的返回值汇总并传递到下一个方法。很快就可以在示例代码中看到,通过数据流通,可以很轻易的编排出整个异步任务的全貌。
+ 什么是快速失败?对于强依赖任务,即一个任务的执行依赖于之前全部任务的正常完成,如果有一个或几个之前的任务失败,那么这个任务就不应该被执行,对于弱依赖任务,即一个任务的执行只需要前面的一个或几个任务结束即可,不要求全部任务的正常返回,这时只要有一个之前的任务正常结束,这个任务就应当继续执行,其快速失败只发生在他的强依赖任务失败或其弱依赖任务全部失败
+ 回调能做什么?对于回调定义如下:
```java
public interface ACallback
{
/**
* 这个回调将在任务即将开始时调用,在这里可以有机会对参数进行最后一次检查
* @param param 传入的参数
*/
void onBegin(@Nullable P param);
/**
* 当任务有了执行结果时,这个回调会被触发,其触发时机为:当次任务正常执行之后,下个任务onBegin之前
* @param result 当次任务的执行结果
*/
void onResult(@Nullable R result);
/**
* 在当次任务出错时将会被调用
* @param throwable 错误原因
* @param result 执行结果的引用
* @return 是否继续执行,返回为true将会使用result引用继续下一次任务,返回为false则会抛出异常信息终止此次任务
*/
boolean onError(@NotNull Throwable throwable, @Nullable R result);
/**
* 当任务完成时调用,无论是否正常完成都会被调用
* @param param 执行参数
* @param result 执行结果
* @param throwable 异常信息
*/
void onComplete(@Nullable P param,
@Nullable R result,
@Nullable Throwable throwable);
/**
* 当任务呗快速失败时调用
* @param throwable 被快速失败的原因
* @param results 快速失败前的结果
*/
void onFail(Throwable throwable, Map results);
}
```
使用回调函数将使你有可能介入任务执行的各个阶段,并更改一些状态信息,不用担心,所有的数据都是线程私有的,因此不会有并发安全隐患。
此外,尤其要注意的是,一些操作可能影响任务的行为,例如onError方法,当你返回false时,程序将不会忽略异常信息,并对依赖该任务的其他任务执行快速失败,而当你返回true时,程序会忽略异常信息并继续将result进行传递
总之,工具希望能够通过回调函数来让你知道到底发生了什么,并竭尽可能地为你提供所有需要的信息。
其实,对于Scheduler是如何实现的,实际上就是利用回调函数来完成的。
## 示例代码
所有的实例代码都可以在test包下找到
+ ### 并行执行
通过Scheduler对象创建一系列的worker同时执行!
```java
public class parallel {
@Test
public void parallel() {
Scheduler scheduler = new Scheduler();
for (int i = 0; i < 50; i++) {
scheduler.newWorker("runner" + i, new Runner())
.param("runner" + i)
.callback(new DefaultCallbackAdapter() {
@Override
public void onResult(String result) {
System.out.println("result" + result);
}
@Override
public boolean onError(Throwable throwable, String result) {
throwable.printStackTrace();
return true;
}
}).build();
}
scheduler.run();
Map results = scheduler.results();
results.forEach(new BiConsumer() {
@Override
public void accept(String s, AWorkerResult result) {
System.out.println(s + ": " + result.getResult());
}
});
}
}
```
+ ### 一对多数据传递
通过must,将数据从任意一个地方传递到另外任意一个地方!
以下代码展示了这样的场景:

```java
public class Param {
@Test
public void oneToMany() {
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new Runner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new Runner())
.param("runner3 param").build();
// 将runner1的返回值传递到runner2进行处理
scheduler.must("runner1", "runner2");
scheduler.must("runner1", "runner3");
scheduler.run();
Map results = scheduler.results();
}
}
```
+ ### 多对一数据传递
通过must不仅可以将一个数据源绑定到其他多个worker,也可以将多个worker作为数据源绑定到其他worker
以下代码展示了这样的场景:

```java
public class Param {
@Test
public void manyToOne() {
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new Runner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new Runner())
.param("runner3 param").build();
scheduler.must("runner1", "runner3");
scheduler.must("runner2", "runner3");
scheduler.run();
Map results = scheduler.results();
}
}
```
+ ### 多对多数据传递
不要太局限,你可以在任意多个worker间传递数据
以下代码的场景如图所示

```java
public class Param {
@Test
public void manyToMany() {
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new Runner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new Runner())
.param("runner3 param").build();
scheduler.newWorker("runner4", new Runner())
.param("runner4 param").build();
scheduler.must("runner1", "runner3");
scheduler.must("runner2", "runner3");
scheduler.must("runner1", "runner4");
scheduler.must("runner2", "runner4");
scheduler.run();
Map results = scheduler.results();
results.forEach((s, result) -> System.out.println(s + ": " +result.getResult()));
}
}
```
+ ### 复杂流程设计
再复杂一点:

```java
public class Multi {
@Test
public void multi() {
Scheduler scheduler = new Scheduler();
for (int i = 0; i < 7; i++) {
String name = "runner" + (i + 1);
scheduler.newWorker(name, new Runner()).param(name + " param").build();
}
scheduler.must("runner1", "runner2");
scheduler.must("runner1", "runner3");
scheduler.must("runner2", "runner3");
scheduler.must("runner3", "runner4");
scheduler.must("runner4", "runner5");
scheduler.must("runner6", "runner7");
scheduler.must("runner7", "runner5");
scheduler.run();
Map results = scheduler.results();
results.forEach((s, result) -> System.out.println(s + ": " +result.getResult()));
}
}
```
+ ### 循环依赖任务预检及运行时检测
工具提供了一些能力,来避免一些可能的错误,例如以下代码,是不能够正常执行的,因为数据从1到2到3,最后又流向了1
如果确实有让1重复处理的需求,可以多实例化一个4,让其保持和1同样的功能,然后将3的数据流指向4即可
```java
public class check {
@Test
public void circleCheck() {
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new Runner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new Runner())
.param("runner3 param").build();
scheduler.must("runner1", "runner2");
scheduler.must("runner2", "runner3");
scheduler.must("runner3", "runner1");
// crack!
scheduler.run();
}
}
```
+ ### 强依赖设置
```java
public class MustDependency {
@Test
public void mustTest() {
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new Runner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new Runner())
.param("runner3 param").build();
scheduler.must("runner1", "runner3");
scheduler.must("runner2", "runner3");
scheduler.run();
// blocking
scheduler.results();
}
}
```
+ ### 弱依赖设置
除了must方法外,还可以使用need方法来实现和must相同的功能,唯一的不同是其在快速失败方面和must表现出不同的行为
也就是说,must为数据流通提供了强依赖性,类似于allOf,而need为数据流通提供了弱依赖性,类似于anyOf
```java
public class NeedDependency {
@Test
public void needTest() {
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new Runner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new Runner())
.param("runner3 param").build();
scheduler.need("runner1", "runner3");
scheduler.need("runner2", "runner3");
scheduler.run();
// blocking
scheduler.results();
}
}
```
+ ### 快速失败
1. #### 强依赖任务的快速失败
执行以下代码,你会发现runner3的方法不会被调用,而控制台会告诉你相关的警告信息
```java
public class FastFail {
@Test
public void mustFastFailTest() {
System.out.println(System.currentTimeMillis());
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new ErrorRunner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new LongTimeRunner())
.param("runner3 param").build();
scheduler.must("runner1", "runner3");
scheduler.must("runner2", "runner3");
scheduler.run();
// blocking
Map results = scheduler.results();
System.out.println(System.currentTimeMillis());
}
}
```
2. #### 弱依赖任务的快速失败
相较于强依赖任务的不可挽回,执行以下代码你会发现虽然控制台发出了警告,但是runner3依旧被执行了,这是因为runner1的正常执行
```java
public class FastFail {
@Test
public void needFastFailTest() {
Scheduler scheduler = new Scheduler();
scheduler.newWorker("runner1", new Runner())
.param("runner1 param").build();
scheduler.newWorker("runner2", new ErrorRunner())
.param("runner2 param").build();
scheduler.newWorker("runner3", new Runner())
.param("runner3 param").build();
scheduler.need("runner1", "runner3");
scheduler.need("runner2", "runner3");
scheduler.run();
// blocking
Map results = scheduler.results();
}
}
```