From 5730241a80a85e631a505f6b5b728b94470d0b0c Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Thu, 30 Jun 2022 10:07:23 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=8E=92=E9=99=A425=E6=AC=A1=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=E5=8F=96=E5=80=BC=EF=BC=8C=E8=BF=99=E4=B8=AA=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E8=BF=98=E6=98=AF=E4=B8=8D=E7=A8=B3=E5=AE=9A=EF=BC=8C?= =?UTF-8?q?=E6=9C=89=E5=8F=AF=E8=83=BD=E6=8B=BF=E4=B8=8D=E5=88=B0=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 现在的做法是,只有当结果成功被设置,才将WORKING转为AFTER_WORK状态,结束任务 --- .../platform/async/wrapper/WorkerWrapper.java | 52 ++++++------------- .../src/test/java/v15/cases/Case1.java | 25 +++++++-- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index c0145ab..bc07acc 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -1,15 +1,15 @@ package com.jd.platform.async.wrapper; -import com.jd.platform.async.exception.CancelException; -import com.jd.platform.async.exception.EndsNormallyException; -import com.jd.platform.async.worker.ResultState; -import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.callback.DefaultCallback; import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.exception.CancelException; +import com.jd.platform.async.exception.EndsNormallyException; import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.executor.PollingCenter; import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.ResultState; +import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.strategy.WrapperStrategy; import com.jd.platform.async.wrapper.strategy.depend.DependMustStrategyMapper; import com.jd.platform.async.wrapper.strategy.depend.DependOnUpWrapperStrategyMapper; @@ -25,18 +25,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.AFTER_WORK; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.BUILDING; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.ERROR; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.INIT; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.SKIP; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.STARTED; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.SUCCESS; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.WORKING; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_all; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_beforeWorkingEnd; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_notWorked; -import static com.jd.platform.async.wrapper.WorkerWrapper.State.states_of_skipOrAfterWork; import static com.jd.platform.async.wrapper.WorkerWrapper.State.*; /** @@ -268,17 +256,6 @@ public abstract class WorkerWrapper { final Consumer __function__callbackResult = success -> { WorkResult _workResult = getWorkResult(); - /* - 如果不循环拿,则很容易拿到空值(用户有可能拿到值,也有可能拿到null), - 但如果一定要空值的话,那么尝试25次之后就允许, - 这是个魔法值,如果有更合适的设计请修改这里。 - 比如将getWorkResult()方法的调用交给用户, - 但用户必须明确知道会有这种情况发生 - */ - int count = 25; - while (_workResult.getResultState() == ResultState.DEFAULT && count-- > 0) { - _workResult = getWorkResult(); - } try { callback.result(success, param, _workResult); } catch (Exception e) { @@ -302,18 +279,18 @@ public abstract class WorkerWrapper { () -> { if (setState(state, STARTED, WORKING)) { try { - fire(group); + if (fire(group)) { + if (setState(state, WORKING, AFTER_WORK)) { + __function__callbackResult.accept(true); + beginNext(executorService, now, remainTime, group); + } + } } catch (Exception e) { if (setState(state, WORKING, ERROR)) { __function__fastFail_callbackResult$false_beginNext.accept(false, e); } - return; } } - if (setState(state, WORKING, AFTER_WORK)) { - __function__callbackResult.accept(true); - beginNext(executorService, now, remainTime, group); - } }; // ================================================ // 开始执行 @@ -375,7 +352,9 @@ public abstract class WorkerWrapper { case TAKE_REST: //FIXME 等待200毫秒重新投入线程池,主要为了调起最后一个任务 Thread.sleep(200L); - executorService.submit(() -> this.work(executorService, fromWrapper, remainTime-(SystemClock.now()-now), group)); + System.out.println(id+"进入休息"); + executorService.submit(() -> this.work(executorService, fromWrapper, + remainTime - (SystemClock.now() - now), group)); return; case FAST_FAIL: if (setState(state, STARTED, ERROR)) { @@ -410,13 +389,14 @@ public abstract class WorkerWrapper { * 本工作线程执行自己的job. *

* 本方法不负责校验状态。请在调用前自行检验 + * @return */ - protected void fire(WorkerWrapperGroup group) { + protected boolean fire(WorkerWrapperGroup group) { try { doWorkingThread.set(Thread.currentThread()); //执行耗时操作 V result = worker.action(param, group.getForParamUseWrappers()); - workResult.compareAndSet( + return workResult.compareAndSet( null, new WorkResult<>(result, ResultState.SUCCESS) ); diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index 29059cb..5711ca1 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -1,7 +1,9 @@ package v15.cases; +import com.jd.platform.async.callback.ICallback; import com.jd.platform.async.executor.Async; import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperBuilder; @@ -19,7 +21,7 @@ class Case1 { try { if ("F".equals(id)) { System.out.println("wrapper(id=" + id + ") is working"); - Thread.sleep(12000); + Thread.sleep(100); } else { System.out.println("wrapper(id=" + id + ") is worki444ng"); } @@ -27,13 +29,26 @@ class Case1 { e.printStackTrace(); } return id; - }); + }).callback((new ICallback() { + @Override + public void begin() { + System.out.println("wrapper(id=" + id + ") has begin . "); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("\t\twrapper(id=" + id + ") callback " + + (success ? "success " : "fail ") + + ", workResult is " + workResult); + } + })) + .allowInterrupt(true); } public static void main(String[] args) { long now = SystemClock.now(); WorkerWrapper a = builder("A").build(); - WorkerWrapper d; + WorkerWrapper d = builder("D").build(); builder("H") .depends( builder("F") @@ -42,13 +57,13 @@ class Case1 { .build(), builder("G") .depends(builder("E") - .depends(d = builder("D").build()) + .depends(d) .build()) .build() ) .build(); try { - Async.work(10000, a, d).awaitFinish(); + Async.work(5000, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } -- Gitee