From fdc91c8a9f5af5674346ba15f0680ef5ecbfc0a3 Mon Sep 17 00:00:00 2001 From: kyao <24456865@qq.com> Date: Fri, 1 Dec 2023 11:16:01 +0800 Subject: [PATCH] =?UTF-8?q?fix:=201.=E4=BF=AE=E5=A4=8D=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E6=B3=A8=E8=A7=A3=E6=B3=A8=E5=85=A5ScheduledThreadPoolExecutor?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=E6=8A=A5=E9=94=99=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98=202.=E4=BF=AE=E5=A4=8DScheduledDtpExecutor=E4=B8=8D?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=B6=85=E6=97=B6=E5=91=8A=E8=AD=A6=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/executor/ScheduledDtpExecutor.java | 8 +- .../core/spring/DtpPostProcessor.java | 16 +++- .../ScheduledThreadPoolExecutorProxy.java | 86 +++++++++++++++++++ .../core/support/ThreadPoolCreator.java | 1 - .../dynamictp/example/config/TpConfig.java | 41 +++++++++ .../example/controller/TestController.java | 28 +++++- 6 files changed, 173 insertions(+), 7 deletions(-) create mode 100644 core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java create mode 100644 example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/config/TpConfig.java diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java index 78e034aa..25c83276 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java @@ -18,6 +18,7 @@ package org.dromara.dynamictp.core.executor; import org.dromara.dynamictp.common.em.JreEnum; +import org.dromara.dynamictp.core.support.ScheduledThreadPoolExecutorProxy; import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -58,7 +59,7 @@ public class ScheduledDtpExecutor extends DtpExecutor implements ScheduledExecut if (JreEnum.JAVA_8.isCurrentVersion()) { corePoolSize = corePoolSize == 0 ? 1 : corePoolSize; } - delegate = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler); + delegate = new ScheduledThreadPoolExecutorProxy(new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler)); } @Override @@ -263,6 +264,11 @@ public class ScheduledDtpExecutor extends DtpExecutor implements ScheduledExecut public long getCompletedTaskCount() { return delegate.getCompletedTaskCount(); } + + @Override + public ScheduledThreadPoolExecutor getOriginal() { + return delegate; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java index ac2184d9..cde0e35e 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java @@ -29,6 +29,7 @@ import org.dromara.dynamictp.core.executor.eager.TaskQueue; import org.dromara.dynamictp.core.plugin.DtpInterceptorRegistry; import org.dromara.dynamictp.core.support.DynamicTp; import org.dromara.dynamictp.core.support.ExecutorWrapper; +import org.dromara.dynamictp.core.support.ScheduledThreadPoolExecutorProxy; import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -48,6 +49,8 @@ import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; @@ -130,7 +133,12 @@ public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, Pr DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); return bean; } - val proxy = newProxy(poolName, (ThreadPoolExecutor) bean); + Executor proxy; + if (bean instanceof ScheduledThreadPoolExecutor) { + proxy = newScheduledTpProxy(poolName, (ScheduledThreadPoolExecutor) bean); + } else { + proxy = newProxy(poolName, (ThreadPoolExecutor) bean); + } DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); return proxy; } @@ -150,4 +158,10 @@ public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, Pr shutdownGracefulAsync(originExecutor, name, 0); return proxy; } + + private ScheduledThreadPoolExecutorProxy newScheduledTpProxy(String name, ScheduledThreadPoolExecutor originExecutor) { + val proxy = new ScheduledThreadPoolExecutorProxy(originExecutor); + shutdownGracefulAsync(originExecutor, name, 0); + return proxy; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java b/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java new file mode 100644 index 00000000..0d30c14d --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.core.support; + +import org.dromara.dynamictp.core.aware.AwareManager; +import org.dromara.dynamictp.core.aware.RejectHandlerAware; +import org.dromara.dynamictp.core.aware.TaskEnhanceAware; +import org.dromara.dynamictp.core.reject.RejectHandlerGetter; +import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * ScheduledThreadPoolExecutorProxy related + * The schedule method does not support queue timeout monitoring + * + * @author kyao + * @since 1.1.5 + */ +public class ScheduledThreadPoolExecutorProxy extends ScheduledThreadPoolExecutor implements TaskEnhanceAware, RejectHandlerAware { + + /** + * Task wrappers, do sth enhanced. + */ + private List taskWrappers; + + /** + * Reject handler type. + */ + private final String rejectHandlerType; + + public ScheduledThreadPoolExecutorProxy(ScheduledThreadPoolExecutor executor) { + super(executor.getCorePoolSize(), executor.getThreadFactory()); + this.rejectHandlerType = executor.getRejectedExecutionHandler().getClass().getSimpleName(); + setRejectedExecutionHandler(RejectHandlerGetter.getProxy(getRejectedExecutionHandler())); + } + + @Override + public void execute(Runnable command) { + command = getEnhancedTask(command); + AwareManager.execute(this, command); + super.execute(command); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + AwareManager.beforeExecute(this, t, r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + AwareManager.afterExecute(this, r, t); + } + + @Override + public String getRejectHandlerType() { + return rejectHandlerType; + } + + @Override + public List getTaskWrappers() { + return taskWrappers; + } + + @Override + public void setTaskWrappers(List taskWrappers) { + this.taskWrappers = taskWrappers; + } +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java index b2822cc9..376faa00 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java @@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import static org.dromara.dynamictp.common.em.QueueTypeEnum.SYNCHRONOUS_QUEUE; import static org.dromara.dynamictp.common.em.QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE; /** diff --git a/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/config/TpConfig.java b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/config/TpConfig.java new file mode 100644 index 00000000..73bee5c6 --- /dev/null +++ b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/config/TpConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.config; + +import org.dromara.dynamictp.core.executor.NamedThreadFactory; +import org.dromara.dynamictp.core.support.DynamicTp; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * @author kyao + * @since 1.1.5 + */ +@Configuration +public class TpConfig { + + @Bean + @DynamicTp("scheduledThreadPoolExecutor") + public ScheduledThreadPoolExecutor scheduledThreadPoolExecutor() { + return new ScheduledThreadPoolExecutor(8, + new NamedThreadFactory("scheduled")); + } + +} diff --git a/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java index c941217b..8f8c0833 100644 --- a/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java +++ b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java @@ -18,10 +18,13 @@ package org.dromara.dynamictp.example.controller; import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.ScheduledDtpExecutor; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; + import javax.annotation.Resource; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author kyao @@ -32,18 +35,35 @@ import java.util.concurrent.ThreadPoolExecutor; public class TestController { @Resource - private ThreadPoolExecutor testExecutor; + private ScheduledDtpExecutor testExecutor; + + @Resource + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; @GetMapping("/dtp-example-adapter/testWebserver") public String testWebserver() throws InterruptedException { - testExecutor.execute(() -> { + testExecutor.schedule(() -> { + try { + Thread.sleep((int) (Math.random() * 1000)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + log.info("success"); + }, 1, TimeUnit.SECONDS); + return "success"; + } + + @GetMapping("/dtp-example-adapter/testScheduleExecutor") + public String testScheduleExecutor() throws InterruptedException { + scheduledThreadPoolExecutor.schedule(() -> { try { Thread.sleep((int) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } log.info("success"); - }); + }, 1, TimeUnit.SECONDS); + Thread.sleep(2000); return "success"; } } -- Gitee