diff --git a/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java b/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java index 78e034aa47058b6ff5d8e50f0746e688c1367226..25c832768977d1077aedbe00a60989f8e0913506 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/executor/ScheduledDtpExecutor.java @@ -18,6 +18,7 @@ package org.dromara.dynamictp.core.executor; import org.dromara.dynamictp.common.em.JreEnum; +import org.dromara.dynamictp.core.support.ScheduledThreadPoolExecutorProxy; import java.util.Collection; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -58,7 +59,7 @@ public class ScheduledDtpExecutor extends DtpExecutor implements ScheduledExecut if (JreEnum.JAVA_8.isCurrentVersion()) { corePoolSize = corePoolSize == 0 ? 1 : corePoolSize; } - delegate = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler); + delegate = new ScheduledThreadPoolExecutorProxy(new ScheduledThreadPoolExecutor(corePoolSize, threadFactory, handler)); } @Override @@ -263,6 +264,11 @@ public class ScheduledDtpExecutor extends DtpExecutor implements ScheduledExecut public long getCompletedTaskCount() { return delegate.getCompletedTaskCount(); } + + @Override + public ScheduledThreadPoolExecutor getOriginal() { + return delegate; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java index ac2184d98671eab88a1a2176c2a25ec38c19e387..cde0e35e8af93d023d1425a37f263d6a62837b91 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java +++ b/core/src/main/java/org/dromara/dynamictp/core/spring/DtpPostProcessor.java @@ -29,6 +29,7 @@ import org.dromara.dynamictp.core.executor.eager.TaskQueue; import org.dromara.dynamictp.core.plugin.DtpInterceptorRegistry; import org.dromara.dynamictp.core.support.DynamicTp; import org.dromara.dynamictp.core.support.ExecutorWrapper; +import org.dromara.dynamictp.core.support.ScheduledThreadPoolExecutorProxy; import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -48,6 +49,8 @@ import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import static org.dromara.dynamictp.core.support.DtpLifecycleSupport.shutdownGracefulAsync; @@ -130,7 +133,12 @@ public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, Pr DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); return bean; } - val proxy = newProxy(poolName, (ThreadPoolExecutor) bean); + Executor proxy; + if (bean instanceof ScheduledThreadPoolExecutor) { + proxy = newScheduledTpProxy(poolName, (ScheduledThreadPoolExecutor) bean); + } else { + proxy = newProxy(poolName, (ThreadPoolExecutor) bean); + } DtpRegistry.registerExecutor(new ExecutorWrapper(poolName, proxy), REGISTER_SOURCE); return proxy; } @@ -150,4 +158,10 @@ public class DtpPostProcessor implements BeanPostProcessor, BeanFactoryAware, Pr shutdownGracefulAsync(originExecutor, name, 0); return proxy; } + + private ScheduledThreadPoolExecutorProxy newScheduledTpProxy(String name, ScheduledThreadPoolExecutor originExecutor) { + val proxy = new ScheduledThreadPoolExecutorProxy(originExecutor); + shutdownGracefulAsync(originExecutor, name, 0); + return proxy; + } } diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java b/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java new file mode 100644 index 0000000000000000000000000000000000000000..0d30c14d8acb5db99e044fc2e1037a8349812791 --- /dev/null +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ScheduledThreadPoolExecutorProxy.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.core.support; + +import org.dromara.dynamictp.core.aware.AwareManager; +import org.dromara.dynamictp.core.aware.RejectHandlerAware; +import org.dromara.dynamictp.core.aware.TaskEnhanceAware; +import org.dromara.dynamictp.core.reject.RejectHandlerGetter; +import org.dromara.dynamictp.core.support.task.wrapper.TaskWrapper; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * ScheduledThreadPoolExecutorProxy related + * The schedule method does not support queue timeout monitoring + * + * @author kyao + * @since 1.1.5 + */ +public class ScheduledThreadPoolExecutorProxy extends ScheduledThreadPoolExecutor implements TaskEnhanceAware, RejectHandlerAware { + + /** + * Task wrappers, do sth enhanced. + */ + private List taskWrappers; + + /** + * Reject handler type. + */ + private final String rejectHandlerType; + + public ScheduledThreadPoolExecutorProxy(ScheduledThreadPoolExecutor executor) { + super(executor.getCorePoolSize(), executor.getThreadFactory()); + this.rejectHandlerType = executor.getRejectedExecutionHandler().getClass().getSimpleName(); + setRejectedExecutionHandler(RejectHandlerGetter.getProxy(getRejectedExecutionHandler())); + } + + @Override + public void execute(Runnable command) { + command = getEnhancedTask(command); + AwareManager.execute(this, command); + super.execute(command); + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + AwareManager.beforeExecute(this, t, r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + AwareManager.afterExecute(this, r, t); + } + + @Override + public String getRejectHandlerType() { + return rejectHandlerType; + } + + @Override + public List getTaskWrappers() { + return taskWrappers; + } + + @Override + public void setTaskWrappers(List taskWrappers) { + this.taskWrappers = taskWrappers; + } +} diff --git a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java index b2822cc9b55741186751b2e4e7e5d0112bb3ec34..376faa00e5d5b39641d430b67f617e8310249dfb 100644 --- a/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java +++ b/core/src/main/java/org/dromara/dynamictp/core/support/ThreadPoolCreator.java @@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; -import static org.dromara.dynamictp.common.em.QueueTypeEnum.SYNCHRONOUS_QUEUE; import static org.dromara.dynamictp.common.em.QueueTypeEnum.VARIABLE_LINKED_BLOCKING_QUEUE; /** diff --git a/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/config/TpConfig.java b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/config/TpConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..73bee5c671aab9cfd35066665e3def30baf951da --- /dev/null +++ b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/config/TpConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.dromara.dynamictp.example.config; + +import org.dromara.dynamictp.core.executor.NamedThreadFactory; +import org.dromara.dynamictp.core.support.DynamicTp; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * @author kyao + * @since 1.1.5 + */ +@Configuration +public class TpConfig { + + @Bean + @DynamicTp("scheduledThreadPoolExecutor") + public ScheduledThreadPoolExecutor scheduledThreadPoolExecutor() { + return new ScheduledThreadPoolExecutor(8, + new NamedThreadFactory("scheduled")); + } + +} diff --git a/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java index c941217b35f04470c62290119c275449cc41b260..8f8c083355108f88ccc163a4dc44f27b7c458684 100644 --- a/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java +++ b/example/example-adapter/example-adapter-webserver/src/main/java/org/dromara/dynamictp/example/controller/TestController.java @@ -18,10 +18,13 @@ package org.dromara.dynamictp.example.controller; import lombok.extern.slf4j.Slf4j; +import org.dromara.dynamictp.core.executor.ScheduledDtpExecutor; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; + import javax.annotation.Resource; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * @author kyao @@ -32,18 +35,35 @@ import java.util.concurrent.ThreadPoolExecutor; public class TestController { @Resource - private ThreadPoolExecutor testExecutor; + private ScheduledDtpExecutor testExecutor; + + @Resource + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; @GetMapping("/dtp-example-adapter/testWebserver") public String testWebserver() throws InterruptedException { - testExecutor.execute(() -> { + testExecutor.schedule(() -> { + try { + Thread.sleep((int) (Math.random() * 1000)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + log.info("success"); + }, 1, TimeUnit.SECONDS); + return "success"; + } + + @GetMapping("/dtp-example-adapter/testScheduleExecutor") + public String testScheduleExecutor() throws InterruptedException { + scheduledThreadPoolExecutor.schedule(() -> { try { Thread.sleep((int) (Math.random() * 1000)); } catch (InterruptedException e) { e.printStackTrace(); } log.info("success"); - }); + }, 1, TimeUnit.SECONDS); + Thread.sleep(2000); return "success"; } }