diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 7c66de957fd7e9bbb663a4960809c27d23aa679f..5387614e696e6e54c8c1bdb524ff365522103255 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -136,9 +136,18 @@ public class Async { //任务结束就退出检查 if (onceWork.isFinish()) { break; - } else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) { - //完成或者取消就及时取消任务 - if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { + } else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isCancelled() || future.isDone())) { + //等5秒再去操作 + long startTime = SystemClock.now(); + long now = SystemClock.now(); + while (TimeUnit.SECONDS.toMillis(5) > now - startTime) { + now = SystemClock.now(); + } + //未超时、未完成或者未取消就取消任务 + if (!(onceWork.hasTimeout() + || onceWork.isFinish() + || onceWork.isCancelled() + || onceWork.isWaitingCancel())) { onceWork.pleaseCancel(); } break; diff --git a/asyncTool-core/src/test/java/v15/cases/Case15.java b/asyncTool-core/src/test/java/v15/cases/Case15.java index be1d99745ff3d029d163c794bed2f8edd50d73e3..09b99d67b4e9774e5b163a5b9744249bc698a5d3 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case15.java +++ b/asyncTool-core/src/test/java/v15/cases/Case15.java @@ -69,7 +69,7 @@ class Case15 { ) .build(); try { - OnceWork work = Async.work(5000, a, d); + OnceWork work = Async.work(10000, a, d); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); pool.execute(() -> { @@ -95,8 +95,12 @@ class Case15 { } System.out.println("cost:" + (SystemClock.now() - now)); + int count=1; while (build.getWorkResult().getEx() == null) { //同步等待result数据写入 + if(count++>800){ + break; + } } System.out.println("输出H节点的结果----" + build.getWorkResult()); /* 输出: diff --git a/asyncTool-core/src/test/java/v15/cases/Case16.java b/asyncTool-core/src/test/java/v15/cases/Case16.java new file mode 100644 index 0000000000000000000000000000000000000000..a9181343581ad8a06af653b8eacb17efab85253c --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case16.java @@ -0,0 +1,164 @@ +package v15.cases; + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.OnceWork; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 示例:模拟线程池资源不够用的情况 + * + * @author create by kyle + */ +class Case16 { + + private static WorkerWrapperBuilder builder(String id) { + + return WorkerWrapper.builder() + .id(id) + .param(id + "X") + .worker(new MyWorker(id)) + .callback((new ICallback() { + @Override + public void begin() { + System.out.println("wrapper(id=" + id + ") has begin . "); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("\t\twrapper(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult); + } + })) + .allowInterrupt(true); + } + + public static void main(String[] args) { + long now = SystemClock.now(); + WorkerWrapper a = builder("A").build(); + WorkerWrapper d; + WorkerWrapper k; + WorkerWrapper n; + WorkerWrapper q; + WorkerWrapper t; + WorkerWrapper w; + WorkerWrapper build = builder("H") + .depends( + builder("F") + .depends(builder("B").depends(a).build()) + .depends(builder("C").depends(a).build()) + .build(), + builder("G") + .depends(builder("E") + .depends(d = builder("D").build()) + .build()) + .build(), + builder("I") + .depends(builder("J") + .depends(k = builder("K").build()) + .build()) + .build(), + builder("L") + .depends(builder("M") + .depends(n = builder("N").build()) + .build()) + .build(), + builder("O") + .depends(builder("P") + .depends(q = builder("Q").build()) + .build()) + .build(), + builder("R") + .depends(builder("S") + .depends(t = builder("T").build()) + .build()) + .build(), + builder("U") + .depends(builder("V") + .depends(w = builder("W").build()) + .build()) + .build() + ) + .build(); + try { + OnceWork work = Async.work(1000000, a, d, k, n, q, t, w); + ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); + + pool.execute(() -> { + while (true) { + try { + if (work.isCancelled()) { + System.out.println("取消成功"); + } + if (work.isFinish()) { + //注意,这里的结果和“输出H节点的结果----”位置处的不一致,这是多线程写造成的 + System.out.println("结束成功" + build.getWorkResult()); + break; + } + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + work.awaitFinish(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("cost:" + (SystemClock.now() - now)); + int count = 1; + while (build.getWorkResult().getEx() == null) { + //同步等待result数据写入 + if (count++ > 800) { + break; + } + } + System.out.println("输出H节点的结果----" + build.getWorkResult()); + /* 输出: + wrapper(id=D) is working + wrapper(id=A) is working + wrapper(id=E) is working + wrapper(id=B) is working + wrapper(id=C) is working + wrapper(id=G) is working + wrapper(id=F) is working + wrapper(id=H) is working + */ + } + + private static class MyWorker implements IWorker { + + private String id; + + private int i = 0; + + public MyWorker(String id) { + this.id = id; + } + + @Override + public String action(String param, Map> allWrappers) { + try { + TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(20)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return id; + } + + } + +} +