diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index fa72efcf8be28522f2dcb38b09e6da9ebfbb49bc..7c66de957fd7e9bbb663a4960809c27d23aa679f 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -14,7 +14,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -28,6 +27,34 @@ public class Async { // ========================= 任务执行核心代码 ========================= + /** + * 在以前(及现在)的版本中: + * 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时,ExecutorService将会被记录下来。 + *

+ * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 + * + * @deprecated 不明意义、毫无用处的字段。记录之前使用的线程池没啥意义。 + */ + @SuppressWarnings("DeprecatedIsStillUsed") + @Deprecated + private static final AtomicReference lastExecutorService = new AtomicReference<>(null); + + /** + * 默认线程池。 + *

+ * 在v1.4及之前,该COMMON_POLL是被写死的。 + *

+ * 自v1.5后: + * 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载。 + * tip: + * 要注意,{@link #work(long, WorkerWrapper[])}、{@link #work(long, Collection)}这些方法, + * 不传入线程池就会默认调用{@link #getCommonPool()},就会初始化线程池。 + *

+ * 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。 + *

+ */ + private static volatile ThreadPoolExecutor COMMON_POOL; + /** * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 * 使用uuid作为工作id。使用{@link #getCommonPool()}作为线程池。 @@ -57,6 +84,8 @@ public class Async { Objects.requireNonNull(workerWrappers, "workerWrappers array is null"))); } + // ========================= 设置/属性选项 ========================= + /** * {@link #work(long, ExecutorService, Collection, String)}方法的简易封装。 * 省略工作id,使用uuid。 @@ -74,6 +103,7 @@ public class Async { * @param executorService 执行线程池 * @param workerWrappers 任务容器集合 * @param workId 本次工作id + * * @return 返回 {@link OnceWork}任务句柄对象。 */ public static OnceWork work(long timeout, @@ -103,8 +133,11 @@ public class Async { } catch (InterruptedException e) { e.printStackTrace(); } - //完成或者取消就及时取消任务 - if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone()|| future.isCancelled())) { + //任务结束就退出检查 + if (onceWork.isFinish()) { + break; + } else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) { + //完成或者取消就及时取消任务 if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { onceWork.pleaseCancel(); } @@ -115,36 +148,6 @@ public class Async { return onceWork; } - // ========================= 设置/属性选项 ========================= - - /** - * 默认线程池。 - *

- * 在v1.4及之前,该COMMON_POLL是被写死的。 - *

- * 自v1.5后: - * 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载。 - * tip: - * 要注意,{@link #work(long, WorkerWrapper[])}、{@link #work(long, Collection)}这些方法, - * 不传入线程池就会默认调用{@link #getCommonPool()},就会初始化线程池。 - *

- * 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。 - *

- */ - private static volatile ThreadPoolExecutor COMMON_POOL; - - /** - * 在以前(及现在)的版本中: - * 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时,ExecutorService将会被记录下来。 - *

- * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 - * - * @deprecated 不明意义、毫无用处的字段。记录之前使用的线程池没啥意义。 - */ - @SuppressWarnings("DeprecatedIsStillUsed") - @Deprecated - private static final AtomicReference lastExecutorService = new AtomicReference<>(null); - /** * 该方法将会返回{@link #COMMON_POOL},如果还未初始化则会懒加载初始化后再返回。 * 详情参考{@link #COMMON_POOL}上的注解 @@ -199,6 +202,7 @@ public class Async { /** * @param now 是否立即关闭 + * * @return 如果尚未调用过{@link #getCommonPool()},即没有初始化默认线程池,返回false。否则返回true。 */ @SuppressWarnings("unused") @@ -222,6 +226,7 @@ public class Async { * 同步执行一次任务。 * * @return 只要执行未超时,就返回true。 + * * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 */ @Deprecated @@ -330,6 +335,7 @@ public class Async { * 关闭指定的线程池 * * @param executorService 指定的线程池。传入null则会关闭默认线程池。 + * * @deprecated 没啥用的方法,要关闭线程池还不如直接调用线程池的关闭方法,避免歧义。 */ @Deprecated @@ -338,4 +344,5 @@ public class Async { executorService.shutdown(); } } + } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java index 36e7c564ab0026a07b1be39da47e1c422bc84d42..c99705459db190e6806bcce54c4a7907a7a3b39e 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java @@ -235,6 +235,9 @@ public interface WorkerWrapperBuilder { return setNext().wrapper(wrappers).end(); } + default WorkerWrapperBuilder nextOf2(Collection> wrappers) { + return setNext().wrapper(wrappers).end(); + } /** * 设置超时时间的具体属性 */