From fb7e3419cc332805432c5cbef5d24df77b382b90 Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Mon, 4 Jul 2022 19:58:32 +0800 Subject: [PATCH 1/3] =?UTF-8?q?refactor:=20=E5=B0=86=E6=94=B9=E5=8F=98?= =?UTF-8?q?=E5=80=BC=E7=9A=84=E5=87=BD=E6=95=B0=E6=8A=BD=E5=8F=96=E5=88=B0?= =?UTF-8?q?=E6=8A=BD=E8=B1=A1=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- asyncTool-core/src/test/java/v15/cases/Case1.java | 13 +++++++++++-- .../async/openutil/timer/AbstractWheelTimer.java | 9 +++++++++ .../async/openutil/timer/HashedWheelTimer.java | 8 ++------ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index d737d4e..fdac5b7 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -7,6 +7,9 @@ import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperBuilder; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + /** * 示例:简单示例--复杂点的 * @@ -27,6 +30,7 @@ class Case1 { } } catch (InterruptedException e) { e.printStackTrace(); + throw new RuntimeException("被中断了"); } return id; }).callback((new ICallback() { @@ -37,6 +41,9 @@ class Case1 { @Override public void result(boolean success, String param, WorkResult workResult) { + // if ("H".equals(id)) { + // int a=1/0; + // } System.out.println("\t\twrapper(id=" + id + ") callback " + (success ? "success " : "fail ") + ", workResult is " + workResult); @@ -45,7 +52,8 @@ class Case1 { .allowInterrupt(true); } - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); long now = SystemClock.now(); WorkerWrapper a = builder("A").build(); WorkerWrapper d = builder("D").build(); @@ -63,10 +71,11 @@ class Case1 { ) .build(); try { - Async.work(1000, a, d).awaitFinish(); + Async.work(1000, executorService, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } + executorService.shutdown(); System.out.println("now:" + (SystemClock.now() - now)); /* 输出: wrapper(id=D) is working diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java index d26fa1e..9cf4cfb 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/AbstractWheelTimer.java @@ -1,5 +1,7 @@ package com.jd.platform.async.openutil.timer; +import java.util.concurrent.atomic.AtomicInteger; + /** * @author create by TcSnZh on 2021/5/12-下午6:36 */ @@ -8,6 +10,8 @@ public abstract class AbstractWheelTimer implements Timer, AutoCloseable { public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; + protected final AtomicInteger workerState = new AtomicInteger(WORKER_STATE_INIT); // 0 - init, 1 - started, 2 - shut down + public abstract void start(); @SuppressWarnings("RedundantThrows") @@ -15,4 +19,9 @@ public abstract class AbstractWheelTimer implements Timer, AutoCloseable { public void close() throws Exception { stop(); } + + protected boolean changeState(int workerStateStarted, int workerStateShutdown) { + return workerState.compareAndSet(workerStateStarted, workerStateShutdown); + } + } diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java index fbb6d1d..0e472a4 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java @@ -2,7 +2,6 @@ package com.jd.platform.async.openutil.timer; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; @@ -22,8 +21,6 @@ public class HashedWheelTimer extends AbstractWheelTimer { private final Worker worker = new Worker(); private final Thread workerThread; - @SuppressWarnings({"unused", "FieldMayBeFinal"}) - private final AtomicInteger workerState = new AtomicInteger(WORKER_STATE_INIT); // 0 - init, 1 - started, 2 - shut down private final long tickDuration; private final HashedWheelBucket[] wheel; @@ -207,7 +204,7 @@ public class HashedWheelTimer extends AbstractWheelTimer { public void start() { switch (workerState.get()) { case WORKER_STATE_INIT: - if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { + if (changeState(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; @@ -238,7 +235,7 @@ public class HashedWheelTimer extends AbstractWheelTimer { TimerTask.class.getSimpleName()); } - if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { + if (!changeState(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // state is init or shutdown . return Collections.emptySet(); } @@ -315,7 +312,6 @@ public class HashedWheelTimer extends AbstractWheelTimer { startTimeInitialized.countDown(); do { - //TODO 时间轮这里一直执行,结束不了任务 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); -- Gitee From ffe9056600cdca39ab11fed746b726364c677b7e Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Wed, 1 Feb 2023 19:00:15 +0800 Subject: [PATCH 2/3] =?UTF-8?q?refactor:=20=E5=A6=82=E6=9E=9C=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=8F=91=E7=94=9FOOM=EF=BC=8C=E6=8F=90=E6=97=A9?= =?UTF-8?q?=E5=8F=96=E6=B6=88=E4=BB=BB=E5=8A=A1=EF=BC=8C=E8=80=8C=E4=B8=8D?= =?UTF-8?q?=E6=98=AF=E7=AD=89=E8=B6=85=E6=97=B6=EF=BC=8C=E4=BB=A5=E5=85=8D?= =?UTF-8?q?=E5=8F=91=E7=94=9F=E8=8A=82=E7=82=B9=E7=A9=BA=E5=BE=AA=E7=8E=AF?= =?UTF-8?q?=E6=A3=80=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/executor/Async.java | 21 ++- .../jd/platform/async/worker/OnceWork.java | 13 ++ .../platform/async/wrapper/WorkerWrapper.java | 13 ++ .../src/test/java/v15/cases/Case15.java | 143 ++++++++++++++++++ 4 files changed, 188 insertions(+), 2 deletions(-) create mode 100644 asyncTool-core/src/test/java/v15/cases/Case15.java diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 7cc703d..8f234be 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -85,13 +85,30 @@ public class Async { //保存上次执行的线程池变量(为了兼容以前的旧功能) Async.lastExecutorService.set(Objects.requireNonNull(executorService, "ExecutorService is null ! ")); final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout); - final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId); group.addWrapper(workerWrappers); + final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId); + //有多少个开始节点就有多少个线程,依赖任务靠被依赖任务的线程完成工作 workerWrappers.forEach(wrapper -> { if (wrapper == null) { return; } - executorService.submit(() -> wrapper.work(executorService, timeout, group)); + Future future = executorService.submit(() -> wrapper.work(executorService, timeout, group)); + onceWork.getAllThreadSubmit().add(future); + }); + executorService.execute(() -> { + while (true) { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (onceWork.getAllThreadSubmit().stream().allMatch(Future::isDone)) { + if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { + onceWork.pleaseCancel(); + } + break; + } + } }); return onceWork; } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java index de733f2..9f137d3 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java @@ -1,5 +1,6 @@ package com.jd.platform.async.worker; +import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperGroup; @@ -270,9 +271,19 @@ public interface OnceWork { class Impl extends AbstractOnceWork { protected final WorkerWrapperGroup group; + /** + * 本次任务中所有线程提交 + */ + protected List> allThreadSubmit; + + public List> getAllThreadSubmit() { + return allThreadSubmit; + } + public Impl(WorkerWrapperGroup group, String workId) { super(workId); this.group = group; + allThreadSubmit = new ArrayList<>(group.getForParamUseWrappers().size()); } @Override @@ -321,6 +332,8 @@ public interface OnceWork { @Override public void pleaseCancel() { group.pleaseCancel(); + //发起检查,看看所有是否取消完毕 + PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index db522b5..aef7e4e 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -229,6 +229,19 @@ public abstract class WorkerWrapper { public void cancel() { if (State.setState(state, states_of_beforeWorkingEnd, SKIP, null)) { fastFail(false, new CancelException(), true); + //此处调用结果处理器让用户决定取消逻辑 + final Consumer __function__callbackResult = + success -> { + WorkResult _workResult = getWorkResult(); + try { + callback.result(success, param, _workResult); + } catch (Exception e) { + if (setState(state, states_of_skipOrAfterWork, ERROR, null)) { + fastFail(false, e, _workResult.getEx() instanceof EndsNormallyException); + } + } + }; + __function__callbackResult.accept(false); } } diff --git a/asyncTool-core/src/test/java/v15/cases/Case15.java b/asyncTool-core/src/test/java/v15/cases/Case15.java new file mode 100644 index 0000000..be1d997 --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case15.java @@ -0,0 +1,143 @@ +package v15.cases; + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.OnceWork; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 示例:模拟内存溢出 + *

+ * 运行之前请设置 + * -Xmx20m -Xms20m + * + * 当内存溢出时,其中一个线程会OOM,runable不会继续调度, + * 我通过添加一个线程主动cancel来达到提前结束任务而不是等超时 + * + * @author create by kyle + */ +class Case15 { + + private static WorkerWrapperBuilder builder(String id) { + + return WorkerWrapper.builder() + .id(id) + .param(id + "X") + .worker(new MyWorker(id)) + .callback((new ICallback() { + @Override + public void begin() { + System.out.println("wrapper(id=" + id + ") has begin . "); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("\t\twrapper(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult); + } + })) + .allowInterrupt(true); + } + + public static void main(String[] args) { + long now = SystemClock.now(); + WorkerWrapper a = builder("A").build(); + WorkerWrapper d; + WorkerWrapper build = builder("H") + .depends( + builder("F") + .depends(builder("B").depends(a).build()) + .depends(builder("C").depends(a).build()) + .build(), + builder("G") + .depends(builder("E") + .depends(d = builder("D").build()) + .build()) + .build() + ) + .build(); + try { + OnceWork work = Async.work(5000, a, d); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); + + pool.execute(() -> { + while (true) { + try { + if (work.isCancelled()) { + System.out.println("取消成功"); + } + if (work.isFinish()) { + //注意,这里的结果和“输出H节点的结果----”位置处的不一致,这是多线程写造成的 + System.out.println("结束成功" + build.getWorkResult()); + break; + } + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + work.awaitFinish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("cost:" + (SystemClock.now() - now)); + while (build.getWorkResult().getEx() == null) { + //同步等待result数据写入 + } + System.out.println("输出H节点的结果----" + build.getWorkResult()); + /* 输出: + wrapper(id=D) is working + wrapper(id=A) is working + wrapper(id=E) is working + wrapper(id=B) is working + wrapper(id=C) is working + wrapper(id=G) is working + wrapper(id=F) is working + wrapper(id=H) is working + */ + } + + private static class MyWorker implements IWorker { + + //用于存放模拟的对象,防止GC回收,用List做对象引用 + private final List list = new LinkedList<>(); + + private String id; + + private int i = 0; + + public MyWorker(String id) { + this.id = id; + } + + @Override + public String action(String param, Map> allWrappers) { + if ("F".equals(id)) { + System.out.println("wrapper(id=" + id + ") is working"); + while (true) { + System.out.println("I am alive:" + i++); + byte[] buf = new byte[1024 * 1024]; + list.add(buf); + } + } + return id; + } + + } + +} + -- Gitee From e56521caf32d998638fd5a840393db9c1411eb09 Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Wed, 1 Feb 2023 19:27:57 +0800 Subject: [PATCH 3/3] =?UTF-8?q?refactor:=20=E5=8A=A0=E4=B8=8A=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=B7=B2=E7=BB=8F=E5=8F=96=E6=B6=88=E7=9A=84=E6=83=85?= =?UTF-8?q?=E5=86=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/jd/platform/async/executor/Async.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 8f234be..fa72efc 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -14,6 +14,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -102,7 +103,8 @@ public class Async { } catch (InterruptedException e) { e.printStackTrace(); } - if (onceWork.getAllThreadSubmit().stream().allMatch(Future::isDone)) { + //完成或者取消就及时取消任务 + if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone()|| future.isCancelled())) { if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { onceWork.pleaseCancel(); } -- Gitee