diff --git a/QuickStart.md b/QuickStart.md index 9f997ca56cae9ffe2fe19a67b1d2709bbed8f69a..cd2bd3449a564fb44aa9d10f034cccb6303313dc 100644 --- a/QuickStart.md +++ b/QuickStart.md @@ -1,4 +1,4 @@ -如果只是需要用这个框架,请往下看即可。如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从0到1开发出这个框架,作者在[csdn开了专栏](https://blog.csdn.net/tianyaleixiaowu/category_9637010.html)专门讲中间件如何从0开发,包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。 +如果只是需要用这个框架,请往下看即可。如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从0到1开发出这个框架,作者在[csdn开了专栏](https://blog.csdn.net/tianyaleixiaowu/category_9637010.html) 专门讲中间件如何从0开发,包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。 # 安装教程 diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 56f83b603954ee8b13c583a5b128fdc0cd9c1d86..42785b5c55bff07482e94ae202f9b9fd5cf0d599 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -362,6 +362,9 @@ public abstract class WorkerWrapper { wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); switch (judge.getDependenceAction()) { case TAKE_REST: + //FIXME 等待200毫秒重新投入线程池,主要为了调起最后一个任务 + Thread.sleep(200L); + executorService.submit(() -> this.work(executorService, fromWrapper, remainTime-(SystemClock.now()-now), group)); return; case FAST_FAIL: if (setState(state, STARTED, ERROR)) { diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java index a075369635bf3e67fbe06958b6338338c4056b4f..e45aedc86692a0358977f849a894548d64075fa1 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java @@ -1,6 +1,5 @@ package com.jd.platform.async.wrapper.strategy.depend; -import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.WorkerWrapper; @@ -80,6 +79,14 @@ public interface DependenceStrategy { * 被依赖的所有Wrapper都必须成功才能开始工作。 * 如果其中任一Wrapper还没有执行且不存在失败,则休息。 * 如果其中任一Wrapper失败则立即失败。 + * + * FIXME + * 这里有个问题, + * 假设任务A依赖B、C + * + * B执行时间比较长,A-B的线程和A-C的线程都检测到B的res==null(DEFAULT), + * 那么线程A就真的去休眠(TAKE_REST)而没有发起, + * 导致整个任务长时间无法结束 */ DependenceStrategy ALL_DEPENDENCIES_ALL_SUCCESS = new DependenceStrategy() { @Override diff --git a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker5.java b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker5.java new file mode 100755 index 0000000000000000000000000000000000000000..1a2670ef3730176cd6b88c28ed159cb48058a2bd --- /dev/null +++ b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker5.java @@ -0,0 +1,54 @@ +package beforev14.parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker5 implements IWorker, ICallback { + + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + + @Override + public String action(String object, Map> allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 3"; + } + + @Override + public String defaultValue() { + return "worker3--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); + } else { + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker6.java b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker6.java new file mode 100755 index 0000000000000000000000000000000000000000..8da8e2fc7aec3de16310ce020dca4f9e957b106e --- /dev/null +++ b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker6.java @@ -0,0 +1,55 @@ +package beforev14.parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker6 implements IWorker, ICallback { + + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + + @Override + public String action(String object, Map> allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 3"; + } + + + @Override + public String defaultValue() { + return "worker3--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); + } else { + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker7.java b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker7.java new file mode 100755 index 0000000000000000000000000000000000000000..1d3810d263cefb7b1db2f2ffc167ac8d692b7226 --- /dev/null +++ b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker7.java @@ -0,0 +1,55 @@ +package beforev14.parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker7 implements IWorker, ICallback { + + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + + @Override + public String action(String object, Map> allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 3"; + } + + + @Override + public String defaultValue() { + return "worker3--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); + } else { + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index c0a1b74ddb14c5002aba0c563690c4f980419563..29059cb1169abce895ac6eb6234f5c3285cf8866 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -1,32 +1,37 @@ package v15.cases; import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperBuilder; -import java.util.concurrent.ExecutionException; - /** * 示例:简单示例--复杂点的 * * @author create by TcSnZh on 2021/5/8-下午10:29 */ class Case1 { + private static WorkerWrapperBuilder builder(String id) { return WorkerWrapper.builder() .id(id) .worker((param, allWrappers) -> { - System.out.println("wrapper(id=" + id + ") is working"); try { - Thread.sleep(50); + if ("F".equals(id)) { + System.out.println("wrapper(id=" + id + ") is working"); + Thread.sleep(12000); + } else { + System.out.println("wrapper(id=" + id + ") is worki444ng"); + } } catch (InterruptedException e) { e.printStackTrace(); } - return null; + return id; }); } public static void main(String[] args) { + long now = SystemClock.now(); WorkerWrapper a = builder("A").build(); WorkerWrapper d; builder("H") @@ -43,10 +48,11 @@ class Case1 { ) .build(); try { - Async.work(1000, a, d).awaitFinish(); + Async.work(10000, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } + System.out.println("now:" + (SystemClock.now() - now)); /* 输出: wrapper(id=D) is working wrapper(id=A) is working @@ -58,5 +64,6 @@ class Case1 { wrapper(id=H) is working */ } + } diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java index 90df6e99fcec6e743d01d0a038daba37aad4d4d9..5492b726d3a9d155a741125e9ccc9f4c00a9189e 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java @@ -315,6 +315,7 @@ public class HashedWheelTimer extends AbstractWheelTimer { startTimeInitialized.countDown(); do { + //TODO 时间轮这里结束不了任务 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask);