From fb8e0278865bd6a5e5470d65464a00dcbfa68942 Mon Sep 17 00:00:00 2001 From: klaokai <573984425@qq.com> Date: Mon, 20 Feb 2023 17:41:26 +0800 Subject: [PATCH] =?UTF-8?q?refactor=EF=BC=9A=E6=B7=BB=E5=8A=A0=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E7=BA=BF=E7=A8=8B=E5=8F=8A=E6=97=B6=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=E6=95=B4=E4=B8=AA=E6=B5=81=E7=A8=8B=E6=98=AF=E5=90=A6OOM?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 具体测试情况可以查看Case15.java测试代码 --- .../com/jd/platform/async/executor/Async.java | 69 +++-- .../executor/ExecutorServiceWrapper.java | 107 +++++++ .../jd/platform/async/worker/OnceWork.java | 51 ++-- .../platform/async/wrapper/WorkerWrapper.java | 284 ++++++++++-------- .../src/test/java/v15/cases/Case15.java | 71 ++++- .../src/test/java/v15/cases/Case16.java | 199 ++++++++++++ 6 files changed, 591 insertions(+), 190 deletions(-) create mode 100644 asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java create mode 100644 asyncTool-core/src/test/java/v15/cases/Case16.java diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index 7c66de9..054a949 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -10,6 +10,7 @@ import com.jd.platform.async.wrapper.WorkerWrapperGroup; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import java.math.BigDecimal; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -103,7 +104,6 @@ public class Async { * @param executorService 执行线程池 * @param workerWrappers 任务容器集合 * @param workId 本次工作id - * * @return 返回 {@link OnceWork}任务句柄对象。 */ public static OnceWork work(long timeout, @@ -118,33 +118,16 @@ public class Async { final WorkerWrapperGroup group = new WorkerWrapperGroup(SystemClock.now(), timeout); group.addWrapper(workerWrappers); final OnceWork.Impl onceWork = new OnceWork.Impl(group, workId); + ExecutorServiceWrapper executorServiceWrapper = new ExecutorServiceWrapper(executorService); + //有多少个开始节点就有多少个线程,依赖任务靠被依赖任务的线程完成工作 workerWrappers.forEach(wrapper -> { if (wrapper == null) { return; } - Future future = executorService.submit(() -> wrapper.work(executorService, timeout, group)); - onceWork.getAllThreadSubmit().add(future); - }); - executorService.execute(() -> { - while (true) { - try { - TimeUnit.MILLISECONDS.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - //任务结束就退出检查 - if (onceWork.isFinish()) { - break; - } else if (onceWork.getAllThreadSubmit().stream().allMatch(future -> future.isDone() || future.isCancelled())) { - //完成或者取消就及时取消任务 - if (!onceWork.isCancelled() && !onceWork.isWaitingCancel()) { - onceWork.pleaseCancel(); - } - break; - } - } + executorServiceWrapper.addThreadSubmit(new TaskCallable(wrapper, timeout, group, executorServiceWrapper)); }); + executorServiceWrapper.startCheck(onceWork); return onceWork; } @@ -202,7 +185,6 @@ public class Async { /** * @param now 是否立即关闭 - * * @return 如果尚未调用过{@link #getCommonPool()},即没有初始化默认线程池,返回false。否则返回true。 */ @SuppressWarnings("unused") @@ -220,13 +202,10 @@ public class Async { return true; } - // ========================= deprecated ========================= - /** * 同步执行一次任务。 * * @return 只要执行未超时,就返回true。 - * * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 */ @Deprecated @@ -256,6 +235,8 @@ public class Async { return beginWork(timeout, executorService, workerWrappers); } + // ========================= deprecated ========================= + /** * 同步阻塞,直到所有都完成,或失败 * @@ -335,7 +316,6 @@ public class Async { * 关闭指定的线程池 * * @param executorService 指定的线程池。传入null则会关闭默认线程池。 - * * @deprecated 没啥用的方法,要关闭线程池还不如直接调用线程池的关闭方法,避免歧义。 */ @Deprecated @@ -345,4 +325,39 @@ public class Async { } } + public static class TaskCallable implements Callable { + + private final WorkerWrapperGroup group; + + private final long timeout; + + private final WorkerWrapper wrapper; + + private final ExecutorServiceWrapper executorServiceWrapper; + + private final WorkerWrapper workerWrapper; + + public TaskCallable(WorkerWrapper wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorServiceWrapper) { + this.wrapper = wrapper; + this.group = group; + this.timeout = timeout; + this.executorServiceWrapper = executorServiceWrapper; + this.workerWrapper = null; + } + + public TaskCallable(WorkerWrapper wrapper, long timeout, WorkerWrapperGroup group, ExecutorServiceWrapper executorService, WorkerWrapper workerWrapper) { + this.wrapper = wrapper; + this.group = group; + this.timeout = timeout; + this.executorServiceWrapper = executorService; + this.workerWrapper = workerWrapper; + } + + @Override + public BigDecimal call() throws Exception { + wrapper.work(executorServiceWrapper, this.workerWrapper, timeout, group); + return BigDecimal.ZERO; + } + + } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java new file mode 100644 index 0000000..4ebb4a3 --- /dev/null +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/ExecutorServiceWrapper.java @@ -0,0 +1,107 @@ +package com.jd.platform.async.executor; + +import com.jd.platform.async.worker.OnceWork; + +import java.util.concurrent.*; + +public class ExecutorServiceWrapper { + + private final ExecutorService executorService; + + /** + * 本次任务中所有线程提交 + */ + protected LinkedBlockingQueue> allThreadSubmit; + + public ExecutorServiceWrapper(ExecutorService executorService) { + this.executorService = executorService; + this.allThreadSubmit = new LinkedBlockingQueue<>(); + } + + public LinkedBlockingQueue> getAllThreadSubmit() { + return allThreadSubmit; + } + + public void addThreadSubmit(Async.TaskCallable callable) { + allThreadSubmit.add(executorService.submit(callable)); + } + + public void startCheck(final OnceWork.Impl onceWork) { + executorService.execute(new ThreadCheckRunable(onceWork, this)); + } + + private static class ThreadCheckRunable implements Runnable { + + private final OnceWork.Impl onceWork; + + private final ExecutorServiceWrapper executorServiceWrapper; + + public ThreadCheckRunable(OnceWork.Impl onceWork, ExecutorServiceWrapper executorServiceWrapper) { + this.onceWork = onceWork; + this.executorServiceWrapper = executorServiceWrapper; + } + + @Override + public void run() { + while (true) { + try { + TimeUnit.MILLISECONDS.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + //任务结束就退出检查 + if (onceWork.isFinish()) { + break; + } else if (executorServiceWrapper.getAllThreadSubmit().size() > 0) { + boolean isException = false; + boolean isCancelld = false; + boolean isDone = false; + for (Future item : executorServiceWrapper.getAllThreadSubmit()) { + try { + //完成并且没有返回 + if (item.isCancelled()) { + isCancelld = true; + } + if ((item.isDone() && item.get(500, TimeUnit.MILLISECONDS) == null)) { + isDone = true; + } + } catch (InterruptedException e) { + //中断等 + e.printStackTrace(); + isException = true; + } catch (ExecutionException e) { + //内存溢出等 + e.printStackTrace(); + isException = true; + } catch (TimeoutException e) { + //超时不管,继续检查 + } + } + + //异常或者有线程取消 + if (isException || isCancelld) { + //未超时、未完成或者未取消就取消任务 + while (!(onceWork.hasTimeout() + || onceWork.isFinish() + || onceWork.isCancelled() + || onceWork.isWaitingCancel())) { + onceWork.pleaseCancel(); + } + break; + } else { + if (isDone) { + System.out.println("部分任务已经在线程池完成"); + } + //没有的话继续完成 + onceWork.check(); + } + } else { + //FIXME 高强度检查会不会造成检查线程过多? + onceWork.check(); + } + } + } + + } + +} diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java index 9f137d3..50fcff6 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java @@ -6,7 +6,10 @@ import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperGroup; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Collectors; @@ -16,6 +19,14 @@ import java.util.stream.Collectors; * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午3:22 */ public interface OnceWork { + + /** + * 空任务 + */ + static OnceWork emptyWork(String workId) { + return new EmptyWork(workId); + } + /** * 返回唯一的workId */ @@ -126,6 +137,8 @@ public interface OnceWork { return new AsFuture(this, limitTime -> limitTime / 16); } + // static + /** * 返回{@link Future}视图。 * @@ -137,19 +150,12 @@ public interface OnceWork { return new AsFuture(this, sleepCheckInterval); } - // static - - /** - * 空任务 - */ - static OnceWork emptyWork(String workId) { - return new EmptyWork(workId); - } - // class class AsFuture implements Future>> { + private final OnceWork onceWork; + private final Function sleepCheckInterval; private AsFuture(OnceWork onceWork, Function sleepCheckInterval) { @@ -217,9 +223,11 @@ public interface OnceWork { public String toString() { return "(asFuture from " + onceWork + ")@" + Integer.toHexString(this.hashCode()); } + } abstract class AbstractOnceWork implements OnceWork { + protected final String workId; public AbstractOnceWork(String workId) { @@ -266,24 +274,16 @@ public interface OnceWork { .append(", wrappers::getId=").append(getWrappers().keySet()) .append('}').toString(); } + } class Impl extends AbstractOnceWork { - protected final WorkerWrapperGroup group; - - /** - * 本次任务中所有线程提交 - */ - protected List> allThreadSubmit; - public List> getAllThreadSubmit() { - return allThreadSubmit; - } + protected final WorkerWrapperGroup group; public Impl(WorkerWrapperGroup group, String workId) { super(workId); this.group = group; - allThreadSubmit = new ArrayList<>(group.getForParamUseWrappers().size()); } @Override @@ -331,13 +331,20 @@ public interface OnceWork { @Override public void pleaseCancel() { - group.pleaseCancel(); + if (group.pleaseCancel()) { + check(); + } + } + + public void check() { //发起检查,看看所有是否取消完毕 PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); } + } class EmptyWork extends AbstractOnceWork { + private final long initTime = SystemClock.now(); public EmptyWork(String workId) { @@ -392,5 +399,7 @@ public interface OnceWork { public String toString() { return "(it's empty work)"; } + } + } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index aef7e4e..fff1ab4 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -6,6 +6,8 @@ import com.jd.platform.async.callback.IWorker; import com.jd.platform.async.exception.CancelException; import com.jd.platform.async.exception.EndsNormallyException; import com.jd.platform.async.exception.SkippedException; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.ExecutorServiceWrapper; import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.worker.ResultState; @@ -18,7 +20,6 @@ import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy; import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -43,49 +44,59 @@ public abstract class WorkerWrapper { * 该wrapper的唯一标识 */ protected final String id; + protected final IWorker worker; + protected final ICallback callback; - /** - * 各种策略的封装类。 - */ - private final WrapperStrategy wrapperStrategy; + /** * 是否允许被打断 */ protected final boolean allowInterrupt; + + /** + * 原子设置wrapper的状态 + *

+ * {@link State}此枚举类枚举了state值所代表的状态枚举。 + */ + protected final AtomicInteger state = new AtomicInteger(BUILDING.id); + + /** + * 该值将在{@link IWorker#action(Object, Map)}进行时设为当前线程,在任务开始前或结束后都为null。 + */ + protected final AtomicReference doWorkingThread = new AtomicReference<>(); + + /** + * 也是个钩子变量,用来存临时的结果 + */ + protected final AtomicReference> workResult = new AtomicReference<>(null); + /** * 是否启动超时检查 */ final boolean enableTimeout; + + // ========== 临时属性 ========== + /** * 超时时间长度 */ final long timeoutLength; + /** * 超时时间单位 */ final TimeUnit timeoutUnit; - // ========== 临时属性 ========== + /** + * 各种策略的封装类。 + */ + private final WrapperStrategy wrapperStrategy; /** * worker将来要处理的param */ protected volatile T param; - /** - * 原子设置wrapper的状态 - *

- * {@link State}此枚举类枚举了state值所代表的状态枚举。 - */ - protected final AtomicInteger state = new AtomicInteger(BUILDING.id); - /** - * 该值将在{@link IWorker#action(Object, Map)}进行时设为当前线程,在任务开始前或结束后都为null。 - */ - protected final AtomicReference doWorkingThread = new AtomicReference<>(); - /** - * 也是个钩子变量,用来存临时的结果 - */ - protected final AtomicReference> workResult = new AtomicReference<>(null); WorkerWrapper(String id, IWorker worker, @@ -126,6 +137,10 @@ public abstract class WorkerWrapper { // ========== public ========== + public static WorkerWrapperBuilder builder() { + return new Builder<>(); + } + /** * 外部调用本线程运行此wrapper的入口方法。 * 该方法将会确定这组wrapper所属的group。 @@ -135,7 +150,7 @@ public abstract class WorkerWrapper { * @param group wrapper组 * @throws IllegalStateException 当wrapper正在building状态时被启动,则会抛出该异常。 */ - public void work(ExecutorService executorService, + public void work(ExecutorServiceWrapper executorService, long remainTime, WorkerWrapperGroup group) { work(executorService, null, remainTime, group); @@ -167,11 +182,17 @@ public abstract class WorkerWrapper { */ public abstract Set> getNextWrappers(); + abstract void setNextWrappers(Set> nextWrappers); + /** * 获取上游wrapper */ public abstract Set> getDependWrappers(); + abstract void setDependWrappers(Set> dependWrappers); + + // ========== protected ========== + /** * 获取本wrapper的超时情况。如有必要还会修改wrapper状态。 * @@ -249,8 +270,6 @@ public abstract class WorkerWrapper { return wrapperStrategy; } - // ========== protected ========== - /** * 工作的核心方法。 * @@ -258,10 +277,10 @@ public abstract class WorkerWrapper { * @param remainTime 剩余时间。 * @throws IllegalStateException 当wrapper正在building状态时被启动,则会抛出该异常。 */ - protected void work(ExecutorService executorService, - WorkerWrapper fromWrapper, - long remainTime, - WorkerWrapperGroup group + public void work(ExecutorServiceWrapper executorService, + WorkerWrapper fromWrapper, + long remainTime, + WorkerWrapperGroup group ) { long now = SystemClock.now(); // ================================================ @@ -297,7 +316,7 @@ public abstract class WorkerWrapper { if (setState(state, WORKING, AFTER_WORK)) { __function__callbackResultOfFalse_beginNext.accept(true); } - }else { + } else { //如果任务超时,需要将最后那个超时任务设置为超时异常结束的 if (setState(state, WORKING, ERROR)) { __function__fastFail_callbackResult$false_beginNext.accept(true, new TimeoutException()); @@ -368,10 +387,7 @@ public abstract class WorkerWrapper { wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); switch (judge.getDependenceAction()) { case TAKE_REST: - //FIXME 等待200毫秒重新投入线程池,主要为了调起最后一个任务 - Thread.sleep(200L); - executorService.submit(() -> this.work(executorService, fromWrapper, - remainTime - (SystemClock.now() - now), group)); + PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); return; case FAST_FAIL: if (setState(state, STARTED, ERROR)) { @@ -401,11 +417,13 @@ public abstract class WorkerWrapper { } } + // ========== hashcode and equals ========== /** * 本工作线程执行自己的job. *

* 本方法不负责校验状态。请在调用前自行检验 + * * @return */ protected boolean fire(WorkerWrapperGroup group) { @@ -446,12 +464,14 @@ public abstract class WorkerWrapper { )); } + // ========== builder ========== + /** * 进行下一个任务 *

* 本方法不负责校验状态。请在调用前自行检验 */ - protected void beginNext(ExecutorService executorService, long now, long remainTime, WorkerWrapperGroup group) { + protected void beginNext(ExecutorServiceWrapper executorService, long now, long remainTime, WorkerWrapperGroup group) { //花费的时间 final long costTime = SystemClock.now() - now; final long nextRemainTIme = remainTime - costTime; @@ -478,9 +498,9 @@ public abstract class WorkerWrapper { else { try { group.addWrapper(nextWrappers); - nextWrappers.forEach(next -> executorService.submit(() -> - next.work(executorService, this, nextRemainTIme, group)) - ); + nextWrappers.forEach(next -> executorService.addThreadSubmit( + new Async.TaskCallable(next, nextRemainTIme, group, executorService, this) + )); setState(state, AFTER_WORK, SUCCESS); } finally { PollingCenter.getInstance().checkGroup(group.new CheckFinishTask()); @@ -489,13 +509,13 @@ public abstract class WorkerWrapper { } - // ========== hashcode and equals ========== - @Override public boolean equals(Object o) { return super.equals(o); } + // ========== package access methods ========== + /** * {@code return id.hashCode();}返回id值的hashcode */ @@ -505,33 +525,6 @@ public abstract class WorkerWrapper { return id.hashCode(); } - // ========== builder ========== - - public static WorkerWrapperBuilder builder() { - return new Builder<>(); - } - - /** - * 自v1.5,该类被抽取到{@link StableWorkerWrapperBuilder}抽象类,兼容之前的版本。 - */ - public static class Builder extends StableWorkerWrapperBuilder> { - /** - * @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口,以调用v1.5之后的规范api - */ - @SuppressWarnings("DeprecatedIsStillUsed") - @Deprecated - public Builder() { - } - } - - // ========== package access methods ========== - - abstract void setNextWrappers(Set> nextWrappers); - - abstract void setDependWrappers(Set> dependWrappers); - - // ========== toString ========== - @Override public String toString() { final StringBuilder sb = new StringBuilder(256) @@ -567,56 +560,7 @@ public abstract class WorkerWrapper { return sb.toString(); } - /** - * 一个通用的策略器实现类,提供了修改的功能。并兼容之前的代码。 - */ - public static class StableWrapperStrategy extends WrapperStrategy.AbstractWrapperStrategy { - private DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper; - private DependMustStrategyMapper dependMustStrategyMapper; - private DependenceStrategy dependenceStrategy; - private SkipStrategy skipStrategy; - - @Override - public DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper() { - return dependOnUpWrapperStrategyMapper; - } - - @Override - public void setDependWrapperStrategyMapper(DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper) { - this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper; - } - - @SuppressWarnings("deprecation") - @Override - public DependMustStrategyMapper getDependMustStrategyMapper() { - return dependMustStrategyMapper; - } - - @Override - public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) { - this.dependMustStrategyMapper = dependMustStrategyMapper; - } - - @Override - public DependenceStrategy getDependenceStrategy() { - return dependenceStrategy; - } - - @Override - public void setDependenceStrategy(DependenceStrategy dependenceStrategy) { - this.dependenceStrategy = dependenceStrategy; - } - - @Override - public SkipStrategy getSkipStrategy() { - return skipStrategy; - } - - @Override - public void setSkipStrategy(SkipStrategy skipStrategy) { - this.skipStrategy = skipStrategy; - } - } + // ========== toString ========== /** * state状态枚举工具类 @@ -658,27 +602,33 @@ public abstract class WorkerWrapper { // public - public boolean finished() { - return this == SUCCESS || this == ERROR || this == SKIP; - } + static final State[] states_of_notWorked = new State[]{INIT, STARTED}; // package - State(int id) { - this.id = id; - } + static final State[] states_of_skipOrAfterWork = new State[]{SKIP, AFTER_WORK}; - final int id; + static final State[] states_of_beforeWorkingEnd = new State[]{INIT, STARTED, WORKING}; // package-static - static final State[] states_of_notWorked = new State[]{INIT, STARTED}; + static final State[] states_all = new State[]{BUILDING, INIT, STARTED, WORKING, AFTER_WORK, SUCCESS, ERROR, SKIP}; - static final State[] states_of_skipOrAfterWork = new State[]{SKIP, AFTER_WORK}; + static final Map id2state; - static final State[] states_of_beforeWorkingEnd = new State[]{INIT, STARTED, WORKING}; + static { + HashMap map = new HashMap<>(); + for (State s : State.values()) { + map.put(s.id, s); + } + id2state = Collections.unmodifiableMap(map); + } - static final State[] states_all = new State[]{BUILDING, INIT, STARTED, WORKING, AFTER_WORK, SUCCESS, ERROR, SKIP}; + final int id; + + State(int id) { + this.id = id; + } /** * 自旋+CAS的设置状态,如果状态不在exceptValues返回内 或 没有设置成功,则返回false。 @@ -778,16 +728,81 @@ public abstract class WorkerWrapper { return id2state.get(id); } - static final Map id2state; + public boolean finished() { + return this == SUCCESS || this == ERROR || this == SKIP; + } - static { - HashMap map = new HashMap<>(); - for (State s : State.values()) { - map.put(s.id, s); - } - id2state = Collections.unmodifiableMap(map); + + } + + /** + * 自v1.5,该类被抽取到{@link StableWorkerWrapperBuilder}抽象类,兼容之前的版本。 + */ + public static class Builder extends StableWorkerWrapperBuilder> { + + /** + * @deprecated 建议使用 {@link #builder()}返回{@link WorkerWrapperBuilder}接口,以调用v1.5之后的规范api + */ + @SuppressWarnings("DeprecatedIsStillUsed") + @Deprecated + public Builder() { + } + + } + + /** + * 一个通用的策略器实现类,提供了修改的功能。并兼容之前的代码。 + */ + public static class StableWrapperStrategy extends WrapperStrategy.AbstractWrapperStrategy { + + private DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper; + + private DependMustStrategyMapper dependMustStrategyMapper; + + private DependenceStrategy dependenceStrategy; + + private SkipStrategy skipStrategy; + + @Override + public DependOnUpWrapperStrategyMapper getDependWrapperStrategyMapper() { + return dependOnUpWrapperStrategyMapper; + } + + @Override + public void setDependWrapperStrategyMapper(DependOnUpWrapperStrategyMapper dependOnUpWrapperStrategyMapper) { + this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper; + } + + @SuppressWarnings("deprecation") + @Override + public DependMustStrategyMapper getDependMustStrategyMapper() { + return dependMustStrategyMapper; + } + + @Override + public void setDependMustStrategyMapper(DependMustStrategyMapper dependMustStrategyMapper) { + this.dependMustStrategyMapper = dependMustStrategyMapper; + } + + @Override + public DependenceStrategy getDependenceStrategy() { + return dependenceStrategy; + } + + @Override + public void setDependenceStrategy(DependenceStrategy dependenceStrategy) { + this.dependenceStrategy = dependenceStrategy; + } + + @Override + public SkipStrategy getSkipStrategy() { + return skipStrategy; } + @Override + public void setSkipStrategy(SkipStrategy skipStrategy) { + this.skipStrategy = skipStrategy; + } } @@ -795,8 +810,11 @@ public abstract class WorkerWrapper { * 这是因未知错误而引发的异常 */ public static class NotExpectedException extends Exception { + public NotExpectedException(Throwable cause, WorkerWrapper wrapper) { super("It's should not happened Exception . wrapper is " + wrapper, cause); } + } + } diff --git a/asyncTool-core/src/test/java/v15/cases/Case15.java b/asyncTool-core/src/test/java/v15/cases/Case15.java index be1d997..ef89848 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case15.java +++ b/asyncTool-core/src/test/java/v15/cases/Case15.java @@ -12,16 +12,15 @@ import com.jd.platform.async.wrapper.WorkerWrapperBuilder; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.UUID; +import java.util.concurrent.*; /** * 示例:模拟内存溢出 *

- * 运行之前请设置 + * 运行内存溢出之前请设置 * -Xmx20m -Xms20m - * + *

* 当内存溢出时,其中一个线程会OOM,runable不会继续调度, * 我通过添加一个线程主动cancel来达到提前结束任务而不是等超时 * @@ -33,7 +32,7 @@ class Case15 { return WorkerWrapper.builder() .id(id) - .param(id + "X") + .param(UUID.randomUUID().toString()) .worker(new MyWorker(id)) .callback((new ICallback() { @Override @@ -69,12 +68,24 @@ class Case15 { ) .build(); try { - OnceWork work = Async.work(5000, a, d); + ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + throw new RejectedExecutionException("Task " + r.toString() + + " rejectexxxxd from " + + e.toString()); + } + }); + OnceWork work = Async.work(10000, executor, a, d); ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); pool.execute(() -> { while (true) { try { + if (work.hasTimeout()) { + System.out.println("超时"); + } if (work.isCancelled()) { System.out.println("取消成功"); } @@ -95,8 +106,12 @@ class Case15 { } System.out.println("cost:" + (SystemClock.now() - now)); + int count = 1; while (build.getWorkResult().getEx() == null) { //同步等待result数据写入 + if (count++ > 800) { + break; + } } System.out.println("输出H节点的结果----" + build.getWorkResult()); /* 输出: @@ -116,7 +131,7 @@ class Case15 { //用于存放模拟的对象,防止GC回收,用List做对象引用 private final List list = new LinkedList<>(); - private String id; + private final String id; private int i = 0; @@ -127,13 +142,51 @@ class Case15 { @Override public String action(String param, Map> allWrappers) { if ("F".equals(id)) { - System.out.println("wrapper(id=" + id + ") is working"); while (true) { + System.out.println("wrapper(id=" + id + ") is working"); System.out.println("I am alive:" + i++); + /* + 第一种问题,内存溢出OOM,由系统取消任务执行,H的结果为{result=null, resultState=DEFAULT, ex=null},因为没有跑到H,所以H的结果为null + + + 取消成功,结束成功 + */ byte[] buf = new byte[1024 * 1024]; list.add(buf); + /* + 第二种问题,存在异常,H的结果为WorkResult{result=null, resultState=EXCEPTION, ex=java.lang.ArithmeticException: / by zero} + + + 结束成功 + */ + /*if(i==20000){ + int a=1/0; + }*/ + /* + 第三种问题,啥也不做就是等待,结果执行超时,WorkResult{result=null, resultState=TIMEOUT, ex=null},AsyncTool会在超时时发出中断指令,停止运行 + + + 超时,结束成功 + */ + /*try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + e.printStackTrace(); + //如果将下面的语句注释,那么任务将永远不会结束 + throw new RuntimeException("被中断"); + }*/ } } + if ("H".equals(id)) { + /** + * 最后一个节点是否会被回调 + * + * 第一种问题下不会回调 + * 第二种问题下不会回调 + * 第三种问题下不会回调 + */ + System.out.println("H被回调"); + } return id; } diff --git a/asyncTool-core/src/test/java/v15/cases/Case16.java b/asyncTool-core/src/test/java/v15/cases/Case16.java new file mode 100644 index 0000000..f86fac1 --- /dev/null +++ b/asyncTool-core/src/test/java/v15/cases/Case16.java @@ -0,0 +1,199 @@ +package v15.cases; + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.worker.OnceWork; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; +import com.jd.platform.async.wrapper.WorkerWrapperBuilder; + +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * 示例:模拟线程池资源不够用的情况 + * + * @author create by kyle + */ +class Case16 { + + private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, + 30L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + System.out.println("Task " + r.toString() + + " rejected from " + + e.toString()); + if (!e.isShutdown()) { + r.run(); + } + } + }); + + private static WorkerWrapperBuilder builder(String id) { + + return WorkerWrapper.builder() + .id(id) + .param(UUID.randomUUID().toString()) + .worker(new MyWorker(id)) + .callback((new ICallback() { + @Override + public void begin() { + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + /* System.out.println("(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult);*/ + } + })) + .allowInterrupt(true); + } + + public static void main(String[] args) { + //任务数量 + final int count = 40; + //超时时间 + int timeout = 10000; + List workList = new ArrayList<>(count); + //每个任务约23个节点 + for (int i = 0; i < count; i++) { + WorkerWrapper a = builder("A").build(); + WorkerWrapper d; + WorkerWrapper k; + WorkerWrapper n; + WorkerWrapper q; + WorkerWrapper t; + WorkerWrapper w; + WorkerWrapper build = builder("H") + .depends( + builder("F") + .depends(builder("B").depends(a).build()) + .depends(builder("C").depends(a).build()) + .build(), + builder("G") + .depends(builder("E") + .depends(d = builder("D").build()) + .build()) + .build(), + builder("I") + .depends(builder("J") + .depends(k = builder("K").build()) + .build()) + .build(), + builder("L") + .depends(builder("M") + .depends(n = builder("N").build()) + .build()) + .build(), + builder("O") + .depends(builder("P") + .depends(q = builder("Q").build()) + .build()) + .build(), + builder("R") + .depends(builder("S") + .depends(t = builder("T").build()) + .build()) + .build(), + builder("U") + .depends(builder("V") + .depends(w = builder("W").build()) + .build()) + .build() + ) + .build(); + OnceWork work = Async.work(timeout, executor, a, d, k, n, q, t, w); + workList.add(work); + } + + while (true) { + long finishCount = workList.stream().filter(OnceWork::isFinish).count(); + if (finishCount == count) { + break; + } + } + for (OnceWork work : workList) { + try { + System.out.println("cost:" + (work.getFinishTime() - work.getStartTime())); + } catch (IllegalStateException e) { + } + } + long cancelCount = workList.stream().filter(onceWork -> onceWork.isCancelled() || onceWork.isWaitingCancel()).count(); + long timeoutCount = workList.stream().filter(OnceWork::hasTimeout).count(); + long finishCount = workList.stream().filter(OnceWork::isFinish).count(); + + System.out.println("取消数量" + cancelCount); + System.out.println("超时数量" + timeoutCount); + System.out.println("完成数量" + finishCount); + + } + + private static class MyWorker implements IWorker { + + //用于存放模拟的对象,防止GC回收,用List做对象引用 + private final List list = new LinkedList<>(); + + private final String id; + + private final int i = 0; + + public MyWorker(String id) { + this.id = id; + } + + @Override + public String action(String param, Map> allWrappers) { + if ("F".equals(id)) { + while (true) { + /* + 第一种问题,内存溢出OOM,由系统取消任务执行,H的结果为{result=null, resultState=DEFAULT, ex=null},因为没有跑到H,所以H的结果为null + + + 取消成功,结束成功 + */ + /*byte[] buf = new byte[1024 * 1024]; + list.add(buf);*/ + /* + 第二种问题,存在异常,H的结果为WorkResult{result=null, resultState=EXCEPTION, ex=java.lang.ArithmeticException: / by zero} + + + 结束成功 + */ + /*if(i==20000){ + int a=1/0; + }*/ + /* + 第三种问题,啥也不做就是等待,结果执行超时,WorkResult{result=null, resultState=TIMEOUT, ex=null},AsyncTool会在超时时发出中断指令,停止运行 + + + 超时,结束成功 + */ + /*try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + e.printStackTrace(); + //如果将下面的语句注释,那么任务将永远不会结束 + throw new RuntimeException("被中断"); + }*/ + + + //模拟有任务不退出的情况 + System.out.println(param + " running"); + Thread.yield(); + } + } + return id; + } + + } + +} + -- Gitee