From 43fab80d9198b884b82290b4961f095f45f55fdf Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Thu, 16 Feb 2023 17:19:04 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20isDone=E5=88=A4=E6=96=AD=E4=B8=8D?= =?UTF-8?q?=E5=87=86=E7=A1=AE=EF=BC=8C=E6=9C=89=E5=8F=AF=E8=83=BD=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=98=AF=E6=AD=A3=E5=B8=B8=E9=80=80=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 通过延迟5秒,再次检测任务状态,如果5秒都没有反应,假定线程已经全部结束 --- .../com/jd/platform/async/executor/Async.java | 15 ++++++++++++--- .../src/test/java/v15/cases/Case15.java | 6 +++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 7c66de9..1ed954c 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -136,9 +136,18 @@ public class Async { //任务结束就退出检查 if (onceWork.isFinish()) { break; - } else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) { - //完成或者取消就及时取消任务 - if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { + } else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isCancelled() || future.isDone())) { + //等5秒再去操作 + long startTime = SystemClock.now(); + long now = SystemClock.now(); + while (TimeUnit.SECONDS.toMillis(5) > now - startTime) { + now = SystemClock.now(); + } + //未超时、未完成或者未取消就取消任务 + if (!(onceWork.hasTimeout() + && onceWork.isFinish() + && onceWork.isCancelled() + && onceWork.isWaitingCancel())) { onceWork.pleaseCancel(); } break; diff --git a/asyncTool-core/src/test/java/v15/cases/Case15.java b/asyncTool-core/src/test/java/v15/cases/Case15.java index be1d997..09b99d6 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case15.java +++ b/asyncTool-core/src/test/java/v15/cases/Case15.java @@ -69,7 +69,7 @@ class Case15 { ) .build(); try { - OnceWork work = Async.work(5000, a, d); + OnceWork work = Async.work(10000, a, d); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); pool.execute(() -> { @@ -95,8 +95,12 @@ class Case15 { } System.out.println("cost:" + (SystemClock.now() - now)); + int count=1; while (build.getWorkResult().getEx() == null) { //同步等待result数据写入 + if(count++>800){ + break; + } } System.out.println("输出H节点的结果----" + build.getWorkResult()); /* 输出: -- Gitee From fbd61ca1815fd0d736f1911777f978d5ddfcd60c Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Fri, 17 Feb 2023 07:41:32 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20=E4=BF=AE=E6=AD=A3=E4=B8=80=E5=A4=84?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E5=88=A4=E6=96=AD=E7=9A=84=E9=94=99=E8=AF=AF?= =?UTF-8?q?=EF=BC=8C=E5=BA=94=E8=AF=A5=E6=98=AF=E6=88=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/executor/Async.java | 6 +- .../src/test/java/v15/cases/Case16.java | 164 ++++++++++++++++++ 2 files changed, 167 insertions(+), 3 deletions(-) create mode 100644 asyncTool-core/src/test/java/v15/cases/Case16.java diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 1ed954c..5387614 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -145,9 +145,9 @@ public class Async { } //未超时、未完成或者未取消就取消任务 if (!(onceWork.hasTimeout() - && onceWork.isFinish() - && onceWork.isCancelled() - && onceWork.isWaitingCancel())) { + || onceWork.isFinish() + || onceWork.isCancelled() + || onceWork.isWaitingCancel())) { onceWork.pleaseCancel(); } break; diff --git a/asyncTool-core/src/test/java/v15/cases/Case16.java b/asyncTool-core/src/test/java/v15/cases/Case16.java new file mode 100644 index 0000000..a918134 --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case16.java @@ -0,0 +1,164 @@ +package v15.cases; + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.OnceWork; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 示例:模拟线程池资源不够用的情况 + * + * @author create by kyle + */ +class Case16 { + + private static WorkerWrapperBuilder builder(String id) { + + return WorkerWrapper.builder() + .id(id) + .param(id + "X") + .worker(new MyWorker(id)) + .callback((new ICallback() { + @Override + public void begin() { + System.out.println("wrapper(id=" + id + ") has begin . "); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("\t\twrapper(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult); + } + })) + .allowInterrupt(true); + } + + public static void main(String[] args) { + long now = SystemClock.now(); + WorkerWrapper a = builder("A").build(); + WorkerWrapper d; + WorkerWrapper k; + WorkerWrapper n; + WorkerWrapper q; + WorkerWrapper t; + WorkerWrapper w; + WorkerWrapper build = builder("H") + .depends( + builder("F") + .depends(builder("B").depends(a).build()) + .depends(builder("C").depends(a).build()) + .build(), + builder("G") + .depends(builder("E") + .depends(d = builder("D").build()) + .build()) + .build(), + builder("I") + .depends(builder("J") + .depends(k = builder("K").build()) + .build()) + .build(), + builder("L") + .depends(builder("M") + .depends(n = builder("N").build()) + .build()) + .build(), + builder("O") + .depends(builder("P") + .depends(q = builder("Q").build()) + .build()) + .build(), + builder("R") + .depends(builder("S") + .depends(t = builder("T").build()) + .build()) + .build(), + builder("U") + .depends(builder("V") + .depends(w = builder("W").build()) + .build()) + .build() + ) + .build(); + try { + OnceWork work = Async.work(1000000, a, d, k, n, q, t, w); + ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); + + pool.execute(() -> { + while (true) { + try { + if (work.isCancelled()) { + System.out.println("取消成功"); + } + if (work.isFinish()) { + //注意,这里的结果和“输出H节点的结果----”位置处的不一致,这是多线程写造成的 + System.out.println("结束成功" + build.getWorkResult()); + break; + } + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + work.awaitFinish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("cost:" + (SystemClock.now() - now)); + int count = 1; + while (build.getWorkResult().getEx() == null) { + //同步等待result数据写入 + if (count++ > 800) { + break; + } + } + System.out.println("输出H节点的结果----" + build.getWorkResult()); + /* 输出: + wrapper(id=D) is working + wrapper(id=A) is working + wrapper(id=E) is working + wrapper(id=B) is working + wrapper(id=C) is working + wrapper(id=G) is working + wrapper(id=F) is working + wrapper(id=H) is working + */ + } + + private static class MyWorker implements IWorker { + + private String id; + + private int i = 0; + + public MyWorker(String id) { + this.id = id; + } + + @Override + public String action(String param, Map> allWrappers) { + try { + TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(20)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return id; + } + + } + +} + -- Gitee