callback;
/**
* 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调
* 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
*
* 1-finish, 2-error, 3-working
*/
- private AtomicInteger state = new AtomicInteger(0);
+ protected final AtomicInteger state = new AtomicInteger(0);
+ /**
+ * 也是个钩子变量,用来存临时的结果
+ */
+ protected volatile WorkResult workResult = WorkResult.defaultResult();
/**
* 该map存放所有wrapper的id和wrapper映射
+ *
+ * 需要线程安全。
*/
- private Map forParamUseWrappers;
+ private Map> forParamUseWrappers;
/**
- * 也是个钩子变量,用来存临时的结果
+ * 各种策略的封装类。
+ *
+ * 其实是因为加功能太多导致这个对象大小超过了128Byte,所以强迫症的我不得不把几个字段丢到策略类里面去。
+ * ps: 大小超过128Byte令我(TcSnZh)难受的一比,就像走在草坪的格子上,一步嫌小、两步扯蛋。
+ * IDEA可以使用JOL Java Object Layout插件查看对象大小。
*/
- private volatile WorkResult workResult = WorkResult.defaultResult();
+ private final WrapperStrategy wrapperStrategy = new WrapperStrategy();
/**
- * 是否在执行自己前,去校验nextWrapper的执行结果
- * 1 4
- * -------3
- * 2
- * 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。
- * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
+ * 超时检查,该值允许为null。表示不设置。
*/
- private volatile boolean needCheckNextWrapperResult = true;
+ private volatile TimeOutProperties timeOutProperties;
+
+ // ***** state属性的常量值 *****
- private static final int FINISH = 1;
- private static final int ERROR = 2;
- private static final int WORKING = 3;
- private static final int INIT = 0;
+ public static final int FINISH = 1;
+ public static final int ERROR = 2;
+ public static final int WORKING = 3;
+ public static final int INIT = 0;
- private WorkerWrapper(String id, IWorker worker, T param, ICallback callback) {
+ WorkerWrapper(String id, IWorker worker, T param, ICallback callback) {
if (worker == null) {
throw new NullPointerException("async.worker is null");
}
@@ -92,518 +85,636 @@ public class WorkerWrapper {
this.callback = callback;
}
+ // ========== public ==========
+
/**
- * 开始工作
- * fromWrapper代表这次work是由哪个上游wrapper发起的
+ * 外部调用本线程运行此Wrapper的入口方法。
+ *
+ * @param executorService 该ExecutorService将成功运行后,在nextWrapper有多个时被使用于多线程调用。
+ * @param remainTime 剩下的时间
+ * @param forParamUseWrappers 用于保存经过的wrapper的信息的Map,key为id。
+ * @param inspector wrapper调度检查器
*/
- private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map forParamUseWrappers) {
- this.forParamUseWrappers = forParamUseWrappers;
- //将自己放到所有wrapper的集合里去
- forParamUseWrappers.put(id, this);
- long now = SystemClock.now();
- //总的已经超时了,就快速失败,进行下一个
- if (remainTime <= 0) {
- fastFail(INIT, null);
- beginNext(executorService, now, remainTime);
- return;
- }
- //如果自己已经执行过了。
- //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
- if (getState() == FINISH || getState() == ERROR) {
- beginNext(executorService, now, remainTime);
- return;
- }
-
- //如果在执行前需要校验nextWrapper的状态
- if (needCheckNextWrapperResult) {
- //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了
- if (!checkNextWrapperResult()) {
- fastFail(INIT, new SkippedException());
- beginNext(executorService, now, remainTime);
- return;
- }
- }
-
- //如果没有任何依赖,说明自己就是第一批要执行的
- if (dependWrappers == null || dependWrappers.size() == 0) {
- fire();
- beginNext(executorService, now, remainTime);
- return;
- }
-
- /*如果有前方依赖,存在两种情况
- 一种是前面只有一个wrapper。即 A -> B
- 一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
- 所以需要B来做判断,必须A、C、D都完成,自己才能执行 */
+ public void work(ExecutorService executorService,
+ long remainTime,
+ Map> forParamUseWrappers,
+ WrapperEndingInspector inspector) {
+ work(executorService, null, remainTime, forParamUseWrappers, inspector);
+ }
- //只有一个依赖
- if (dependWrappers.size() == 1) {
- doDependsOneJob(fromWrapper);
- beginNext(executorService, now, remainTime);
- } else {
- //有多个依赖时
- doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
- }
+ public String getId() {
+ return id;
+ }
+ public WorkResult getWorkResult() {
+ return workResult;
}
+ public void setParam(T param) {
+ this.param = param;
+ }
- public void work(ExecutorService executorService, long remainTime, Map forParamUseWrappers) {
- work(executorService, null, remainTime, forParamUseWrappers);
+ public int getState() {
+ return state.get();
}
/**
- * 总控制台超时,停止所有任务
+ * 获取之后的下游Wrapper
*/
- public void stopNow() {
- if (getState() == INIT || getState() == WORKING) {
- fastFail(getState(), null);
- }
- }
+ public abstract Set> getNextWrappers();
/**
- * 判断自己下游链路上,是否存在已经出结果的或已经开始执行的
- * 如果没有返回true,如果有返回false
+ * 使wrapper状态修改为超时失败。(但如果已经执行完成则不会修改)
+ *
+ * 本方法不会试图执行超时判定逻辑。
+ * 如果要执行超时逻辑判断,请用{@link TimeOutProperties#checkTimeOut(boolean)}并传入参数true。
*/
- private boolean checkNextWrapperResult() {
- //如果自己就是最后一个,或者后面有并行的多个,就返回true
- if (nextWrappers == null || nextWrappers.size() != 1) {
- return getState() == INIT;
+ public void failNow() {
+ int state = getState();
+ if (state == INIT || state == WORKING) {
+ fastFail(state, null);
}
- WorkerWrapper nextWrapper = nextWrappers.get(0);
- boolean state = nextWrapper.getState() == INIT;
- //继续校验自己的next的状态
- return state && nextWrapper.checkNextWrapperResult();
}
+ public WrapperStrategy getWrapperStrategy() {
+ return wrapperStrategy;
+ }
+
+ // ========== protected ==========
+
/**
- * 进行下一个任务
+ * 快速失败
+ *
+ * @return 已经失败则返回false,如果刚才设置为失败了则返回true。
*/
- private void beginNext(ExecutorService executorService, long now, long remainTime) {
- //花费的时间
- long costTime = SystemClock.now() - now;
- if (nextWrappers == null) {
- return;
- }
- if (nextWrappers.size() == 1) {
- nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
- return;
- }
- CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
- for (int i = 0; i < nextWrappers.size(); i++) {
- int finalI = i;
- futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
- .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
+ protected boolean fastFail(int expect, Exception e) {
+ //试图将它从expect状态,改成Error
+ if (!compareAndSetState(expect, ERROR)) {
+ return false;
}
- try {
- CompletableFuture.allOf(futures).get();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
+
+ //尚未处理过结果
+ if (checkIsNullResult()) {
+ if (e == null) {
+ workResult.setResultState(ResultState.TIMEOUT);
+ } else {
+ workResult.setResultState(ResultState.EXCEPTION);
+ workResult.setEx(e);
+ }
+ workResult.setResult(worker.defaultValue());
}
+ callback.result(false, param, workResult);
+ return true;
}
- private void doDependsOneJob(WorkerWrapper dependWrapper) {
- if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
- workResult = defaultResult();
- fastFail(INIT, null);
- } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
- workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
- fastFail(INIT, null);
- } else {
- //前面任务正常完毕了,该自己了
- fire();
- }
+ /**
+ * 判断{@link #state}状态是否是初始值。
+ */
+ protected boolean checkIsNullResult() {
+ return ResultState.DEFAULT == workResult.getResultState();
}
- private synchronized void doDependsJobs(ExecutorService executorService, List dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
- boolean nowDependIsMust = false;
- //创建必须完成的上游wrapper集合
- Set mustWrapper = new HashSet<>();
- for (DependWrapper dependWrapper : dependWrappers) {
- if (dependWrapper.isMust()) {
- mustWrapper.add(dependWrapper);
- }
- if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
- nowDependIsMust = dependWrapper.isMust();
- }
- }
+ protected boolean compareAndSetState(int expect, int update) {
+ return this.state.compareAndSet(expect, update);
+ }
- //如果全部是不必须的条件,那么只要到了这里,就执行自己。
- if (mustWrapper.size() == 0) {
- if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
- fastFail(INIT, null);
- } else {
- fire();
- }
- beginNext(executorService, now, remainTime);
+ /**
+ * 工作的核心方法。
+ *
+ * @param fromWrapper 代表这次work是由哪个上游wrapper发起的。如果是首个Wrapper则为null。
+ * @param remainTime 剩余时间。
+ */
+ protected void work(ExecutorService executorService,
+ WorkerWrapper, ?> fromWrapper,
+ long remainTime,
+ Map> forParamUseWrappers,
+ WrapperEndingInspector inspector) {
+ this.setForParamUseWrappers(forParamUseWrappers);
+ //将自己放到所有wrapper的集合里去
+ forParamUseWrappers.put(id, this);
+ long now = SystemClock.now();
+ //总的已经超时了,就快速失败,进行下一个
+ if (remainTime <= 0) {
+ fastFail(INIT, null);
+ beginNext(executorService, now, remainTime, inspector);
return;
}
-
- //如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干
- if (!nowDependIsMust) {
+ //如果自己已经执行过了。
+ //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
+ if (getState() == FINISH || getState() == ERROR) {
+ beginNext(executorService, now, remainTime, inspector);
return;
}
- //如果fromWrapper是必须的
- boolean existNoFinish = false;
- boolean hasError = false;
- //先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了
- for (DependWrapper dependWrapper : mustWrapper) {
- WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
- WorkResult tempWorkResult = workerWrapper.getWorkResult();
- //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完
- if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
- existNoFinish = true;
- break;
- }
- if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
- workResult = defaultResult();
- hasError = true;
- break;
- }
- if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
- workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
- hasError = true;
- break;
- }
-
- }
- //只要有失败的
- if (hasError) {
- fastFail(INIT, null);
- beginNext(executorService, now, remainTime);
+ // 判断是否要跳过自己,该方法可能会跳过正在工作的自己。
+ final WrapperStrategy wrapperStrategy = getWrapperStrategy();
+ if (wrapperStrategy.shouldSkip(getNextWrappers(), this, fromWrapper)) {
+ fastFail(INIT, new SkippedException());
+ beginNext(executorService, now, remainTime, inspector);
return;
}
- //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working
- //都finish的话
- if (!existNoFinish) {
- //上游都finish了,进行自己
+ //如果没有任何依赖,说明自己就是第一批要执行的
+ final Set> dependWrappers = getDependWrappers();
+ if (dependWrappers == null || dependWrappers.size() == 0) {
fire();
- beginNext(executorService, now, remainTime);
+ beginNext(executorService, now, remainTime, inspector);
return;
}
- }
- /**
- * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
- */
- private void fire() {
- //阻塞取结果
- workResult = workerDoJob();
+ DependenceAction.WithProperty judge = wrapperStrategy.judgeAction(dependWrappers, this, fromWrapper);
+
+ switch (judge.getDependenceAction()) {
+ case TAKE_REST:
+ inspector.reduceWrapper(this);
+ return;
+ case FAST_FAIL:
+ switch (judge.getResultState()) {
+ case TIMEOUT:
+ fastFail(INIT, null);
+ break;
+ case EXCEPTION:
+ fastFail(INIT, judge.getFastFailException());
+ break;
+ default:
+ fastFail(INIT, new RuntimeException("ResultState " + judge.getResultState() + " set to FAST_FAIL"));
+ break;
+ }
+ beginNext(executorService, now, remainTime, inspector);
+ break;
+ case START_WORK:
+ fire();
+ beginNext(executorService, now, remainTime, inspector);
+ break;
+ case JUDGE_BY_AFTER:
+ default:
+ inspector.reduceWrapper(this);
+ throw new IllegalStateException("策略配置错误,不应当在WorkerWrapper中返回JUDGE_BY_AFTER或其他无效值 : this=" + this + ",fromWrapper=" + fromWrapper);
+ }
}
/**
- * 快速失败
+ * 进行下一个任务
*/
- private boolean fastFail(int expect, Exception e) {
- //试图将它从expect状态,改成Error
- if (!compareAndSetState(expect, ERROR)) {
- return false;
+ protected void beginNext(ExecutorService executorService, long now, long remainTime, WrapperEndingInspector inspector) {
+ //花费的时间
+ final long costTime = SystemClock.now() - now;
+ final long nextRemainTIme = remainTime - costTime;
+ Set> nextWrappers = getNextWrappers();
+ if (nextWrappers == null) {
+ inspector.setWrapperEndWithTryPolling(this);
+ return;
}
-
- //尚未处理过结果
- if (checkIsNullResult()) {
- if (e == null) {
- workResult = defaultResult();
- } else {
- workResult = defaultExResult(e);
+ // nextWrappers只有一个,就用本线程继续跑。
+ if (nextWrappers.size() == 1) {
+ try {
+ WorkerWrapper, ?> next = nextWrappers.stream().findFirst().get();
+ inspector.addWrapper(next);
+ next.work(executorService, WorkerWrapper.this, nextRemainTIme, getForParamUseWrappers(), inspector);
+ } finally {
+ inspector.setWrapperEndWithTryPolling(this);
}
+ return;
+ }
+ // nextWrappers有多个
+ try {
+ inspector.addWrapper(nextWrappers);
+ nextWrappers.forEach(next -> executorService.submit(() ->
+ next.work(executorService, this, nextRemainTIme, getForParamUseWrappers(), inspector))
+ );
+ } finally {
+ inspector.setWrapperEndWithTryPolling(this);
}
-
- callback.result(false, param, workResult);
- return true;
}
/**
- * 具体的单个worker执行任务
+ * 本工作线程执行自己的job.判断阻塞超时这里开始时会判断一次总超时时间,但在轮询线程会判断单个wrapper超时时间,并也会判断总超时时间。
*/
- private WorkResult workerDoJob() {
+ protected void fire() {
+ //阻塞取结果
//避免重复执行
if (!checkIsNullResult()) {
- return workResult;
+ return;
}
try {
//如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
if (!compareAndSetState(INIT, WORKING)) {
- return workResult;
+ return;
+ }
+ V resultValue;
+ try {
+ callback.begin();
+ if (timeOutProperties != null) {
+ timeOutProperties.startWorking();
+ }
+ //执行耗时操作
+ resultValue = (V) worker.action(param, (Map) getForParamUseWrappers());
+ } finally {
+ if (timeOutProperties != null) {
+ timeOutProperties.endWorking();
+ }
}
-
- callback.begin();
-
- //执行耗时操作
- V resultValue = worker.action(param, forParamUseWrappers);
-
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
- return workResult;
+ return;
}
-
workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue);
//回调成功
callback.result(true, param, workResult);
-
- return workResult;
} catch (Exception e) {
//避免重复回调
if (!checkIsNullResult()) {
- return workResult;
+ return;
}
fastFail(WORKING, e);
- return workResult;
}
}
- public WorkResult getWorkResult() {
- return workResult;
- }
+ // ========== hashcode and equals ==========
- public List> getNextWrappers() {
- return nextWrappers;
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
}
- public void setParam(T param) {
- this.param = param;
+ /**
+ * {@code return id.hashCode();}返回id值的hashcode
+ */
+ @Override
+ public int hashCode() {
+ // final String id can use to .hashcode() .
+ return id.hashCode();
}
- private boolean checkIsNullResult() {
- return ResultState.DEFAULT == workResult.getResultState();
- }
+ // ========== Builder ==========
- private void addDepend(WorkerWrapper, ?> workerWrapper, boolean must) {
- addDepend(new DependWrapper(workerWrapper, must));
+ public static WorkerWrapperBuilder builder() {
+ return new Builder<>();
}
- private void addDepend(DependWrapper dependWrapper) {
- if (dependWrappers == null) {
- dependWrappers = new ArrayList<>();
- }
- //如果依赖的是重复的同一个,就不重复添加了
- for (DependWrapper wrapper : dependWrappers) {
- if (wrapper.equals(dependWrapper)) {
- return;
- }
+ /**
+ * 自v1.5,该类被抽取到{@link StableWorkerWrapperBuilder}抽象类,兼容之前的版本。
+ */
+ public static class Builder extends StableWorkerWrapperBuilder> {
+ /**
+ * @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口,以调用v1.5之后的规范api
+ */
+ @Deprecated
+ public Builder() {
}
- dependWrappers.add(dependWrapper);
}
- private void addNext(WorkerWrapper, ?> workerWrapper) {
- if (nextWrappers == null) {
- nextWrappers = new ArrayList<>();
- }
- //避免添加重复
- for (WorkerWrapper wrapper : nextWrappers) {
- if (workerWrapper.equals(wrapper)) {
- return;
- }
- }
- nextWrappers.add(workerWrapper);
+ // ========== package access methods , for example , some getter/setter that doesn't want to be public ==========
+
+ T getParam() {
+ return param;
}
- private void addNextWrappers(List> wrappers) {
- if (wrappers == null) {
- return;
- }
- for (WorkerWrapper, ?> wrapper : wrappers) {
- addNext(wrapper);
- }
+ IWorker getWorker() {
+ return worker;
}
- private void addDependWrappers(List dependWrappers) {
- if (dependWrappers == null) {
- return;
- }
- for (DependWrapper wrapper : dependWrappers) {
- addDepend(wrapper);
- }
+ void setWorker(IWorker worker) {
+ this.worker = worker;
}
- private WorkResult defaultResult() {
- workResult.setResultState(ResultState.TIMEOUT);
- workResult.setResult(worker.defaultValue());
- return workResult;
+ ICallback getCallback() {
+ return callback;
}
- private WorkResult defaultExResult(Exception ex) {
- workResult.setResultState(ResultState.EXCEPTION);
- workResult.setResult(worker.defaultValue());
- workResult.setEx(ex);
- return workResult;
+ void setCallback(ICallback callback) {
+ this.callback = callback;
}
+ void setState(int state) {
+ this.state.set(state);
+ }
- private int getState() {
- return state.get();
+ Map> getForParamUseWrappers() {
+ return forParamUseWrappers;
}
- public String getId() {
- return id;
+ void setForParamUseWrappers(Map> forParamUseWrappers) {
+ this.forParamUseWrappers = forParamUseWrappers;
}
- private boolean compareAndSetState(int expect, int update) {
- return this.state.compareAndSet(expect, update);
+ void setWorkResult(WorkResult workResult) {
+ this.workResult = workResult;
}
- private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
- this.needCheckNextWrapperResult = needCheckNextWrapperResult;
+ abstract void setNextWrappers(Set> nextWrappers);
+
+ abstract Set> getDependWrappers();
+
+ abstract void setDependWrappers(Set> dependWrappers);
+
+ TimeOutProperties getTimeOut() {
+ return timeOutProperties;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- WorkerWrapper, ?> that = (WorkerWrapper, ?>) o;
- return needCheckNextWrapperResult == that.needCheckNextWrapperResult &&
- Objects.equals(param, that.param) &&
- Objects.equals(worker, that.worker) &&
- Objects.equals(callback, that.callback) &&
- Objects.equals(nextWrappers, that.nextWrappers) &&
- Objects.equals(dependWrappers, that.dependWrappers) &&
- Objects.equals(state, that.state) &&
- Objects.equals(workResult, that.workResult);
+ void setTimeOut(TimeOutProperties timeOutProperties) {
+ this.timeOutProperties = timeOutProperties;
}
+ // ========== toString ==========
+
@Override
- public int hashCode() {
- return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult);
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(200)
+ .append("WorkerWrapper{id=").append(id)
+ .append(", param=").append(param)
+ .append(", worker=").append(worker)
+ .append(", callback=").append(callback)
+ .append(", state=");
+ int state = this.state.get();
+ if (state == FINISH) {
+ sb.append("FINISH");
+ } else if (state == WORKING) {
+ sb.append("WORKING");
+ } else if (state == INIT) {
+ sb.append("INIT");
+ } else if (state == ERROR) {
+ sb.append("ERROR");
+ } else {
+ throw new IllegalStateException("unknown state : " + state);
+ }
+ sb
+ .append(", workResult=").append(workResult)
+ // 防止循环引用,这里只输出相关Wrapper的id
+ .append(", forParamUseWrappers::getId=[");
+ getForParamUseWrappers().keySet().forEach(wrapperId -> sb.append(wrapperId).append(", "));
+ if (getForParamUseWrappers().keySet().size() > 0) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+ sb
+ .append("], dependWrappers::getId=[");
+ getDependWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", "));
+ if (getDependWrappers().size() > 0) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+ sb
+ .append("], nextWrappers::getId=[");
+ getNextWrappers().stream().map(WorkerWrapper::getId).forEach(wrapperId -> sb.append(wrapperId).append(", "));
+ if (getNextWrappers().size() > 0) {
+ sb.delete(sb.length() - 2, sb.length());
+ }
+ sb
+ .append("]")
+ .append(", wrapperStrategy=").append(getWrapperStrategy())
+ .append(", timeOutProperties=").append(getTimeOut())
+ .append('}');
+ return sb.toString();
}
- public static class Builder {
- /**
- * 该wrapper的唯一标识
- */
- private String id = UUID.randomUUID().toString();
- /**
- * worker将来要处理的param
- */
- private W param;
- private IWorker worker;
- private ICallback callback;
+ public static class WrapperStrategy implements DependenceStrategy, SkipStrategy {
+
+ // ========== 这三个属性用来判断是否要开始工作 ==========
+
+ // 从前往后依次判断的顺序为 dependWrapperStrategyMapper -> dependMustStrategyMapper -> dependenceStrategy
+
/**
- * 自己后面的所有
+ * 对特殊Wrapper专用的依赖响应策略。
+ * 该值允许为null
*/
- private List> nextWrappers;
+ private DependWrapperStrategyMapper dependWrapperStrategyMapper;
/**
- * 自己依赖的所有
+ * 对必须完成的(must的)Wrapper的依赖响应策略。
+ * 该值允许为null
+ *
+ * 这是一个不得不向历史妥协的属性。用于适配must开关方式。
*/
- private List dependWrappers;
+ private DependMustStrategyMapper dependMustStrategyMapper;
/**
- * 存储强依赖于自己的wrapper集合
+ * 依赖响应全局策略。
*/
- private Set> selfIsMustSet;
+ private DependenceStrategy dependenceStrategy;
+
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ // 如果存在依赖,则调用三层依赖响应策略进行判断
+ DependenceStrategy strategy = dependWrapperStrategyMapper;
+ if (dependMustStrategyMapper != null) {
+ strategy = strategy == null ? dependMustStrategyMapper : strategy.thenJudge(dependMustStrategyMapper);
+ }
+ if (dependenceStrategy != null) {
+ strategy = strategy == null ? dependenceStrategy : strategy.thenJudge(dependenceStrategy);
+ }
+ if (strategy == null) {
+ throw new IllegalStateException("配置无效,三层判断策略均为null,请开发者检查自己的Builder是否逻辑错误!");
+ }
+ return strategy.judgeAction(dependWrappers, thisWrapper, fromWrapper);
+ }
- private boolean needCheckNextWrapperResult = true;
+ public DependWrapperStrategyMapper getDependWrapperStrategyMapper() {
+ return dependWrapperStrategyMapper;
+ }
- public Builder worker(IWorker worker) {
- this.worker = worker;
- return this;
+ public void setDependWrapperStrategyMapper(DependWrapperStrategyMapper dependWrapperStrategyMapper) {
+ this.dependWrapperStrategyMapper = dependWrapperStrategyMapper;
}
- public Builder param(W w) {
- this.param = w;
- return this;
+ public DependMustStrategyMapper getDependMustStrategyMapper() {
+ return dependMustStrategyMapper;
}
- public Builder id(String id) {
- if (id != null) {
- this.id = id;
- }
- return this;
+ public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) {
+ this.dependMustStrategyMapper = dependMustStrategyMapper;
}
- public Builder needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
- this.needCheckNextWrapperResult = needCheckNextWrapperResult;
- return this;
+ public DependenceStrategy getDependenceStrategy() {
+ return dependenceStrategy;
}
- public Builder callback(ICallback callback) {
- this.callback = callback;
- return this;
+ public void setDependenceStrategy(DependenceStrategy dependenceStrategy) {
+ this.dependenceStrategy = dependenceStrategy;
}
- public Builder depend(WorkerWrapper, ?>... wrappers) {
- if (wrappers == null) {
- return this;
- }
- for (WorkerWrapper, ?> wrapper : wrappers) {
- depend(wrapper);
- }
- return this;
+ // ========== 跳过策略 ==========
+
+ private SkipStrategy skipStrategy;
+
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ return skipStrategy != null && skipStrategy.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
}
- public Builder depend(WorkerWrapper, ?> wrapper) {
- return depend(wrapper, true);
+ public SkipStrategy getSkipStrategy() {
+ return skipStrategy;
}
- public Builder depend(WorkerWrapper, ?> wrapper, boolean isMust) {
- if (wrapper == null) {
- return this;
- }
- DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
- if (dependWrappers == null) {
- dependWrappers = new ArrayList<>();
- }
- dependWrappers.add(dependWrapper);
- return this;
+ public void setSkipStrategy(SkipStrategy skipStrategy) {
+ this.skipStrategy = skipStrategy;
}
- public Builder next(WorkerWrapper, ?> wrapper) {
- return next(wrapper, true);
+ // ========== toString ==========
+
+ @Override
+ public String toString() {
+ return "WrapperStrategy{" +
+ "dependWrapperStrategyMapper=" + dependWrapperStrategyMapper +
+ ", dependMustStrategyMapper=" + dependMustStrategyMapper +
+ ", dependenceStrategy=" + dependenceStrategy +
+ ", skipStrategy=" + skipStrategy +
+ '}';
}
+ }
- public Builder next(WorkerWrapper, ?> wrapper, boolean selfIsMust) {
- if (nextWrappers == null) {
- nextWrappers = new ArrayList<>();
- }
- nextWrappers.add(wrapper);
+ public static class TimeOutProperties {
+ private final boolean enable;
+ private final long time;
+ private final TimeUnit unit;
+ private final boolean allowInterrupt;
+ private final WorkerWrapper, ?> wrapper;
- //强依赖自己
- if (selfIsMust) {
- if (selfIsMustSet == null) {
- selfIsMustSet = new HashSet<>();
- }
- selfIsMustSet.add(wrapper);
- }
- return this;
+ private final Object lock = new Object();
+
+ private volatile boolean started = false;
+ private volatile boolean ended = false;
+ private volatile long startWorkingTime;
+ private volatile long endWorkingTime;
+ private volatile Thread doWorkingThread;
+
+ public TimeOutProperties(boolean enable, long time, TimeUnit unit, boolean allowInterrupt, WorkerWrapper, ?> wrapper) {
+ this.enable = enable;
+ this.time = time;
+ this.unit = unit;
+ this.allowInterrupt = allowInterrupt;
+ this.wrapper = wrapper;
}
- public Builder next(WorkerWrapper, ?>... wrappers) {
- if (wrappers == null) {
- return this;
- }
- for (WorkerWrapper, ?> wrapper : wrappers) {
- next(wrapper);
+ // ========== 工作线程调用 ==========
+
+ public void startWorking() {
+ synchronized (lock) {
+ started = true;
+ startWorkingTime = SystemClock.now();
+ doWorkingThread = Thread.currentThread();
}
- return this;
}
- public WorkerWrapper build() {
- WorkerWrapper wrapper = new WorkerWrapper<>(id, worker, param, callback);
- wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
- if (dependWrappers != null) {
- for (DependWrapper workerWrapper : dependWrappers) {
- workerWrapper.getDependWrapper().addNext(wrapper);
- wrapper.addDepend(workerWrapper);
- }
+ public void endWorking() {
+ synchronized (lock) {
+ ended = true;
+ doWorkingThread = null;
+ endWorkingTime = SystemClock.now();
}
- if (nextWrappers != null) {
- for (WorkerWrapper, ?> workerWrapper : nextWrappers) {
- boolean must = false;
- if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) {
- must = true;
+ }
+
+ // ========== 轮询线程调用 ==========
+
+ /**
+ * 检查超时。
+ * 可以将boolean参数传入true以在超时的时候直接失败。
+ *
+ * @param withStop 如果为false,不会发生什么,仅仅是单纯的判断是否超时。
+ * 如果为true,则会去快速失败wrapper{@link #failNow()},有必要的话还会打断线程。
+ * @return 如果 超时 或 执行时间超过限制 返回true;未超时返回false。
+ */
+ public boolean checkTimeOut(boolean withStop) {
+ if (enable) {
+ synchronized (lock) {
+ if (started) {
+ // 判断执行中的wrapper是否超时
+ long dif = (ended ? endWorkingTime : SystemClock.now()) - startWorkingTime;
+ if (dif > unit.toMillis(time)) {
+ if (withStop) {
+ if (allowInterrupt) {
+ doWorkingThread.interrupt();
+ }
+ wrapper.failNow();
+ ended = true;
+ }
+ return true;
+ }
+ return false;
}
- workerWrapper.addDepend(wrapper, must);
- wrapper.addNext(workerWrapper);
}
}
+ return false;
+ }
+
+ // ========== package ==========
+
+ boolean isEnable() {
+ return enable;
+ }
+
+ long getTime() {
+ return time;
+ }
+
+ TimeUnit getUnit() {
+ return unit;
+ }
+
+ boolean isAllowInterrupt() {
+ return allowInterrupt;
+ }
+
+ Object getLock() {
+ return lock;
+ }
+
+ boolean isStarted() {
+ return started;
+ }
+
+ void setStarted(boolean started) {
+ this.started = started;
+ }
- return wrapper;
+ boolean isEnded() {
+ return ended;
}
+ void setEnded(boolean ended) {
+ this.ended = ended;
+ }
+
+ long getStartWorkingTime() {
+ return startWorkingTime;
+ }
+
+ void setStartWorkingTime(long startWorkingTime) {
+ this.startWorkingTime = startWorkingTime;
+ }
+
+ long getEndWorkingTime() {
+ return endWorkingTime;
+ }
+
+ void setEndWorkingTime(long endWorkingTime) {
+ this.endWorkingTime = endWorkingTime;
+ }
+
+ Thread getDoWorkingThread() {
+ return doWorkingThread;
+ }
+
+ void setDoWorkingThread(Thread doWorkingThread) {
+ this.doWorkingThread = doWorkingThread;
+ }
+
+
+ // ========== toString ==========
+
+ @Override
+ public String toString() {
+ return "TimeOutProperties{" +
+ "enable=" + enable +
+ ", time=" + time +
+ ", unit=" + unit +
+ ", allowInterrupt=" + allowInterrupt +
+ ", wrapper::getId=" + wrapper.getId() +
+ ", started=" + started +
+ ", ended=" + ended +
+ ", startWorkingTime=" + startWorkingTime +
+ ", endWorkingTime=" + endWorkingTime +
+ ", doWorkingThread=" + doWorkingThread +
+ '}';
+ }
}
}
diff --git a/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java
new file mode 100644
index 0000000000000000000000000000000000000000..4a519c0e43bd4f7e46db6379257500d92474cfa3
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java
@@ -0,0 +1,290 @@
+package com.jd.platform.async.wrapper;
+
+import com.jd.platform.async.callback.ICallback;
+import com.jd.platform.async.callback.IWorker;
+import com.jd.platform.async.worker.WorkResult;
+import com.jd.platform.async.wrapper.actionstrategy.DependWrapperActionStrategy;
+import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
+import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 作为优化编排依赖策略后,新增的Builder接口。
+ *
+ * 该接口中不再开放很多过时的api。
+ *
+ * @author create by TcSnZh on 2021/5/4-下午1:26
+ */
+public interface WorkerWrapperBuilder {
+ /**
+ * 设置唯一id。
+ * 如果不设置,{@link StableWorkerWrapperBuilder}会使用UUID
+ */
+ WorkerWrapperBuilder id(String id);
+
+ /**
+ * 设置{@link IWorker}执行方法。
+ *
+ * @param worker 传入接口实现类/lambda
+ */
+ WorkerWrapperBuilder worker(IWorker worker);
+
+ /**
+ * wrapper启动后的传入参数。
+ *
+ * @param t 参数
+ */
+ WorkerWrapperBuilder param(T t);
+
+ /**
+ * 设置{@link ICallback}回调方法。
+ */
+ WorkerWrapperBuilder callback(ICallback callback);
+
+ /**
+ * 设置跳过策略。通常用于检查下游Wrapper是否已经完成。
+ *
+ * 允许不设置。{@link StableWorkerWrapperBuilder}将会默认设置为检查深度为1的下游Wrapper是否执行完成。
+ *
+ * @param strategy 跳过策略函数。
+ */
+ WorkerWrapperBuilder setSkipStrategy(SkipStrategy strategy);
+
+ /**
+ * 设置上游Wrapper依赖关系的选项。
+ */
+ SetDepend setDepend();
+
+ interface SetDepend {
+ /**
+ * 设置在本Wrapper之前的上游Wrapper。
+ *
+ * @param wrapper 允许传入null。
+ */
+ SetDepend wrapper(WorkerWrapper, ?> wrapper);
+
+ default SetDepend wrapper(WorkerWrapper... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ wrapper(wrapper);
+ }
+ return this;
+ }
+
+ default SetDepend wrapper(Collection extends WorkerWrapper> wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ wrappers.forEach(this::wrapper);
+ return this;
+ }
+
+ /**
+ * 设置必须要执行成功的Wrapper,当所有被该方法设为的上游Wrapper执行成功时,本Wrapper才能执行
+ */
+ SetDepend mustRequireWrapper(WorkerWrapper, ?> wrapper);
+
+ default SetDepend mustRequireWrapper(WorkerWrapper... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ mustRequireWrapper(wrapper);
+ }
+ return this;
+ }
+
+ /**
+ * 一个用于动态判断是否must的方法,与旧的{@code .depend(WorkerWrapper,boolean)}效果相同。
+ *
+ * @param must 如果为true,则等同于{@link #mustRequireWrapper(WorkerWrapper)},否则等同于{@link #wrapper(WorkerWrapper)}
+ */
+ default SetDepend requireWrapper(WorkerWrapper, ?> wrapper, boolean must) {
+ return must ? mustRequireWrapper(wrapper) : wrapper(wrapper);
+ }
+
+ /**
+ * 对单个Wrapper设置特殊策略。
+ *
+ * @param wrapper 需要设置特殊策略的Wrapper。
+ * @param strategy 特殊策略。
+ */
+ SetDepend specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper, ?> wrapper);
+
+ default SetDepend specialDependWrapper(DependWrapperActionStrategy strategy, WorkerWrapper... wrappers) {
+ if (strategy == null || wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> workerWrapper : wrappers) {
+ specialDependWrapper(strategy, workerWrapper);
+ }
+ return this;
+ }
+
+ /**
+ * 设置基本策略并返回。
+ *
+ * 如果从未调用该方法,则在{@link #build()}时使用{@link #defaultStrategy()}作为默认策略。
+ *
+ *
+ * @param dependenceStrategy 根据上游Wrapper判断本Wrapper是否启动的最终策略。
+ */
+ SetDepend strategy(DependenceStrategy dependenceStrategy);
+
+ /**
+ * 默认策略为{@link DependenceStrategy#ALL_DEPENDENCIES_ALL_SUCCESS}
+ */
+ default SetDepend defaultStrategy() {
+ return strategy(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS);
+ }
+
+ /**
+ * 结束依赖关系设置。返回到所属的{@link WorkerWrapperBuilder}
+ */
+ WorkerWrapperBuilder end();
+ }
+
+ /**
+ * 便捷式设置依赖的上游Wrapper。
+ *
+ * @param wrappers 上游Wrapper
+ */
+ default WorkerWrapperBuilder depends(WorkerWrapper... wrappers) {
+ return setDepend().wrapper(wrappers).end();
+ }
+
+ default WorkerWrapperBuilder depends(Collection wrappers) {
+ return setDepend().wrapper(wrappers).end();
+ }
+
+ default WorkerWrapperBuilder depends(DependenceStrategy strategy, WorkerWrapper... wrappers) {
+ return setDepend().wrapper(wrappers).strategy(strategy).end();
+ }
+
+ default WorkerWrapperBuilder depends(DependenceStrategy strategy, Collection wrappers) {
+ return setDepend().wrapper(wrappers).strategy(strategy).end();
+ }
+
+ /**
+ * 设置下游Wrapper依赖关系的选项。
+ */
+ SetNext setNext();
+
+ interface SetNext {
+ /**
+ * 设置在本Wrapper之后的下游Wrapper。
+ */
+ SetNext wrapper(WorkerWrapper, ?> wrapper);
+
+ default SetNext wrapper(WorkerWrapper... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ for (WorkerWrapper, ?> wrapper : wrappers) {
+ wrapper(wrapper);
+ }
+ return this;
+ }
+
+ default SetNext wrapper(Collection extends WorkerWrapper> wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ wrappers.forEach(this::wrapper);
+ return this;
+ }
+
+ /**
+ * 调用该方法将会让传入的此下游workerWrappers对本Wrapper强依赖(must)
+ *
+ * @param wrapper 下游Wrapper
+ */
+ SetNext mustToNextWrapper(WorkerWrapper, ?> wrapper);
+
+ default SetNext requireToNextWrapper(WorkerWrapper, ?> wrapper, boolean must) {
+ return must ? mustToNextWrapper(wrapper) : wrapper(wrapper);
+ }
+
+ /**
+ * 调用该方法将会让传入的此下游workerWrappers对本Wrapper进行特殊策略判断,
+ *
+ * @param strategy 对本Wrapper的特殊策略。
+ * @param wrapper 依赖本Wrapper的下游Wrapper。
+ * @return 返回Builder自身。
+ */
+ SetNext specialToNextWrapper(DependWrapperActionStrategy strategy, WorkerWrapper, ?> wrapper);
+
+ WorkerWrapperBuilder end();
+ }
+
+ /**
+ * 便捷式设置本Wrapper被依赖的下游Wrapper。
+ *
+ * @param wrappers 下游Wrapper
+ */
+ default WorkerWrapperBuilder nextOf(WorkerWrapper... wrappers) {
+ return setNext().wrapper(wrappers).end();
+ }
+
+ default WorkerWrapperBuilder nextOf(Collection wrappers) {
+ return setNext().wrapper(wrappers).end();
+ }
+
+ /**
+ * 设置超时时间的具体属性
+ */
+ SetTimeOut setTimeOut();
+
+ interface SetTimeOut {
+ /**
+ * 是否启动超时判断。
+ *
+ * 默认为true
+ *
+ * @param enableElseDisable 是则true
+ */
+ SetTimeOut enableTimeOut(boolean enableElseDisable);
+
+ /**
+ * 设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
+ *
+ * @param time 时间数值
+ * @param unit 时间单位
+ */
+ SetTimeOut setTime(long time, TimeUnit unit);
+
+ /**
+ * 是否允许被试图中断线程
+ *
+ * @param allow 是则true
+ */
+ SetTimeOut allowInterrupt(boolean allow);
+
+ WorkerWrapperBuilder end();
+ }
+
+ /**
+ * 便携式设置单个WorkerWrapper的超时时间。若不设置则不进行超时判断
+ *
+ * @param time 时间数值
+ * @param unit 时间单位
+ */
+ default WorkerWrapperBuilder timeout(long time, TimeUnit unit) {
+ return timeout(true, time, unit, false);
+ }
+
+ default WorkerWrapperBuilder timeout(boolean enableTimeOut, long time, TimeUnit unit, boolean allowInterrupt) {
+ return setTimeOut().enableTimeOut(enableTimeOut).setTime(time, unit).allowInterrupt(allowInterrupt).end();
+ }
+
+ /**
+ * 构建Wrapper。
+ *
+ * @return 返回WorkerWrapper
+ */
+ WorkerWrapper build();
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java b/src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java
new file mode 100644
index 0000000000000000000000000000000000000000..66bb3b9b75ddf1e7348ff6495d2a1bdacbfde057
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java
@@ -0,0 +1,486 @@
+package com.jd.platform.async.wrapper;
+
+import com.jd.platform.async.executor.timer.SystemClock;
+import com.jd.platform.async.worker.WorkResult;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+
+/**
+ * 判断{@link WorkerWrapper}是否链路调用完成的轮询器。
+ * =================================================================================
+ *
+ * 在v1.4及以前的版本,存在如下问题:
+ * >
+ * 在使用线程数量较少的线程池进行beginWork时,调用WorkerWrapper#beginNext方法时,
+ * 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。
+ * >
+ * 例如仅有2个线程的线程池,执行以下任务:
+ * {@code
+ *
+ * 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况,在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug
+ * 线程数:2
+ * A(5ms)--B1(10ms) ---|--> C1(5ms)
+ * . \ | (B1、B2全部完成可执行C1、C2)
+ * . ---> B2(20ms) --|--> C2(5ms)
+ *
+ * }
+ * 线程1执行了A,然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
+ * 线程2执行了B1或B2中的一个,也在allOf方法等待C1、C2完成。
+ * 结果没有线程执行C和B2了,导致超时而死,并且这个线程池线程有可能被耗尽。
+ * >
+ * v1.5的解决方案是,放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture},
+ * 而是让工作线程在工作前注册到本“完成检查器”{@link WrapperEndingInspector},然后交由轮询中心{@link PollingCenter}进行检查是否完成。
+ *
+ * =================================================================================
+ *
+ * 本类的工作原理:
+ * .
+ * 原理:
+ * (1)首先在Async代码中,将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)},
+ * (2)主动运行的wrapper于FINISH/ERROR时,先异步submit所有下游wrapper,在其执行时将自身(下游wrapper)保存到inspector,
+ * (3)然后在异步submit完所有下游wrapper后,将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法,
+ * . 设置自己的{@link #wrappers}为true,并呼叫轮询{@link PollingCenter#tryPolling()}。
+ * (4)在下游wrapper中,经过策略器判断后,
+ * . 若是不需要运行,则把本wrapper计数-1{@link WrapperNode#count},若是计数<1则将{@link WrapperNode}移出{@link #wrappers}。
+ * . 若是需要运行,则运行之,然后跳转到 (2) 的情节。如此递归,执行链路上所有需要执行的wrapper最后都会存在于{@link #wrappers}中。
+ * .
+ * 因此,若是存在任一其{@link WrapperNode#called}为false的wrapper,则表示这条链路还没有调用完。
+ * 若是在{@link #wrappers}中所有的{@link WrapperNode#called}为true时,即可判断出链路执行完毕了。
+ *
+ *
+ * @author create by TcSnZh on 2021/5/5-下午3:22
+ */
+public class WrapperEndingInspector implements Comparable {
+ /**
+ * 最迟完成时间
+ */
+ private final long latestFinishTime;
+
+ /**
+ * 保存 需要检查的wrapper--相关属性 的Map。
+ */
+ private final ConcurrentHashMap wrappers = new ConcurrentHashMap<>();
+
+ /**
+ * 当全部wrapper都调用结束,它会countDown
+ */
+ private final CountDownLatch endCDL = new CountDownLatch(1);
+
+ /**
+ * 读锁用于修改数据,写锁用于轮询。使用公平锁让wrapper的时间波动不会太长。
+ *
+ * 在轮询到本inspector时,之所以要上写锁,是因为:
+ * 假如此时有个Wrapper正在调用{@link #addWrapper(WorkerWrapper)},则wrappers发生了改变。
+ * 假如现在恰巧访问到的是{@link #wrappers}迭代器的最后一个,但此时又加入了另一个,且这另一个又是需要去执行的。
+ * 那么假如在迭代器遍历到目前访问到的wrapper都是呼叫完毕的,那么这新加入的一个就会被忽略,从而判定为全部完成。致使bug发生。
+ *
+ * 此外,即便轮询时上写锁,对性能的影响也是有限的。因为这只会在“呼叫别人”的时候发生工作线程与轮询线程的锁争抢,
+ * 而在工作线程执行{@link com.jd.platform.async.callback.IWorker#action(Object, Map)}或
+ * {@link com.jd.platform.async.callback.ICallback#result(boolean, Object, WorkResult)}时,并不会与轮询线程去
+ * 争抢锁,而通常这个工作的时间才是最耗时的。
+ */
+ private final ReentrantReadWriteLock modifyPollingLock = new ReentrantReadWriteLock(true);
+
+ /**
+ * 当轮询发现超时时,该值被设为false
+ */
+ private final AtomicBoolean haveNotTimeOut = new AtomicBoolean(true);
+
+ public WrapperEndingInspector(long latestFinishTime) {
+ this.latestFinishTime = latestFinishTime;
+ }
+
+ public void registerToPollingCenter() {
+ modifyPollingLock.readLock().lock();
+ try {
+ // 不重复put,以免InspectorNode被替换为另一个
+ PollingCenter.getInstance().inspectionMap.putIfAbsent(this, new PollingCenter.InspectorNode());
+ } finally {
+ modifyPollingLock.readLock().unlock();
+ }
+ }
+
+ public void addWrapper(WorkerWrapper wrapper) {
+ modifyPollingLock.readLock().lock();
+ try {
+ wrappers.computeIfAbsent(wrapper, k -> new WrapperNode()).count.incrementAndGet();
+ } finally {
+ modifyPollingLock.readLock().unlock();
+ }
+ }
+
+ public void addWrapper(Collection extends WorkerWrapper> wrappers) {
+ modifyPollingLock.readLock().lock();
+ try {
+ Objects.requireNonNull(wrappers).forEach(this::addWrapper);
+ } finally {
+ modifyPollingLock.readLock().unlock();
+ }
+ }
+
+ public void reduceWrapper(WorkerWrapper wrapper) {
+ modifyPollingLock.readLock().lock();
+ try {
+ /*
+ * 有可能发生这情况,一个Wrapper刚被加进去,执行了零/一/多次,均不满足执行条件,但是下次调用却应当使其启动。
+ */
+ if (wrapper.getState() != WorkerWrapper.INIT) {
+ final WrapperNode wrapperNode = wrappers.get(wrapper);
+ if (wrapperNode == null) {
+ return;
+ }
+ synchronized (wrapperNode) {
+ if (wrapperNode.count.decrementAndGet() < 1) {
+ wrappers.remove(wrapper);
+ }
+ }
+ }
+ } finally {
+ modifyPollingLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * 原子的设置这个Wrapper已经呼叫完成了。
+ *
+ * 该方法会调用{@link PollingCenter#tryPolling()},呼叫轮询线程
+ *
+ * @return 如果为true,表示设置成功。为false表示已经被设置过了。
+ */
+ public boolean setWrapperEndWithTryPolling(WorkerWrapper wrapper) {
+ modifyPollingLock.readLock().lock();
+ try {
+ return !wrappers.get(wrapper).called.getAndSet(true);
+ } finally {
+ modifyPollingLock.readLock().unlock();
+ PollingCenter.getInstance().tryPolling();
+ }
+ }
+
+ /**
+ * 供外部调用的等待方法
+ *
+ * @return 在超时前完成,返回true。超时时间一到,就会返回false。就像,人被杀,就会死。
+ * @throws InterruptedException 外部调用的当前线程被中断时,会抛出这个异常。
+ */
+ public boolean await() throws InterruptedException {
+ endCDL.await();
+ return haveNotTimeOut.get();
+ }
+
+ /**
+ * {@link PollingCenter}会优先把最迟完成时间(即开始时间+超时时间)较早的Inspection放在前面。
+ */
+ @Override
+ public int compareTo(WrapperEndingInspector other) {
+ if (this.latestFinishTime - other.latestFinishTime < 0) {
+ return -1;
+ }
+ return 1;
+ }
+
+ @Override
+ public String toString() {
+ return "WrapperEndingInspector{" +
+ "remainTime=" + (latestFinishTime - SystemClock.now()) +
+ ", wrappers=" +
+ wrappers.entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))
+ +
+ ", endCDL.getCount()=" + endCDL.getCount() +
+ ", writePollingLock={read=" + modifyPollingLock.getReadLockCount() + ",write=" + modifyPollingLock.getWriteHoldCount() +
+ "} }";
+ }
+
+ /**
+ * 节点对象,保存属性信息于{@link #wrappers}中。
+ *
+ * 当试图把Node移出本Map时,该Node对象自身将会被上锁。
+ */
+ public static class WrapperNode {
+ /**
+ * 是否已经呼叫完了下游wrapper
+ */
+ AtomicBoolean called = new AtomicBoolean(false);
+ /**
+ * 本wrapper总共被呼叫次数的统计。若小于1则会被移出map。
+ */
+ AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public String toString() {
+ return "{" +
+ "called=" + called.get() +
+ ", count=" + count.get() +
+ '}';
+ }
+ }
+
+ /**
+ * 轮询中心。具体的轮询调度由其完成。
+ *
+ * {@link #registerToPollingCenter()}调用时,就会将inspector注册到本轮询中心以供轮询。
+ */
+ public static class PollingCenter {
+ public static class InspectorNode {
+ /**
+ * 延迟轮询时间戳。
+ */
+ private volatile long delayTimeStamp = Long.MAX_VALUE;
+
+ private final ReadWriteLock lockOfDelayTimeStamp = new ReentrantReadWriteLock();
+
+ /**
+ * 比较传入时间戳与{@link #delayTimeStamp},并设置小的那个为{@link #delayTimeStamp}的值。
+ *
+ * @param otherDelayTimeStamp 试图用来比较的另一个时间戳
+ */
+ public void compareAndSetMinDelayTimeStamp(long otherDelayTimeStamp) {
+ lockOfDelayTimeStamp.writeLock().lock();
+ try {
+ long dif = otherDelayTimeStamp - delayTimeStamp;
+ if (dif > 0) {
+ return;
+ }
+ delayTimeStamp = otherDelayTimeStamp;
+ } finally {
+ lockOfDelayTimeStamp.writeLock().unlock();
+ }
+ }
+
+ public long getDelayTimeStamp() {
+ lockOfDelayTimeStamp.readLock().lock();
+ try {
+ return delayTimeStamp;
+ } finally {
+ lockOfDelayTimeStamp.readLock().unlock();
+ }
+ }
+
+ public long clearTimeStamp() {
+ lockOfDelayTimeStamp.writeLock().lock();
+ try {
+ long old = this.delayTimeStamp;
+ delayTimeStamp = Long.MAX_VALUE;
+ return old;
+ } finally {
+ lockOfDelayTimeStamp.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "InspectorNode{" +
+ "delayTimeStamp=" + delayTimeStamp +
+ ", lockOfDelayTimeStamp=" + lockOfDelayTimeStamp +
+ '}';
+ }
+ }
+
+ /**
+ * 将被轮询的WrapperFinishInspection集合。
+ */
+ private final Map inspectionMap = new ConcurrentSkipListMap<>();
+
+ /**
+ * 请求轮询。
+ */
+ private void tryPolling() {
+ // 开始轮询
+ SINGLETON_POLLING_POOL.submit(() -> {
+ // 用来判断在轮询过程中是否有新增的inspector的值
+ int expectCount;
+ // 如果此值变化过,则在结束时让自己在此值后的时间再启动轮询
+ while (!inspectionMap.isEmpty()) {
+ // expectCount是本线程用来记录本次循环开始时inspectionMap的个数。
+ // 每当移出一个inspector时,该值-1。
+ expectCount = inspectionMap.size();
+ // 开始检查
+ for (Map.Entry entry : inspectionMap.entrySet()) {
+ final WrapperEndingInspector inspector = entry.getKey();
+ final InspectorNode inspectorNode = entry.getValue();
+ // 直接抢锁,轮询期间禁止修改inspector
+ inspector.modifyPollingLock.writeLock().lock();
+ try {
+ // 对一个inspector进行检查
+ if (PollingCenter.this.checkInspectorIsEnd(inspector, inspectorNode)) {
+ // inspector中的wrapper调用结束了
+ // 先要把wrapper给停了
+ inspector.wrappers.forEach((wrapper, wrapperNode) -> {
+ WorkerWrapper.TimeOutProperties timeOut = wrapper.getTimeOut();
+ if (timeOut != null) {
+ timeOut.checkTimeOut(true);
+ }
+ });
+ // 修改此inspector和expectCount的状态
+ if (inspector.endCDL.getCount() > 0) {
+ // 双重检查使endCDL原子性countDown。
+ synchronized (inspector.endCDL) {
+ if (inspector.endCDL.getCount() > 0) {
+ inspectionMap.remove(inspector);
+ expectCount--;
+ inspector.endCDL.countDown();
+ }
+ }
+ }
+ }
+ } finally {
+ inspector.modifyPollingLock.writeLock().unlock();
+ }
+ }
+ /*
+ * 根据 expectCount == inspectionMap.size() 的值,在仅有本线程1个线程在轮询的情况下:
+ * 1. 若值为true,表示轮询过程中没有新的inspector被添加进set中。此时就可以break了。
+ * . 之所以可以break,是因为这个inspection还没有调用结束,在其结束前还会来催促轮询的。
+ * 2. 若值为false,表示有新的inspector在本线程轮询时,被加入到了set中,且没有被我们迭代到。此时还要重新轮询一次。
+ */
+ if (expectCount == inspectionMap.size()) {
+ break;
+ }
+ }
+ });
+ }
+
+ private boolean checkInspectorIsEnd(WrapperEndingInspector inspector, InspectorNode inspectorNode) {
+ // 判断一下inspector整组是否超时
+ if (inspector.latestFinishTime < SystemClock.now()) {
+ inspector.haveNotTimeOut.set(false);
+ inspector.wrappers.forEach(((wrapper, wrapperNode) -> {
+ wrapper.failNow();
+ wrapperNode.called.set(true);
+ }));
+ return true;
+ }
+ // 将延迟检查时间设为离现在最近的值。
+ // 此处判断的是inspector所代表整次任务的超时时间
+ inspectorNode.compareAndSetMinDelayTimeStamp(inspector.latestFinishTime);
+ // 判断inspector是否结束,并顺便记录、判断、修改wrapper的超时信息
+ for (Map.Entry entry : inspector.wrappers.entrySet()) {
+ WorkerWrapper wrapper = entry.getKey();
+ // 判断单个wrapper是否超时
+ WorkerWrapper.TimeOutProperties timeOutProperties = wrapper.getTimeOut();
+ if (timeOutProperties != null && timeOutProperties.isEnable()) {
+ // 将延迟检查时间设为离现在最近的值。
+ // 此处判断的是wrapper的超时时间
+ if (timeOutProperties.checkTimeOut(true)) {
+ inspector.haveNotTimeOut.set(false);
+ }
+ // 未超时但是设置了超时检查的话,记录一下inspector延时轮询时间
+ else {
+ inspectorNode.compareAndSetMinDelayTimeStamp(
+ (timeOutProperties.isStarted() ? timeOutProperties.getStartWorkingTime() : SystemClock.now())
+ + timeOutProperties.getUnit().toMillis(timeOutProperties.getTime())
+ );
+ }
+ }
+ // 判断wrapper是否执行完毕
+ WrapperNode node = entry.getValue();
+ if (wrapper.getState() == WorkerWrapper.INIT
+ // 上值如果为false,表示该Wrapper要么还没来得及执行,要么判断不需要执行但是还未被移出
+ || !node.called.get()
+ // 上值如果为false,表示该Wrapper正在工作或是刚刚结束/失败,还未将所有下游Wrapper调用一遍。
+ ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ {
+ final String executorName = "asyncTool-pollingDelayCaller";
+ ScheduledThreadPoolExecutor delayPollingExecutor = new ScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactory() {
+ private final AtomicLong threadCount = new AtomicLong(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, executorName + "-thread-" + threadCount.getAndIncrement());
+ t.setDaemon(true);
+ // 线程优先级不高
+ t.setPriority(1);
+ return t;
+ }
+
+ @Override
+ public String toString() {
+ return executorName + "-threadFactory";
+ }
+ }
+ ) {
+ @Override
+ public String toString() {
+ return executorName + "{PollingCenter.this=" + PollingCenter.this + "}";
+ }
+ };
+ // 每毫秒判断一次:map.value的每个延迟轮询队列的头号元素是否抵达当前时间,如果到了,则清除并调用轮询
+ delayPollingExecutor.scheduleAtFixedRate(() -> inspectionMap.values().stream()
+ .min(Comparator.comparingLong(InspectorNode::getDelayTimeStamp))
+ .ifPresent(node -> {
+ long delayTimeStamp = node.getDelayTimeStamp();
+ if (Long.MAX_VALUE != delayTimeStamp && SystemClock.now() > delayTimeStamp) {
+ tryPolling();
+ }
+ }), 1, 1, TimeUnit.MILLISECONDS);
+ }
+
+ // ========== static ==========
+
+ private final static PollingCenter instance = new PollingCenter();
+
+ public static PollingCenter getInstance() {
+ return instance;
+ }
+
+ /**
+ * 单线程的轮询线程池
+ */
+ private static final ThreadPoolExecutor SINGLETON_POLLING_POOL;
+
+ static {
+ SINGLETON_POLLING_POOL = new ThreadPoolExecutor(
+ 0,
+ // 轮询线程数必须为1
+ 1,
+ 15L,
+ TimeUnit.SECONDS,
+ // 必须保存至少一个轮询请求,以便在本线程轮询结束时,获取到已轮询过的线程提交的轮询请求
+ new ArrayBlockingQueue<>(1),
+ new ThreadFactory() {
+ private final AtomicLong threadCount = new AtomicLong(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "asyncTool-pollingCenterPool-thread-" + threadCount.getAndIncrement());
+ t.setDaemon(true);
+ // 线程优先级不高
+ t.setPriority(3);
+ return t;
+ }
+
+ @Override
+ public String toString() {
+ return "asyncTool-pollingCenterPool-threadFactory";
+ }
+ },
+ // 多的就丢了,反正都是催这一个线程去轮询
+ new ThreadPoolExecutor.DiscardPolicy()
+ ) {
+ @Override
+ public String toString() {
+ return "asyncTool-pollingCenterPool";
+ }
+ };
+ }
+ }
+
+}
+
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependMustStrategyMapper.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependMustStrategyMapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..8ad1019bc1bf03c012a0e55d0073428d26165c58
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependMustStrategyMapper.java
@@ -0,0 +1,99 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.worker.ResultState;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * 这是一个“向历史妥协”的策略器。以兼容must开关模式。
+ *
+ * @author create by TcSnZh on 2021/5/4-下午1:24
+ */
+public class DependMustStrategyMapper implements DependenceStrategy {
+
+ private final Set> mustDependSet = new LinkedHashSet<>();
+
+ /**
+ * 在{@link #mustDependSet} 中的must依赖。
+ *
+ * 如果{@code mustDependSet == null || mustDependSet.size() < 1},返回{@link DependenceAction#JUDGE_BY_AFTER}
+ *
+ * 如果所有的Wrapper已经完成,本Wrapper将会开始工作。
+ *
+ * 如果任一{@link #mustDependSet}中的Wrapper失败,则返回{@link DependenceAction#FAST_FAIL}。
+ * 具体超时/异常则根据{@link com.jd.platform.async.worker.ResultState}的值进行判断。
+ *
+ * 如果存在Wrapper未完成 且 所有的Wrapper都未失败,则返回{@link DependenceAction#JUDGE_BY_AFTER}。
+ *
+ */
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ if (mustDependSet.size() < 1) {
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ }
+ boolean allSuccess = true;
+ for (WorkerWrapper, ?> wrapper : mustDependSet) {
+ switch (wrapper.getWorkResult().getResultState()) {
+ case TIMEOUT:
+ return DependenceAction.FAST_FAIL.fastFailException(ResultState.TIMEOUT, null);
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(ResultState.EXCEPTION, wrapper.getWorkResult().getEx());
+ case DEFAULT:
+ allSuccess = false;
+ case SUCCESS:
+ default:
+ }
+ }
+ if (allSuccess) {
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ }
+
+ /**
+ * 新增must依赖。
+ *
+ * @param mustDependWrapper WorkerWrapper
+ * @return 返回自身
+ */
+ public DependMustStrategyMapper addDependMust(WorkerWrapper, ?> mustDependWrapper) {
+ if (mustDependWrapper == null) {
+ return this;
+ }
+ mustDependSet.add(mustDependWrapper);
+ return this;
+ }
+
+ public DependMustStrategyMapper addDependMust(Collection> wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ mustDependSet.addAll(wrappers);
+ return this;
+ }
+
+ public DependMustStrategyMapper addDependMust(WorkerWrapper, ?>... wrappers) {
+ if (wrappers == null) {
+ return this;
+ }
+ return addDependMust(Arrays.asList(wrappers));
+ }
+
+ public Set> getMustDependSet() {
+ return mustDependSet;
+ }
+
+ @Override
+ public String toString() {
+ return "DependMustStrategyMapper{" +
+ "mustDependSet::getId=" + mustDependSet.stream().map(WorkerWrapper::getId).collect(Collectors.toList()) +
+ '}';
+ }
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperActionStrategy.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperActionStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..745168ff8c6cfcca42bceace3b1df91c91960e12
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperActionStrategy.java
@@ -0,0 +1,74 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+/**
+ * 单参数策略。
+ *
+ * @author create by TcSnZh on 2021/5/1-下午11:16
+ */
+@FunctionalInterface
+public interface DependWrapperActionStrategy {
+ /**
+ * 仅使用一个参数的判断方法
+ *
+ * @param fromWrapper 调用本Wrapper的上游Wrapper
+ * @return 返回 {@link DependenceAction.WithProperty}
+ */
+ DependenceAction.WithProperty judge(WorkerWrapper, ?> fromWrapper);
+
+ // ========== 送几个供链式调用的默认值 ==========
+
+ /**
+ * 成功时,交给下一个策略器判断。
+ * 未运行时,休息。
+ * 失败时,失败。
+ */
+ DependWrapperActionStrategy SUCCESS_CONTINUE = new DependWrapperActionStrategy() {
+ @Override
+ public DependenceAction.WithProperty judge(WorkerWrapper, ?> ww) {
+ switch (ww.getWorkResult().getResultState()) {
+ case SUCCESS:
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ case DEFAULT:
+ return DependenceAction.TAKE_REST.emptyProperty();
+ case EXCEPTION:
+ case TIMEOUT:
+ return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
+ default:
+ }
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
+ }
+
+ @Override
+ public String toString() {
+ return "SUCCESS_CONTINUE";
+ }
+ };
+ /**
+ * 成功时,开始工作。
+ * 未运行时,交给下一个策略器判断。
+ * 失败时,失败。
+ */
+ DependWrapperActionStrategy SUCCESS_START_INIT_CONTINUE = new DependWrapperActionStrategy() {
+ @Override
+ public DependenceAction.WithProperty judge(WorkerWrapper, ?> ww) {
+ switch (ww.getWorkResult().getResultState()) {
+ case SUCCESS:
+ return DependenceAction.START_WORK.emptyProperty();
+ case DEFAULT:
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ case EXCEPTION:
+ case TIMEOUT:
+ return DependenceAction.FAST_FAIL.fastFailException(ww.getWorkResult().getResultState(), ww.getWorkResult().getEx());
+ default:
+ }
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + ww.getWorkResult().getResultState());
+ }
+
+ @Override
+ public String toString() {
+ return "SUCCESS_START_INIT_CONTINUE";
+ }
+ };
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperStrategyMapper.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperStrategyMapper.java
new file mode 100644
index 0000000000000000000000000000000000000000..5c2cd5672d17c4ff8f7a843d1d86e19b669b9e90
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependWrapperStrategyMapper.java
@@ -0,0 +1,69 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。
+ *
+ * 使用{@link DependWrapperStrategyMapper}本实现类对{@link DependenceStrategy}进行增强,
+ *
+ * @author create by TcSnZh on 2021/5/1-下午11:12
+ */
+public class DependWrapperStrategyMapper implements DependenceStrategy {
+ private final Map, DependWrapperActionStrategy> mapper = new ConcurrentHashMap<>(4);
+
+ /**
+ * 设置对应策略
+ *
+ * @param targetWrapper 要设置策略的WorkerWrapper
+ * @param strategy 要设置的策略
+ * @return 返回this,链式调用。
+ */
+ public DependWrapperStrategyMapper putMapping(WorkerWrapper, ?> targetWrapper, DependWrapperActionStrategy strategy) {
+ mapper.put(targetWrapper, strategy);
+ toStringCache = null;
+ return this;
+ }
+
+ /**
+ * 判断方法。
+ *
+ * 如果fromWrapper在{@link #mapper}中,则返回{@link DependWrapperActionStrategy}的判断返回值。否则返回{@link DependenceAction#JUDGE_BY_AFTER}
+ *
+ * @param dependWrappers (这里不会使用该值)thisWrapper.dependWrappers的属性值。
+ * @param thisWrapper (这里不会使用该值)thisWrapper,即为“被催促”的WorkerWrapper
+ * @param fromWrapper 调用来源Wrapper。
+ * @return 如果在mapper中有对fromWrapper的处理策略,则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。
+ */
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ DependWrapperActionStrategy strategy = mapper.get(fromWrapper);
+ if (strategy == null) {
+ return DependenceAction.JUDGE_BY_AFTER.emptyProperty();
+ }
+ return strategy.judge(fromWrapper);
+ }
+
+ /**
+ * 缓存toString
+ */
+ private String toStringCache;
+
+ @Override
+ public String toString() {
+ if (toStringCache == null) {
+ toStringCache = "DependWrapperStrategyMapper{mapper=" + mapper.entrySet().stream()
+ .map(entry -> "{" + entry.getKey().getId() + ":" + entry.getValue() + "}")
+ .collect(Collectors.toList())
+ + "}";
+ }
+ return toStringCache;
+ }
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceAction.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceAction.java
new file mode 100644
index 0000000000000000000000000000000000000000..acbf5d5542b26a22c818727786b8e92bdf23467d
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceAction.java
@@ -0,0 +1,99 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.worker.ResultState;
+
+/**
+ * 返回执行工作类型的枚举。
+ *
+ * @author create by TcSnZh on 2021/5/1-下午10:47
+ */
+public enum DependenceAction {
+ /**
+ * 开始工作。WorkerWrapper会执行工作方法。
+ */
+ START_WORK,
+ /**
+ * 还没轮到,休息一下。WorkerWrapper中的调用栈会返回,以等待可能发生的下次调用。
+ */
+ TAKE_REST,
+ /**
+ * 立即失败。WorkerWrapper会去执行快速失败的方法。
+ */
+ FAST_FAIL,
+ /**
+ * 交给下层{@link DependenceStrategy}进行判断。
+ * 在WorkerWrapper中不需要考虑此值,因为配置正常的情况下不会返回这个值。
+ */
+ JUDGE_BY_AFTER;
+
+ // 空值单例
+
+ public WithProperty emptyProperty() {
+ return empty;
+ }
+
+ private final WithProperty empty = new WithProperty() {
+ @Override
+ public void setResultState(ResultState resultState) {
+ throw new UnsupportedOperationException("empty not support modify");
+ }
+
+ @Override
+ public void setFastFailException(Exception fastFailException) {
+ throw new UnsupportedOperationException("empty not support modify");
+ }
+
+ private final String toString = getDependenceAction() + ".empty";
+
+ @Override
+ public String toString() {
+ return toString;
+ }
+ };
+
+ // 携带异常信息、ResultState的返回值
+
+ public WithProperty fastFailException(ResultState resultState, Exception e) {
+ WithProperty withProperty = this.new WithProperty();
+ withProperty.setResultState(resultState);
+ withProperty.setFastFailException(e);
+ return withProperty;
+ }
+
+ /**
+ * 有时需要封装一些参数来返回,则使用本内部类进行返回。
+ *
+ * 所有的构造方法权限均为private,请在父枚举类{@link DependenceAction}的方法中选择合适的模板生成内部类WithProperty。
+ */
+ public class WithProperty {
+ private ResultState resultState;
+ private Exception fastFailException;
+
+ // getter setter
+
+ public ResultState getResultState() {
+ return resultState;
+ }
+
+ public void setResultState(ResultState resultState) {
+ this.resultState = resultState;
+ }
+
+ public Exception getFastFailException() {
+ return fastFailException;
+ }
+
+ public void setFastFailException(Exception fastFailException) {
+ this.fastFailException = fastFailException;
+ }
+
+ public DependenceAction getDependenceAction() {
+ return DependenceAction.this;
+ }
+
+ // constructor always private.
+
+ private WithProperty() {
+ }
+ }
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..56368ea21e1b5d5ca5a75ff8be14a46009cdafab
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/actionstrategy/DependenceStrategy.java
@@ -0,0 +1,255 @@
+package com.jd.platform.async.wrapper.actionstrategy;
+
+import com.jd.platform.async.wrapper.WrapperEndingInspector;
+import com.jd.platform.async.worker.ResultState;
+import com.jd.platform.async.worker.WorkResult;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/**
+ * 依赖策略接口。
+ *
+ * 提供了多个默认值可以作为单例模式使用。
+ *
+ * 工作原理示例:
+ *
+ * ==== 一个简单示例 ====
+ * 现有三个WorkerWrapper:A、B、C,其中 {@code A{dependWrappers=[B,C],} }
+ * 当B执行完成后调用A时,根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS,还需等待C的结果。
+ * 然后,当C执行完成后调用A时,根据依赖关系ALL_DEPENDENCIES_ALL_SUCCESS: 此时如果C成功了,A就开工,此时如果C失败了,A就失败。
+ * ==== 简单示例2 ====
+ *
+ *
+ *
+ * @author create by TcSnZh on 2021/5/1-下午10:48
+ */
+@FunctionalInterface
+public interface DependenceStrategy {
+ /**
+ * 核心判断策略
+ *
+ * @param dependWrappers thisWrapper.dependWrappers的属性值。
+ * @param thisWrapper thisWrapper,即为“被催促”的WorkerWrapper
+ * @param fromWrapper 调用来源Wrapper。
+ *
+ * 该参数不会为null。
+ * 因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper,
+ * 不会被该策略器所判断,而是不论如何直接执行。
+ *
+ * @return 返回枚举值内部类,WorkerWrapper将会根据其值来决定自己如何响应这次调用。 {@link DependenceAction.WithProperty}
+ */
+ DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper);
+
+ /**
+ * 如果本策略器的judge方法返回了JUDGE_BY_AFTER,则交给下一个策略器来判断。
+ *
+ * @param after 下层策略器
+ * @return 返回一个“封装的多层策略器”
+ */
+ default DependenceStrategy thenJudge(DependenceStrategy after) {
+ DependenceStrategy that = this;
+ return new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ DependenceAction.WithProperty judge = that.judgeAction(dependWrappers, thisWrapper, fromWrapper);
+ if (judge.getDependenceAction() == DependenceAction.JUDGE_BY_AFTER) {
+ return after.judgeAction(dependWrappers, thisWrapper, fromWrapper);
+ }
+ return judge;
+ }
+
+ @Override
+ public String toString() {
+ return that + " ----> " + after;
+ }
+ };
+ }
+
+ // ========== 以下是一些默认实现 ==========
+
+ /**
+ * 被依赖的所有Wrapper都必须成功才能开始工作。
+ * 如果其中任一Wrapper还没有执行且不存在失败,则休息。
+ * 如果其中任一Wrapper失败则立即失败。
+ */
+ DependenceStrategy ALL_DEPENDENCIES_ALL_SUCCESS = new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ boolean hasWaiting = false;
+ for (final WorkerWrapper, ?> dependWrapper : dependWrappers) {
+ WorkResult> workResult = dependWrapper.getWorkResult();
+ switch (workResult.getResultState()) {
+ case DEFAULT:
+ hasWaiting = true;
+ break;
+ case SUCCESS:
+ break;
+ case TIMEOUT:
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
+ default:
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
+ }
+ }
+ if (hasWaiting) {
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+
+ @Override
+ public String toString() {
+ return "ALL_DEPENDENCIES_ALL_SUCCESS";
+ }
+ };
+
+ /**
+ * 被依赖的Wrapper中任意一个成功了就可以开始工作。
+ * 如果其中所有Wrapper还没有执行,则休息。
+ * 如果其中一个Wrapper失败且不存在成功则立即失败。
+ */
+ DependenceStrategy ALL_DEPENDENCIES_ANY_SUCCESS = new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ boolean hasFailed = false;
+ Exception fastFailException = null;
+ ResultState resultState = null;
+ for (final WorkerWrapper, ?> dependWrapper : dependWrappers) {
+ WorkResult> workResult = dependWrapper.getWorkResult();
+ switch (workResult.getResultState()) {
+ case DEFAULT:
+ break;
+ case SUCCESS:
+ return DependenceAction.START_WORK.emptyProperty();
+ case TIMEOUT:
+ case EXCEPTION:
+ resultState = !hasFailed ? workResult.getResultState() : resultState;
+ fastFailException = !hasFailed ? workResult.getEx() : fastFailException;
+ hasFailed = true;
+ break;
+ default:
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + workResult.getResultState());
+ }
+ }
+ if (hasFailed) {
+ return DependenceAction.FAST_FAIL.fastFailException(resultState, fastFailException);
+ }
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+
+ @Override
+ public String toString() {
+ return "ALL_DEPENDENCIES_ANY_SUCCESS";
+ }
+ };
+
+ /**
+ * 如果被依赖的工作中任一失败,则立即失败。否则就开始工作(不论之前的工作有没有开始)。
+ */
+ DependenceStrategy ALL_DEPENDENCIES_NONE_FAILED = new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ for (WorkerWrapper, ?> dependWrapper : dependWrappers) {
+ WorkResult> workResult = dependWrapper.getWorkResult();
+ switch (workResult.getResultState()) {
+ case TIMEOUT:
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(workResult.getResultState(), workResult.getEx());
+ default:
+ }
+ }
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+
+ @Override
+ public String toString() {
+ return "ALL_DEPENDENCIES_NONE_FAILED";
+ }
+ };
+
+ /**
+ * 只有当指定的这些Wrapper都成功时,才会开始工作。
+ * 任一失败会快速失败。
+ * 任一还没有执行且不存在失败,则休息。
+ *
+ * @param theseWrapper 该方法唯一有效参数。
+ * @return 返回生成的 {@link DependenceAction.WithProperty)
+ */
+ static DependenceStrategy theseWrapperAllSuccess(Set> theseWrapper) {
+ return new DependenceStrategy() {
+ private final Set> theseWrappers;
+ private final String toString;
+
+ {
+ theseWrappers = Collections.unmodifiableSet(theseWrapper);
+ toString = "THESE_WRAPPER_MUST_SUCCESS:" + theseWrappers.stream().map(WorkerWrapper::getId).collect(Collectors.toList());
+ }
+
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ boolean hasWaiting = false;
+ for (WorkerWrapper, ?> wrapper : theseWrappers) {
+ ResultState resultState = wrapper.getWorkResult().getResultState();
+ switch (resultState) {
+ case DEFAULT:
+ hasWaiting = true;
+ break;
+ case SUCCESS:
+ break;
+ case TIMEOUT:
+ case EXCEPTION:
+ return DependenceAction.FAST_FAIL.fastFailException(resultState, wrapper.getWorkResult().getEx());
+ default:
+ throw new RuntimeException("不该执行到的代码 workResult.getResultState()=" + resultState);
+ }
+ }
+ if (hasWaiting) {
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+ return DependenceAction.START_WORK.emptyProperty();
+ }
+
+
+ @Override
+ public String toString() {
+ return toString;
+ }
+ };
+ }
+
+ DependenceStrategy IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY = new DependenceStrategy() {
+ @Override
+ public DependenceAction.WithProperty judgeAction(Set> dependWrappers,
+ WorkerWrapper, ?> thisWrapper,
+ WorkerWrapper, ?> fromWrapper) {
+ DependMustStrategyMapper mustMapper = thisWrapper.getWrapperStrategy().getDependMustStrategyMapper();
+ if (mustMapper != null && !mustMapper.getMustDependSet().isEmpty()) {
+ // 至少有一个must,则因为must未完全完成而等待。
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+ // 如果一个must也没有,则认为应该是ANY模式。
+ return DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS.judgeAction(dependWrappers, thisWrapper, fromWrapper);
+ }
+
+ @Override
+ public String toString() {
+ return "IF_MUST_SET_NOT_EMPTY_ALL_SUCCESS_ELSE_ANY";
+ }
+ };
+
+}
diff --git a/src/main/java/com/jd/platform/async/wrapper/skipstrategy/SkipStrategy.java b/src/main/java/com/jd/platform/async/wrapper/skipstrategy/SkipStrategy.java
new file mode 100644
index 0000000000000000000000000000000000000000..d98aaa27e8399c0ef544815dd2824145cf8ba672
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/wrapper/skipstrategy/SkipStrategy.java
@@ -0,0 +1,183 @@
+package com.jd.platform.async.wrapper.skipstrategy;
+
+import com.jd.platform.async.wrapper.WorkerWrapper;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author create by TcSnZh on 2021/5/6-下午3:02
+ */
+@FunctionalInterface
+public interface SkipStrategy {
+ /**
+ * 跳过策略函数。返回true将会使WorkerWrapper跳过执行。
+ *
+ * @param nextWrappers 下游WrapperSet
+ * @param thisWrapper 本WorkerWrapper
+ * @param fromWrapper 呼叫本Wrapper的上游Wrapper
+ * @return 返回true将会使WorkerWrapper跳过执行。
+ */
+ boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper);
+
+ /**
+ * 不跳过
+ */
+ SkipStrategy NOT_SKIP = new SkipStrategy() {
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "NOT_SKIP";
+ }
+ };
+
+ SkipStrategy CHECK_ONE_LEVEL = new SkipStrategy() {
+ private final SkipStrategy searchNextOneLevel = searchNextWrappers(SearchNextWrappers.SearchType.DFS, 1);
+
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ return searchNextOneLevel.shouldSkip(nextWrappers, thisWrapper, fromWrapper);
+ }
+
+ @Override
+ public String toString() {
+ return "CHECK_ONE_LEVEL";
+ }
+ };
+
+ default SearchNextWrappers searchNextWrappers(SearchNextWrappers.SearchType searchType, int searchLevel) {
+ return new SearchNextWrappers(searchType, searchLevel);
+ }
+
+ /**
+ * 检查之后的Wrapper是否不在INIT状态
+ */
+ class SearchNextWrappers implements SkipStrategy {
+ /**
+ * 搜索策略
+ */
+ enum SearchType {
+ DFS, BFS;
+ }
+
+ private final SearchType searchType;
+
+ /**
+ * 搜索深度
+ */
+ private final int searchLevel;
+
+ public SearchNextWrappers(SearchType searchType, int searchLevel) {
+ this.searchType = Objects.requireNonNull(searchType);
+ this.searchLevel = searchLevel;
+ }
+
+ @Override
+ public boolean shouldSkip(Set> nextWrappers, WorkerWrapper, ?> thisWrapper, WorkerWrapper, ?> fromWrapper) {
+ Set> nextSet;
+ if ((nextSet = nextWrappers) == null || nextSet.isEmpty()) {
+ return false;
+ }
+ switch (searchType) {
+ case DFS:
+ return nextSet.stream().allMatch(next ->
+ next.getState() != WorkerWrapper.INIT || dfsSearchShouldSkip(next, 1));
+ case BFS:
+ LinkedList queue = nextSet.stream().map(ww -> new BfsNode(ww, 0)).collect(Collectors.toCollection(LinkedList::new));
+ HashSet> existed = new HashSet<>(nextSet);
+ while (!queue.isEmpty()) {
+ BfsNode node = queue.poll();
+ if (node.atLevel > searchLevel) {
+ continue;
+ }
+ if (node.wrapper.getState() != WorkerWrapper.INIT) {
+ return true;
+ }
+ if (node.atLevel < searchLevel) {
+ // 如果不是深度的最大值,则往队列里添加
+ node.wrapper.getNextWrappers().forEach(nextWrapper -> {
+ if (existed.contains(nextWrapper)) {
+ return;
+ }
+ queue.offer(new BfsNode(nextWrapper, node.atLevel + 1));
+ existed.add(nextWrapper);
+ });
+ }
+ }
+ return false;
+ default:
+ throw new IllegalStateException("searchType type illegal : " + searchType);
+ }
+ }
+
+ private boolean dfsSearchShouldSkip(WorkerWrapper, ?> currentWrapper, int currentLevel) {
+ if (currentLevel + 1 > searchLevel || currentWrapper == null) {
+ return false;
+ }
+ for (WorkerWrapper, ?> nextWrapper : currentWrapper.getNextWrappers()) {
+ if (nextWrapper != null &&
+ (nextWrapper.getState() != WorkerWrapper.INIT
+ || dfsSearchShouldSkip(nextWrapper, currentLevel + 1))) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ static class BfsNode {
+ final WorkerWrapper, ?> wrapper;
+ final int atLevel;
+
+ public BfsNode(WorkerWrapper, ?> wrapper, int atLevel) {
+ this.wrapper = wrapper;
+ this.atLevel = atLevel;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BfsNode bfsNode = (BfsNode) o;
+ return Objects.equals(wrapper, bfsNode.wrapper);
+ }
+
+ @Override
+ public int hashCode() {
+ return wrapper.hashCode();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SearchNextWrappers that = (SearchNextWrappers) o;
+ return searchLevel == that.searchLevel && searchType == that.searchType;
+ }
+
+ @Override
+ public int hashCode() {
+ return searchLevel ^ searchType.ordinal();
+ }
+
+ @Override
+ public String toString() {
+ return "CheckNextWrapper{" +
+ "searchType=" + searchType +
+ ", searchLevel=" + searchLevel +
+ '}';
+ }
+ }
+}
diff --git a/src/test/java/dependnew/DeWorker.java b/src/test/java/beforev14/depend/DeWorker.java
similarity index 90%
rename from src/test/java/dependnew/DeWorker.java
rename to src/test/java/beforev14/depend/DeWorker.java
index 6ae011f911e551eb4ed1c909a0715f0107268f19..e9f2b821bf0b7de5eb891936136a19e8ddea523a 100755
--- a/src/test/java/dependnew/DeWorker.java
+++ b/src/test/java/beforev14/depend/DeWorker.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker implements IWorker, ICallback {
+class DeWorker implements IWorker, ICallback {
@Override
public User action(String object, Map allWrappers) {
diff --git a/src/test/java/depend/DeWorker1.java b/src/test/java/beforev14/depend/DeWorker1.java
similarity index 89%
rename from src/test/java/depend/DeWorker1.java
rename to src/test/java/beforev14/depend/DeWorker1.java
index 6cafc30cbed3593203e3162c5622a345920ebf0e..b95876821b0bc2e5602e5031b1f3e4def9600d9e 100755
--- a/src/test/java/depend/DeWorker1.java
+++ b/src/test/java/beforev14/depend/DeWorker1.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker1 implements IWorker, User>, ICallback, User> {
+class DeWorker1 implements IWorker, User>, ICallback, User> {
@Override
public User action(WorkResult result, Map allWrappers) {
diff --git a/src/test/java/depend/DeWorker2.java b/src/test/java/beforev14/depend/DeWorker2.java
similarity index 89%
rename from src/test/java/depend/DeWorker2.java
rename to src/test/java/beforev14/depend/DeWorker2.java
index 3dd73e7fa55a7edb6c06feadc90cbe55e6b34e8d..c2a48c41246b4e5c683d6e1efdf2beceaebab126 100755
--- a/src/test/java/depend/DeWorker2.java
+++ b/src/test/java/beforev14/depend/DeWorker2.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker2 implements IWorker, String>, ICallback, String> {
+class DeWorker2 implements IWorker, String>, ICallback, String> {
@Override
public String action(WorkResult result, Map allWrappers) {
diff --git a/src/test/java/depend/LambdaTest.java b/src/test/java/beforev14/depend/LambdaTest.java
similarity index 98%
rename from src/test/java/depend/LambdaTest.java
rename to src/test/java/beforev14/depend/LambdaTest.java
index 42c1bb253130dc2a1f7866dd036c55795e8b5516..93037a5b2d0b71e8981cd5158434179627871ef7 100644
--- a/src/test/java/depend/LambdaTest.java
+++ b/src/test/java/beforev14/depend/LambdaTest.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import java.util.Map;
@@ -10,7 +10,7 @@ import com.jd.platform.async.wrapper.WorkerWrapper;
* @author sjsdfg
* @since 2020/6/14
*/
-public class LambdaTest {
+class LambdaTest {
public static void main(String[] args) throws Exception {
WorkerWrapper, String> workerWrapper2 = new WorkerWrapper.Builder, String>()
.worker((WorkResult result, Map allWrappers) -> {
diff --git a/src/test/java/depend/Test.java b/src/test/java/beforev14/depend/Test.java
similarity index 98%
rename from src/test/java/depend/Test.java
rename to src/test/java/beforev14/depend/Test.java
index 971fdcf0ec86198629fd2c192fa544676d74eb19..a877047159605e62cd9e7a6682df23e22dec62b3 100644
--- a/src/test/java/depend/Test.java
+++ b/src/test/java/beforev14/depend/Test.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.depend;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.WorkResult;
@@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class Test {
+class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();
diff --git a/src/test/java/dependnew/User.java b/src/test/java/beforev14/depend/User.java
similarity index 91%
rename from src/test/java/dependnew/User.java
rename to src/test/java/beforev14/depend/User.java
index bbef801120e9ede1c404d210a96407a0155bd17b..8481a49fc6869a345f9298cf863226e6dfe8c789 100644
--- a/src/test/java/dependnew/User.java
+++ b/src/test/java/beforev14/depend/User.java
@@ -1,11 +1,11 @@
-package dependnew;
+package beforev14.depend;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class User {
+class User {
private String name;
public User(String name) {
diff --git a/src/test/java/depend/DeWorker.java b/src/test/java/beforev14/dependnew/DeWorker.java
similarity index 90%
rename from src/test/java/depend/DeWorker.java
rename to src/test/java/beforev14/dependnew/DeWorker.java
index e963816eaf3d29440f40f549af04823fadbf8ef7..135b6b3d64905d1c25456dbed2dad627c9202c39 100755
--- a/src/test/java/depend/DeWorker.java
+++ b/src/test/java/beforev14/dependnew/DeWorker.java
@@ -1,4 +1,4 @@
-package depend;
+package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker implements IWorker, ICallback {
+class DeWorker implements IWorker, ICallback {
@Override
public User action(String object, Map allWrappers) {
diff --git a/src/test/java/dependnew/DeWorker1.java b/src/test/java/beforev14/dependnew/DeWorker1.java
similarity index 92%
rename from src/test/java/dependnew/DeWorker1.java
rename to src/test/java/beforev14/dependnew/DeWorker1.java
index 0a56fdf319d50368c35057360b6fa6099d31654f..ba02503d484c80571bf7dcd37a72da51ba9e9902 100755
--- a/src/test/java/dependnew/DeWorker1.java
+++ b/src/test/java/beforev14/dependnew/DeWorker1.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker1 implements IWorker, ICallback {
+class DeWorker1 implements IWorker, ICallback {
@Override
public User action(String object, Map allWrappers) {
diff --git a/src/test/java/dependnew/DeWorker2.java b/src/test/java/beforev14/dependnew/DeWorker2.java
similarity index 92%
rename from src/test/java/dependnew/DeWorker2.java
rename to src/test/java/beforev14/dependnew/DeWorker2.java
index c4f61bccce0634c7174dc204db139693ae0cca0e..304df06d5f98cec378d13a16ab0f0371e12864ad 100755
--- a/src/test/java/dependnew/DeWorker2.java
+++ b/src/test/java/beforev14/dependnew/DeWorker2.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.dependnew;
import com.jd.platform.async.callback.ICallback;
@@ -11,7 +11,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class DeWorker2 implements IWorker, ICallback {
+class DeWorker2 implements IWorker, ICallback {
@Override
public String action(User object, Map allWrappers) {
diff --git a/src/test/java/dependnew/Test.java b/src/test/java/beforev14/dependnew/Test.java
similarity index 97%
rename from src/test/java/dependnew/Test.java
rename to src/test/java/beforev14/dependnew/Test.java
index 731e42b787054e7930c99875292fb37067c1a631..657bb7f261642711f9a417ea5afb73f150298cf5 100644
--- a/src/test/java/dependnew/Test.java
+++ b/src/test/java/beforev14/dependnew/Test.java
@@ -1,4 +1,4 @@
-package dependnew;
+package beforev14.dependnew;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.wrapper.WorkerWrapper;
@@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class Test {
+class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();
diff --git a/src/test/java/depend/User.java b/src/test/java/beforev14/dependnew/User.java
similarity index 91%
rename from src/test/java/depend/User.java
rename to src/test/java/beforev14/dependnew/User.java
index dfd6277e4180b3c53f0a051e96e0e4cf2b1aa330..e133e3d517775a9e48f6ef95833ed789958f4b49 100644
--- a/src/test/java/depend/User.java
+++ b/src/test/java/beforev14/dependnew/User.java
@@ -1,11 +1,11 @@
-package depend;
+package beforev14.dependnew;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
-public class User {
+class User {
private String name;
public User(String name) {
diff --git a/src/test/java/parallel/ParTimeoutWorker.java b/src/test/java/beforev14/parallel/ParTimeoutWorker.java
similarity index 92%
rename from src/test/java/parallel/ParTimeoutWorker.java
rename to src/test/java/beforev14/parallel/ParTimeoutWorker.java
index 7f7b9aa869e96c3847cfdbdc9ed1a8d7fe45620f..f0a2f3abc10233fab3bf77e763ae8bd38d80262e 100755
--- a/src/test/java/parallel/ParTimeoutWorker.java
+++ b/src/test/java/beforev14/parallel/ParTimeoutWorker.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParTimeoutWorker implements IWorker, ICallback {
+class ParTimeoutWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/parallel/ParWorker.java b/src/test/java/beforev14/parallel/ParWorker.java
similarity index 92%
rename from src/test/java/parallel/ParWorker.java
rename to src/test/java/beforev14/parallel/ParWorker.java
index b174c511ead97aa386c630dc8c3333f155d00d93..b28b7e690345584b98202877657ebf5dbfe7edf1 100755
--- a/src/test/java/parallel/ParWorker.java
+++ b/src/test/java/beforev14/parallel/ParWorker.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker implements IWorker, ICallback {
+class ParWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/parallel/ParWorker1.java b/src/test/java/beforev14/parallel/ParWorker1.java
similarity index 93%
rename from src/test/java/parallel/ParWorker1.java
rename to src/test/java/beforev14/parallel/ParWorker1.java
index 7f1308194aca5b5eb68a17e84430b98980b93c19..414851cd627f73beaa10b958cf5dffb360056c2c 100755
--- a/src/test/java/parallel/ParWorker1.java
+++ b/src/test/java/beforev14/parallel/ParWorker1.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker1 implements IWorker, ICallback {
+class ParWorker1 implements IWorker, ICallback {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
diff --git a/src/test/java/parallel/ParWorker2.java b/src/test/java/beforev14/parallel/ParWorker2.java
similarity index 93%
rename from src/test/java/parallel/ParWorker2.java
rename to src/test/java/beforev14/parallel/ParWorker2.java
index 0e89e4556e98c5552aad51d925d520b5ea66248b..87cc0aca05c860753a33e8610bf6e5f1b2295834 100755
--- a/src/test/java/parallel/ParWorker2.java
+++ b/src/test/java/beforev14/parallel/ParWorker2.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker2 implements IWorker, ICallback {
+class ParWorker2 implements IWorker, ICallback {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
diff --git a/src/test/java/parallel/ParWorker3.java b/src/test/java/beforev14/parallel/ParWorker3.java
similarity index 93%
rename from src/test/java/parallel/ParWorker3.java
rename to src/test/java/beforev14/parallel/ParWorker3.java
index 4284b0f3b98cde7c7343ac52386ca9388d824669..82b62994e81b190427d1a14fbdb14bb503b88284 100755
--- a/src/test/java/parallel/ParWorker3.java
+++ b/src/test/java/beforev14/parallel/ParWorker3.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker3 implements IWorker, ICallback {
+class ParWorker3 implements IWorker, ICallback {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
diff --git a/src/test/java/parallel/ParWorker4.java b/src/test/java/beforev14/parallel/ParWorker4.java
similarity index 92%
rename from src/test/java/parallel/ParWorker4.java
rename to src/test/java/beforev14/parallel/ParWorker4.java
index 723c5f2b511d29dd80d301f775bd59d79a7a624c..7f9c26752988df32aa4ae4a062d1d1defd97c41d 100755
--- a/src/test/java/parallel/ParWorker4.java
+++ b/src/test/java/beforev14/parallel/ParWorker4.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class ParWorker4 implements IWorker, ICallback {
+class ParWorker4 implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/parallel/TestPar.java b/src/test/java/beforev14/parallel/TestPar.java
similarity index 99%
rename from src/test/java/parallel/TestPar.java
rename to src/test/java/beforev14/parallel/TestPar.java
index e13ccc9069577589b6a7b023ea48fff996c25fca..c031c5384de8f8b97be6ebc5f8e07489b92b2998 100755
--- a/src/test/java/parallel/TestPar.java
+++ b/src/test/java/beforev14/parallel/TestPar.java
@@ -1,4 +1,4 @@
-package parallel;
+package beforev14.parallel;
import com.jd.platform.async.executor.Async;
@@ -14,7 +14,7 @@ import java.util.concurrent.Executors;
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("ALL")
-public class TestPar {
+class TestPar {
public static void main(String[] args) throws Exception {
// testNormal();
diff --git a/src/test/java/seq/SeqTimeoutWorker.java b/src/test/java/beforev14/seq/SeqTimeoutWorker.java
similarity index 92%
rename from src/test/java/seq/SeqTimeoutWorker.java
rename to src/test/java/beforev14/seq/SeqTimeoutWorker.java
index 0de5e0abdfd5274c542584194fceb279b9c99bb8..80a5c7b362bb00c402c3b2644eeabed9503d70f7 100755
--- a/src/test/java/seq/SeqTimeoutWorker.java
+++ b/src/test/java/beforev14/seq/SeqTimeoutWorker.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqTimeoutWorker implements IWorker, ICallback {
+class SeqTimeoutWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/SeqWorker.java b/src/test/java/beforev14/seq/SeqWorker.java
similarity index 93%
rename from src/test/java/seq/SeqWorker.java
rename to src/test/java/beforev14/seq/SeqWorker.java
index 18c345703c88819513710970bc4c90d819771b77..c2bc392028c79bbea7ae4ee9424371d6d7bba53e 100755
--- a/src/test/java/seq/SeqWorker.java
+++ b/src/test/java/beforev14/seq/SeqWorker.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqWorker implements IWorker, ICallback {
+class SeqWorker implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/SeqWorker1.java b/src/test/java/beforev14/seq/SeqWorker1.java
similarity index 93%
rename from src/test/java/seq/SeqWorker1.java
rename to src/test/java/beforev14/seq/SeqWorker1.java
index ae445c6a925665d10118a49e83ced6b6bb661a42..b3ded50059ef40a0d4d7f0d43952fa72c12bda44 100755
--- a/src/test/java/seq/SeqWorker1.java
+++ b/src/test/java/beforev14/seq/SeqWorker1.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqWorker1 implements IWorker, ICallback {
+class SeqWorker1 implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/SeqWorker2.java b/src/test/java/beforev14/seq/SeqWorker2.java
similarity index 93%
rename from src/test/java/seq/SeqWorker2.java
rename to src/test/java/beforev14/seq/SeqWorker2.java
index 34853eef708f17a9a6a3eba389a1dcbbd5f13bbf..458db800fe242acf21a8e4c8b715c4712e4d65bc 100755
--- a/src/test/java/seq/SeqWorker2.java
+++ b/src/test/java/beforev14/seq/SeqWorker2.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.callback.ICallback;
@@ -12,7 +12,7 @@ import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
-public class SeqWorker2 implements IWorker, ICallback {
+class SeqWorker2 implements IWorker, ICallback {
@Override
public String action(String object, Map allWrappers) {
diff --git a/src/test/java/seq/TestSequential.java b/src/test/java/beforev14/seq/TestSequential.java
similarity index 97%
rename from src/test/java/seq/TestSequential.java
rename to src/test/java/beforev14/seq/TestSequential.java
index d4e1c6734034057a02b88d9ccfb47e5bbda4f10b..586c0c417d6979214dc87333ad1667058002a42a 100755
--- a/src/test/java/seq/TestSequential.java
+++ b/src/test/java/beforev14/seq/TestSequential.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.executor.Async;
@@ -11,7 +11,7 @@ import java.util.concurrent.ExecutionException;
* 串行测试
* @author wuweifeng wrote on 2019-11-20.
*/
-public class TestSequential {
+class TestSequential {
public static void main(String[] args) throws InterruptedException, ExecutionException {
diff --git a/src/test/java/seq/TestSequentialTimeout.java b/src/test/java/beforev14/seq/TestSequentialTimeout.java
similarity index 97%
rename from src/test/java/seq/TestSequentialTimeout.java
rename to src/test/java/beforev14/seq/TestSequentialTimeout.java
index f2b02dec5ca95a8507ab34f2dc6c0ec0087e07bb..ccd2423aaf4e063c291ca59358d1e6ed13fd0bd1 100755
--- a/src/test/java/seq/TestSequentialTimeout.java
+++ b/src/test/java/beforev14/seq/TestSequentialTimeout.java
@@ -1,4 +1,4 @@
-package seq;
+package beforev14.seq;
import com.jd.platform.async.executor.Async;
@@ -12,7 +12,7 @@ import java.util.concurrent.ExecutionException;
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("Duplicates")
-public class TestSequentialTimeout {
+class TestSequentialTimeout {
public static void main(String[] args) throws InterruptedException, ExecutionException {
testFirstTimeout();
}
diff --git a/src/test/java/v15/dependnew/Test.java b/src/test/java/v15/dependnew/Test.java
new file mode 100644
index 0000000000000000000000000000000000000000..d0b149be89c498817eca1a9ff36aa408cb6a95a5
--- /dev/null
+++ b/src/test/java/v15/dependnew/Test.java
@@ -0,0 +1,272 @@
+package v15.dependnew;
+
+import com.jd.platform.async.callback.IWorker;
+import com.jd.platform.async.executor.Async;
+import com.jd.platform.async.executor.timer.SystemClock;
+import com.jd.platform.async.worker.ResultState;
+import com.jd.platform.async.wrapper.actionstrategy.DependenceAction;
+import com.jd.platform.async.wrapper.actionstrategy.DependenceStrategy;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+import com.jd.platform.async.wrapper.WorkerWrapperBuilder;
+import com.jd.platform.async.wrapper.skipstrategy.SkipStrategy;
+
+import java.io.PrintStream;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+
+/**
+ * @author create by TcSnZh on 2021/5/2-下午9:25
+ */
+class Test {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+// ExecutorService pool = Executors.newFixedThreadPool(3);
+ ExecutorService pool = Async.getCommonPool();
+ try {
+ // 先随便找个任务让线程池跑一把,把线程用一下,后面的测试效果会明显一点
+ testNew2(pool);
+ System.out.println("\n\n\n");
+
+ testNew1(pool);
+ System.out.println("\n\n\n");
+
+ testNew2(pool);
+ System.out.println("\n\n\n");
+
+ testThreadPolling_Speed(pool);
+ System.out.println("\n\n\n");
+
+ testThreadPolling_V14Bug();
+ System.out.println("\n\n\n");
+
+ testTimeOut(pool);
+
+ } finally {
+ //Async.shutDownCommonPool();
+ pool.shutdown();
+ }
+ }
+
+ /**
+ * 简简单单的测试一下新的编排方式
+ *
+ * .A ===> B1 ===> C1 ----> D1
+ * . ||> B2 | || \--> D2
+ * . ||> B3 | ``========v
+ * . ||> B4 |---> C2 ====> E1
+ * . \--> E2
+ */
+ private static void testNew1(ExecutorService pool) throws ExecutionException, InterruptedException {
+ System.out.println("测试新的builder Api");
+ WorkerWrapper a = testBuilder("A")
+ .build();
+ WorkerWrapper b1 = testBuilder("B1").depends(a).build();
+ WorkerWrapper b2 = testBuilder("B2").depends(a).build();
+ WorkerWrapper b3 = testBuilder("B3").depends(a).build();
+ WorkerWrapper b4 = testBuilder("B4").depends(a).build();
+ WorkerWrapper c1 = testBuilder("C1")
+ .depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, b1, b2, b3, b4)
+ .nextOf(testBuilder("D1").build(),
+ testBuilder("D2").build())
+ .build();
+ WorkerWrapper c2 = testBuilder("C2")
+ .depends(DependenceStrategy.ALL_DEPENDENCIES_ANY_SUCCESS, b1, b2, b3, b4)
+ .nextOf(testBuilder("E1").depends(DependenceStrategy.ALL_DEPENDENCIES_ALL_SUCCESS, c1).build(),
+ testBuilder("E2").build())
+ .build();
+ Async.beginWork(2000, pool, a);
+ logAll();
+ }
+
+ /**
+ * 测试船新的编排方式的花里胡哨的玩法。
+ * A => {B1 ~ B10} >>> C
+ *
+ * C仅需要b1-b10中任意3个Worker工作完成即可启动。
+ * (不过C不一定一定在3个完成后启动,具体还要看线程池属性与线程抢占的顺序,线程池线程数小一点的话更容易让C早日执行)
+ *
+ */
+ private static void testNew2(ExecutorService pool) throws ExecutionException, InterruptedException {
+ System.out.println("测试10个B中成功三个才能执行C");
+ WorkerWrapper a = testBuilder("A").build();
+ ArrayList bList = new ArrayList<>();
+ for (int i = 1; i <= 10; i++) {
+ bList.add(testBuilder("B" + i).depends(a).build());
+ }
+ WorkerWrapper c = testBuilder("C")
+ .setDepend().strategy((dependWrappers, thisWrapper, fromWrapper) -> {
+ if (dependWrappers.stream()
+ .filter(w -> w.getWorkResult().getResultState() == ResultState.SUCCESS).count() >= 3) {
+ return DependenceAction.START_WORK.emptyProperty();
+ } else {
+ return DependenceAction.TAKE_REST.emptyProperty();
+ }
+ }).wrapper(bList).end().build();
+ Async.beginWork(2000, pool, a);
+ logAll();
+ }
+
+ /**
+ * 测试线程轮询的效率
+ */
+ private static void testThreadPolling_Speed(ExecutorService pool) throws InterruptedException {
+ int MAX = 1000;
+ Collection> wrappers = new ArrayList<>(MAX);
+ AtomicLong a = new AtomicLong(0);
+ for (int i = 0; i < MAX; i++) {
+ WorkerWrapperBuilder builder = WorkerWrapper.builder()
+ .id(String.valueOf(i))
+ // 拷贝数组测试,每次在数组最后加一个递增的值+1的数
+ .worker((object, allWrappers) -> {
+ for (int j = 0; j < 100000; j++) {
+ a.incrementAndGet();
+ }
+ return null;
+ })
+ .setSkipStrategy(SkipStrategy.NOT_SKIP);
+ wrappers.add(builder.build());
+ }
+ long t1 = SystemClock.now();
+ PrintStream out = Async.beginWork(10000, pool, wrappers) ? System.out : System.err;
+ out.println("无依赖任务的测试:\n1000个wrapper对AtomicLong分别自增100000次,耗时 : " + (SystemClock.now() - t1) + "ms a=" + a.get());
+ WorkerWrapper.builder();
+ }
+
+ /**
+ * 测试旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况:
+ *
+ * A(5ms)--B1(10ms) ---|--> C1(5ms)
+ * . \ | (B1、B2全部完成可执行C1、C2)
+ * . ---> B2(20ms) --|--> C2(5ms)
+ */
+ private static void testThreadPolling_V14Bug() throws ExecutionException, InterruptedException {
+ System.out.println("以下代码可复制到v1.4,复现线程耗尽bug : ");
+ BiFunction> sleepWork = (id, time) -> (IWorker) (object, allWrappers) -> {
+ try {
+ System.out.println("wrapper.id=" + id + " before sleep");
+ Thread.sleep(time);
+ System.out.println("wrapper.id=" + id + " after sleep");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return null;
+ };
+ WorkerWrapper a = new WorkerWrapper.Builder()
+ .id("A")
+ .worker(sleepWork.apply("A", 5L))
+ .build();
+ WorkerWrapper.Builder cBuilder = new WorkerWrapper.Builder()
+ .depend(new WorkerWrapper.Builder()
+ .id("B1")
+ .worker(sleepWork.apply("B1", 10L))
+ .depend(a)
+ .build())
+ .depend(new WorkerWrapper.Builder()
+ .id("B2")
+ .worker(sleepWork.apply("B2", 10L))
+ .depend(a)
+ .build());
+ cBuilder.id("C1").worker(sleepWork.apply("C1", 5L)).build();
+ cBuilder.id("C2").worker(sleepWork.apply("C2", 5L)).build();
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ try {
+ Async.beginWork(100, pool, a);
+ } finally {
+ pool.shutdown();
+ }
+ System.out.println(a.getNextWrappers());
+ }
+
+ /**
+ * 超时测试
+ */
+ private static void testTimeOut(ExecutorService pool) throws ExecutionException, InterruptedException {
+ System.out.println("超时测试:");
+ System.err.println("如果抛出" + InterruptedException.class.getName() + "异常,则打断线程成功");
+ WorkerWrapper a = testBuilder("A")
+ // B1、B2不超时
+ .nextOf(testBuilder("B1", 100).timeout(150, TimeUnit.MILLISECONDS).build())
+ .nextOf(testBuilder("B2", 100).build())
+ // B3单wrapper超时
+ .nextOf(testBuilder("B3", 200).timeout(150, TimeUnit.MILLISECONDS).build())
+ // B4、B5总任务超时
+ .nextOf(testBuilder("B4", 250).build())
+ .nextOf(testBuilder("B5", 250)
+ .setTimeOut().enableTimeOut(true).setTime(300, TimeUnit.MILLISECONDS).allowInterrupt(false).end()
+ .build())
+ // 测试打断B6线程
+ .nextOf(testBuilder("B6", 250).timeout(true, 150, TimeUnit.MILLISECONDS, true).build())
+ .build();
+ long t1 = SystemClock.now();
+ boolean success = Async.beginWork(200, pool, a);
+ System.out.println("time=" + (SystemClock.now() - t1) + ", success=" + success);
+ a.getNextWrappers().forEach(System.out::println);
+ logAll();
+ }
+
+ // ========== util method ==========
+
+ static final AtomicInteger count = new AtomicInteger(1);
+ static final AtomicReference> logger = new AtomicReference<>(new ConcurrentHashMap<>());
+
+ static WorkerWrapperBuilder testBuilder(String id) {
+ return testBuilder(id, -1);
+ }
+
+ static WorkerWrapperBuilder