From 13be645314915f66de9b840151146346c7ca3594 Mon Sep 17 00:00:00 2001
From: TcSnZh <1293969878@qq.com>
Date: Thu, 13 May 2021 00:24:44 +0800
Subject: [PATCH] =?UTF-8?q?v1.5.2=20=E9=87=8D=E6=9E=84=E4=BA=86=E4=B9=8B?=
=?UTF-8?q?=E5=89=8D=E7=9A=84=E4=B8=8D=E8=89=AF=E4=BB=A3=E7=A0=81=EF=BC=8C?=
=?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86QuickStart=E6=96=87=E6=A1=A3?=
=?UTF-8?q?=E3=80=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
QuickStart.md | 1216 ++++++++++++++---
.../jd/platform/async/callback/IWorker.java | 2 +-
.../async/exception/SkippedException.java | 1 +
.../com/jd/platform/async/executor/Async.java | 27 +-
.../async/executor/PollingCenter.java | 87 ++
.../async/util/collection/WheelIterator.java | 23 +
.../async/util/timer/AbstractWheelTimer.java | 17 +
.../async/util/timer/HashedWheelTimer.java | 685 ++++++++++
.../jd/platform/async/util/timer/Timeout.java | 36 +
.../jd/platform/async/util/timer/Timer.java | 41 +
.../platform/async/util/timer/TimerTask.java | 10 +
.../jd/platform/async/worker/WorkResult.java | 27 +-
.../async/wrapper/StableWorkerWrapper.java | 16 +-
.../wrapper/StableWorkerWrapperBuilder.java | 68 +-
.../platform/async/wrapper/WorkerWrapper.java | 881 ++++++------
.../async/wrapper/WorkerWrapperBuilder.java | 27 +-
.../async/wrapper/WorkerWrapperGroup.java | 155 +++
.../async/wrapper/WrapperEndingInspector.java | 486 -------
.../wrapper/skipstrategy/SkipStrategy.java | 183 ---
.../depend}/DependMustStrategyMapper.java | 2 +-
.../depend}/DependWrapperActionStrategy.java | 2 +-
.../depend}/DependWrapperStrategyMapper.java | 2 +-
.../depend}/DependenceAction.java | 13 +-
.../depend}/DependenceStrategy.java | 28 +-
.../wrapper/strategy/skip/SkipStrategy.java | 53 +
src/test/java/beforev14/depend/DeWorker.java | 2 +-
src/test/java/beforev14/depend/DeWorker1.java | 2 +-
src/test/java/beforev14/depend/DeWorker2.java | 2 +-
.../java/beforev14/depend/LambdaTest.java | 6 +-
.../java/beforev14/dependnew/DeWorker.java | 2 +-
.../java/beforev14/dependnew/DeWorker1.java | 2 +-
.../java/beforev14/dependnew/DeWorker2.java | 2 +-
.../beforev14/parallel/ParTimeoutWorker.java | 2 +-
.../java/beforev14/parallel/ParWorker.java | 2 +-
.../java/beforev14/parallel/ParWorker1.java | 2 +-
.../java/beforev14/parallel/ParWorker2.java | 2 +-
.../java/beforev14/parallel/ParWorker3.java | 2 +-
.../java/beforev14/parallel/ParWorker4.java | 2 +-
.../java/beforev14/seq/SeqTimeoutWorker.java | 2 +-
src/test/java/beforev14/seq/SeqWorker.java | 2 +-
src/test/java/beforev14/seq/SeqWorker1.java | 2 +-
src/test/java/beforev14/seq/SeqWorker2.java | 2 +-
src/test/java/v15/cases/Case1.java | 62 +
src/test/java/v15/cases/Case2.java | 53 +
src/test/java/v15/cases/Case3.java | 61 +
src/test/java/v15/cases/Case4.java | 69 +
src/test/java/v15/cases/Case5.java | 75 +
src/test/java/v15/cases/Case6.java | 61 +
src/test/java/v15/cases/Case7.java | 61 +
src/test/java/v15/cases/Case8.java | 73 +
.../v15/{dependnew => wrappertest}/Test.java | 14 +-
51 files changed, 3302 insertions(+), 1353 deletions(-)
create mode 100644 src/main/java/com/jd/platform/async/executor/PollingCenter.java
create mode 100644 src/main/java/com/jd/platform/async/util/collection/WheelIterator.java
create mode 100644 src/main/java/com/jd/platform/async/util/timer/AbstractWheelTimer.java
create mode 100644 src/main/java/com/jd/platform/async/util/timer/HashedWheelTimer.java
create mode 100644 src/main/java/com/jd/platform/async/util/timer/Timeout.java
create mode 100644 src/main/java/com/jd/platform/async/util/timer/Timer.java
create mode 100644 src/main/java/com/jd/platform/async/util/timer/TimerTask.java
create mode 100644 src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java
delete mode 100644 src/main/java/com/jd/platform/async/wrapper/WrapperEndingInspector.java
delete mode 100644 src/main/java/com/jd/platform/async/wrapper/skipstrategy/SkipStrategy.java
rename src/main/java/com/jd/platform/async/wrapper/{actionstrategy => strategy/depend}/DependMustStrategyMapper.java (98%)
rename src/main/java/com/jd/platform/async/wrapper/{actionstrategy => strategy/depend}/DependWrapperActionStrategy.java (98%)
rename src/main/java/com/jd/platform/async/wrapper/{actionstrategy => strategy/depend}/DependWrapperStrategyMapper.java (97%)
rename src/main/java/com/jd/platform/async/wrapper/{actionstrategy => strategy/depend}/DependenceAction.java (80%)
rename src/main/java/com/jd/platform/async/wrapper/{actionstrategy => strategy/depend}/DependenceStrategy.java (93%)
create mode 100644 src/main/java/com/jd/platform/async/wrapper/strategy/skip/SkipStrategy.java
create mode 100644 src/test/java/v15/cases/Case1.java
create mode 100644 src/test/java/v15/cases/Case2.java
create mode 100644 src/test/java/v15/cases/Case3.java
create mode 100644 src/test/java/v15/cases/Case4.java
create mode 100644 src/test/java/v15/cases/Case5.java
create mode 100644 src/test/java/v15/cases/Case6.java
create mode 100644 src/test/java/v15/cases/Case7.java
create mode 100644 src/test/java/v15/cases/Case8.java
rename src/test/java/v15/{dependnew => wrappertest}/Test.java (96%)
diff --git a/QuickStart.md b/QuickStart.md
index f191d47..125a6d9 100644
--- a/QuickStart.md
+++ b/QuickStart.md
@@ -1,39 +1,43 @@
如果只是需要用这个框架,请往下看即可。如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从0到1开发出这个框架,作者在[csdn开了专栏](https://blog.csdn.net/tianyaleixiaowu/category_9637010.html)专门讲中间件如何从0开发,包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。
+# 安装教程
+
+代码不多,直接拷贝包过去即可。
+
京东同事通过引用如下maven来使用。
-```
-
+ * 该参数不会为null。 + * 因为在{@link WorkerWrapper#work(ExecutorService, long, Map, WrapperEndingInspector)}方法中传入的的第一批无依赖的Wrapper, + * 不会被该策略器所判断,而是不论如何直接执行。 + *
+ * @return 返回枚举值内部类,WorkerWrapper将会根据其值来决定自己如何响应这次调用。 {@link DependenceAction.WithProperty} + */ + DependenceAction.WithProperty judgeAction(Set
+ * 默认为true
+ *
+ * @param enableElseDisable 是则true
+ */
+ SetTimeOut
+ *
+ * ===========================================================================================
+ *
+ * 在v1.4及以前的版本,存在如下问题:
+ * >
+ * 在使用线程数量较少的线程池进行beginWork时,调用WorkerWrapper#beginNext方法时,
+ * 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。
+ * >
+ * 例如仅有2个线程的线程池,执行以下任务:
+ * {@code
+ *
+ * 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况,在test/v15.wrappertest中示例testThreadPolling_V14Bug说明了这个bug
+ * 线程数:2
+ * A(5ms)--B1(10ms) ---|--> C1(5ms)
+ * . \ | (B1、B2全部完成可执行C1、C2)
+ * . ---> B2(20ms) --|--> C2(5ms)
+ *
+ * }
+ * 线程1执行了A,然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
+ * 线程2执行了B1或B2中的一个,也在allOf方法等待C1、C2完成。
+ * 结果没有线程执行C和B2了,导致超时而死,并且这个线程池线程有可能被耗尽。
+ * >
+ *
+ * @author create by TcSnZh on 2021/5/9-下午9:22
+ */
+public class PollingCenter {
+
+ // ========== singleton instance ==========
+
+ private static final PollingCenter instance = new PollingCenter();
+
+ public static PollingCenter getInstance() {
+ return instance;
+ }
+
+ // ========== fields and methods ==========
+
+ public void checkGroup(WorkerWrapperGroup.CheckFinishTask task) {
+ checkGroup(task, 0);
+ }
+
+ public void checkGroup(WorkerWrapperGroup.CheckFinishTask task, long daley) {
+ timer.newTimeout(task, daley, TimeUnit.MILLISECONDS);
+ }
+
+ private final Timer timer = new Timer() {
+ private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(
+ r -> {
+ Thread thread = new Thread(r, "asyncTool-pollingThread");
+ thread.setDaemon(true);
+ return thread;
+ },
+ 4,
+ TimeUnit.MILLISECONDS,
+ 1024);
+
+ @Override
+ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
+ return hashedWheelTimer.newTimeout(task, delay, unit);
+ }
+
+ @Override
+ public Set extends Timeout> stop() {
+ return hashedWheelTimer.stop();
+ }
+
+ @Override
+ public String toString() {
+ return "PollingCenter.timer";
+ }
+ };
+}
diff --git a/src/main/java/com/jd/platform/async/util/collection/WheelIterator.java b/src/main/java/com/jd/platform/async/util/collection/WheelIterator.java
new file mode 100644
index 0000000..97ea47d
--- /dev/null
+++ b/src/main/java/com/jd/platform/async/util/collection/WheelIterator.java
@@ -0,0 +1,23 @@
+package com.jd.platform.async.util.collection;
+
+import java.util.Iterator;
+
+/**
+ * 一个反复循环的迭代器
+ *
+ * @author create by TcSnZh on 2021/5/9-下午6:25
+ */
+public interface WheelIterator
- * 1-finish, 2-error, 3-working
+ * 是否允许被打断
*/
- protected final AtomicInteger state = new AtomicInteger(0);
+ protected final boolean allowInterrupt;
/**
- * 也是个钩子变量,用来存临时的结果
+ * 是否启动超时检查
*/
- protected volatile WorkResult
+ * {@link State}此枚举类枚举了state值所代表的状态枚举。
+ */
+ protected final AtomicInteger state = new AtomicInteger(BUILDING.id);
+ /**
+ * 该值将在{@link IWorker#action(Object, Map)}进行时设为当前线程,在任务开始前或结束后都为null。
+ */
+ protected final AtomicReference
+ * 当没有超时,若该wrapper已经结束但没有超时,返回 0L 。
+ *
+ * 如果该wrapper单独了设置超时策略并正在运行,返回距离超时策略限时相差的毫秒值。
+ * 例如设置10ms超时,此时已经开始3ms,则返回 7L。
+ * 如果此差值<1,则返回 1L。
+ *
+ * 如果已经超时,则返回 -1L。
+ *
- * 在v1.4及以前的版本,存在如下问题:
- * >
- * 在使用线程数量较少的线程池进行beginWork时,调用WorkerWrapper#beginNext方法时,
- * 会因为本线程等待下游Wrapper执行完成而存在线程耗尽bug。线程池会死翘翘的僵住、动弹不得。
- * >
- * 例如仅有2个线程的线程池,执行以下任务:
- * {@code
- *
- * 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况,在test/v15.dependnew中示例testThreadPolling_V14Bug说明了这个bug
- * 线程数:2
- * A(5ms)--B1(10ms) ---|--> C1(5ms)
- * . \ | (B1、B2全部完成可执行C1、C2)
- * . ---> B2(20ms) --|--> C2(5ms)
- *
- * }
- * 线程1执行了A,然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
- * 线程2执行了B1或B2中的一个,也在allOf方法等待C1、C2完成。
- * 结果没有线程执行C和B2了,导致超时而死,并且这个线程池线程有可能被耗尽。
- * >
- * v1.5的解决方案是,放弃使工作线程遭致阻塞的{@link java.util.concurrent.CompletableFuture},
- * 而是让工作线程在工作前注册到本“完成检查器”{@link WrapperEndingInspector},然后交由轮询中心{@link PollingCenter}进行检查是否完成。
- *
- * 本类的工作原理:
- * .
- * 原理:
- * (1)首先在Async代码中,将主动运行的wrapper都保存到一个inspector{@link #addWrapper(WorkerWrapper)},
- * (2)主动运行的wrapper于FINISH/ERROR时,先异步submit所有下游wrapper,在其执行时将自身(下游wrapper)保存到inspector,
- * (3)然后在异步submit完所有下游wrapper后,将调用{@link #setWrapperEndWithTryPolling(WorkerWrapper)}方法,
- * . 设置自己的{@link #wrappers}为true,并呼叫轮询{@link PollingCenter#tryPolling()}。
- * (4)在下游wrapper中,经过策略器判断后,
- * . 若是不需要运行,则把本wrapper计数-1{@link WrapperNode#count},若是计数<1则将{@link WrapperNode}移出{@link #wrappers}。
- * . 若是需要运行,则运行之,然后跳转到 (2) 的情节。如此递归,执行链路上所有需要执行的wrapper最后都会存在于{@link #wrappers}中。
- * .
- * 因此,若是存在任一其{@link WrapperNode#called}为false的wrapper,则表示这条链路还没有调用完。
- * 若是在{@link #wrappers}中所有的{@link WrapperNode#called}为true时,即可判断出链路执行完毕了。
- *