diff --git a/rec-spi/src/main/java/cn/icanci/rec/spi/event/AbstractEventDispatcher.java b/rec-spi/src/main/java/cn/icanci/rec/spi/event/AbstractEventDispatcher.java index bc21f85e5a230f9f7b086be98447bf1c1fa618e8..9b5b592f17980e2ed495880436313603856f333d 100644 --- a/rec-spi/src/main/java/cn/icanci/rec/spi/event/AbstractEventDispatcher.java +++ b/rec-spi/src/main/java/cn/icanci/rec/spi/event/AbstractEventDispatcher.java @@ -1,10 +1,7 @@ package cn.icanci.rec.spi.event; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.*; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; @@ -21,43 +18,46 @@ public abstract class AbstractEventDispatcher implements EventDispatcher, Applic * Spring 容器 */ protected ApplicationContext applicationContext; - /** * 事件监听器列表 */ protected Map, List> eventListMap = new ConcurrentHashMap<>(); - /** * 监听器 */ protected List listeners = new LinkedList(); - /** * 注册事件类型 */ protected Set> eventClasses = new HashSet<>(); - /** * 排序器 */ protected static final ListenerComparator LISTENER_COMPARATOR = new ListenerComparator(); - /** - * Asynchronous executor + * 线程池核心大小 + */ + private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); + /** + * 线程池 */ - protected Executor taskExecutor; + protected static final ThreadPoolExecutor eventPool = new ThreadPoolExecutor(CORE_SIZE, // + CORE_SIZE << 1, // + 60L, // + TimeUnit.SECONDS, // + new LinkedBlockingQueue<>(2000), // + runnable -> new Thread(runnable, "AbstractEventDispatcher Pool-" + runnable.hashCode()), // + (r, executor) -> { + throw new RuntimeException("AbstractEventDispatcher Pool is EXHAUSTED!"); + }); - /**锁*/ - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Object lock = new Object(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; - try { - lock.writeLock().lock(); + synchronized (lock) { register(); - } finally { - lock.writeLock().unlock(); } } @@ -71,12 +71,7 @@ public abstract class AbstractEventDispatcher implements EventDispatcher, Applic * * @return 事件执行线程池 */ - protected abstract Executor getTaskExecutor(); - - /** - * 设置任务执行器 - * - * @param taskExecutor 事件执行线程池 - */ - public abstract void setTaskExecutor(Executor taskExecutor); + protected Executor getTaskPool() { + return eventPool; + } } diff --git a/rec-spi/src/main/java/cn/icanci/rec/spi/event/DefaultEventDispatcher.java b/rec-spi/src/main/java/cn/icanci/rec/spi/event/DefaultEventDispatcher.java index 3b88b0a78e00f11c0a2fe0957b45a105c55fb939..af9d2b3efe357285a7b110a156aa61c204dad56b 100644 --- a/rec-spi/src/main/java/cn/icanci/rec/spi/event/DefaultEventDispatcher.java +++ b/rec-spi/src/main/java/cn/icanci/rec/spi/event/DefaultEventDispatcher.java @@ -6,7 +6,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.function.Consumer; /** @@ -84,28 +83,6 @@ public class DefaultEventDispatcher extends AbstractEventDispatcher { return eventClass == actualTypeArgument; } - /** - * 获取任务执行器 - * - * @return 任务执行器 - */ - @Override - protected Executor getTaskExecutor() { - if (taskExecutor == null) { - taskExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("beedong-event", true)); - } - return taskExecutor; - } - - /** - * 设置任务执行器 - * - * @param taskExecutor taskExecutor - */ - public void setTaskExecutor(Executor taskExecutor) { - this.taskExecutor = taskExecutor; - } - /** * 移除监听器 * @@ -148,9 +125,9 @@ public class DefaultEventDispatcher extends AbstractEventDispatcher { listener.onEvent(baseEvent); } } : baseEventListeners -> { - Executor taskExecutor = getTaskExecutor(); + Executor taskExecutor = getTaskPool(); for (final BaseEventListener listener : baseEventListeners) { - taskExecutor.execute(new AnonymityEventHandler(baseEvent, listener)); + taskExecutor.execute(new EventHandler(baseEvent, listener)); } }; diff --git a/rec-spi/src/main/java/cn/icanci/rec/spi/event/AnonymityEventHandler.java b/rec-spi/src/main/java/cn/icanci/rec/spi/event/EventHandler.java similarity index 73% rename from rec-spi/src/main/java/cn/icanci/rec/spi/event/AnonymityEventHandler.java rename to rec-spi/src/main/java/cn/icanci/rec/spi/event/EventHandler.java index 6810aa52f5dfd13f53716642e72d8d6494fc2a32..41577473df959c46d4cd29b25c6cf4b5e5760f9c 100644 --- a/rec-spi/src/main/java/cn/icanci/rec/spi/event/AnonymityEventHandler.java +++ b/rec-spi/src/main/java/cn/icanci/rec/spi/event/EventHandler.java @@ -6,14 +6,14 @@ package cn.icanci.rec.spi.event; * @author icanci * @since 1.0 Created in 2022/11/11 18:01 */ -public class AnonymityEventHandler implements Runnable { +public class EventHandler implements Runnable { /** 事件 */ private BaseEvent event; /** 事件监听器 */ private BaseEventListener listener; - public AnonymityEventHandler(BaseEvent event, BaseEventListener listener) { + public EventHandler(BaseEvent event, BaseEventListener listener) { this.event = event; this.listener = listener; } diff --git a/rec-spi/src/main/java/cn/icanci/rec/spi/event/NamedThreadFactory.java b/rec-spi/src/main/java/cn/icanci/rec/spi/event/NamedThreadFactory.java deleted file mode 100644 index 82cb91056adfedbcd64eaf56e1a7db5451f9ccb4..0000000000000000000000000000000000000000 --- a/rec-spi/src/main/java/cn/icanci/rec/spi/event/NamedThreadFactory.java +++ /dev/null @@ -1,34 +0,0 @@ -package cn.icanci.rec.spi.event; - -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author icanci - * @since 1.0 Created in 2022/11/11 18:01 - */ -public class NamedThreadFactory implements ThreadFactory { - - private final AtomicInteger threadNum = new AtomicInteger(1); - - private final String prefix; - - private final boolean daemon; - - private final ThreadGroup group; - - public NamedThreadFactory(String prefix, boolean daemon) { - this.prefix = prefix + "-thread-"; - this.daemon = daemon; - SecurityManager s = System.getSecurityManager(); - group = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); - } - - @Override - public Thread newThread(Runnable runnable) { - String name = prefix + threadNum.getAndIncrement(); - Thread ret = new Thread(group, runnable, name, 0); - ret.setDaemon(daemon); - return ret; - } -}