From 3c309a6e99dee3ca9c647a35f65ba5cf9e6545c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Thu, 29 Jul 2021 20:14:19 +0800 Subject: [PATCH 1/6] =?UTF-8?q?1=E3=80=81=E4=BC=98=E5=8C=96IOUtil#close=20?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E3=80=82=202=E3=80=81AIOSession=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E6=96=B9=E6=B3=95=20readBuffer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aio-core/pom.xml | 2 +- .../socket/transport/AioSession.java | 6 +++ .../smartboot/socket/transport/IOUtil.java | 10 ++++- .../socket/transport/IoServerConfig.java | 2 +- .../socket/transport/TcpAioSession.java | 7 +++- aio-pro/pom.xml | 2 +- .../socket/transport/UdpAioSession.java | 6 +++ example/pom.xml | 4 +- pom.xml | 2 +- smart-socket-parent/pom.xml | 41 +------------------ 10 files changed, 34 insertions(+), 48 deletions(-) diff --git a/aio-core/pom.xml b/aio-core/pom.xml index ef7bec2c..a95379ee 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.5.10 + 1.5.11-SNAPSHOT ../smart-socket-parent diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java index c5405710..d996af9c 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java @@ -12,6 +12,7 @@ package org.smartboot.socket.transport; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; /** @@ -55,6 +56,11 @@ public abstract class AioSession { */ public abstract WriteBuffer writeBuffer(); + /** + * 获取读缓冲区对象 + */ + public abstract ByteBuffer readBuffer(); + /** * 强制关闭当前AIOSession。 *

若此时还存留待输出的数据,则会导致该部分数据丢失

diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java b/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java index e6dab7bc..046dc4eb 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IOUtil.java @@ -11,6 +11,7 @@ package org.smartboot.socket.transport; import java.io.IOException; import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.NotYetConnectedException; /** * @author 三刀 @@ -21,13 +22,18 @@ final class IOUtil { * @param channel 需要被关闭的通道 */ public static void close(AsynchronousSocketChannel channel) { + boolean connected = true; try { channel.shutdownInput(); } catch (IOException ignored) { + } catch (NotYetConnectedException e) { + connected = false; } try { - channel.shutdownOutput(); - } catch (IOException ignored) { + if (connected) { + channel.shutdownOutput(); + } + } catch (IOException | NotYetConnectedException ignored) { } try { channel.close(); diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index 361cd975..4cc19825 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -39,7 +39,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.5.10"; + public static final String VERSION = "v1.5.11-SNAPSHOT"; /** * 消息体缓存大小,字节 diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java index b535d82b..7eb64a05 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/TcpAioSession.java @@ -165,10 +165,15 @@ final class TcpAioSession extends AioSession { /** * @return 输入流 */ - public final WriteBuffer writeBuffer() { + public WriteBuffer writeBuffer() { return byteBuf; } + @Override + public ByteBuffer readBuffer() { + return readBuffer.buffer(); + } + @Override public void awaitRead() { modCount++; diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index 4a90e281..b45bdf41 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.5.10 + 1.5.11-SNAPSHOT ../smart-socket-parent diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpAioSession.java b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpAioSession.java index 74b566aa..f66bd014 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/UdpAioSession.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/UdpAioSession.java @@ -14,6 +14,7 @@ import org.smartboot.socket.StateMachineEnum; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.ByteBuffer; /** * @author 三刀 @@ -39,6 +40,11 @@ final class UdpAioSession extends AioSession { return writeBuffer; } + @Override + public ByteBuffer readBuffer() { + throw new UnsupportedOperationException(); + } + @Override public void awaitRead() { throw new UnsupportedOperationException(); diff --git a/example/pom.xml b/example/pom.xml index f5f40441..38ac2ddd 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -26,12 +26,12 @@ org.smartboot.socket aio-pro - 1.5.10 + 1.5.11-SNAPSHOT org.smartboot.socket aio-core - 1.5.10 + 1.5.11-SNAPSHOT org.apache.commons diff --git a/pom.xml b/pom.xml index c053d54d..0883cd2a 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.smartboot.socket smart-socket-parent - 1.5.10 + 1.5.11-SNAPSHOT 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index ea5e1c4e..6746a5d4 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,14 +15,14 @@ 4.0.0 org.smartboot.socket smart-socket-parent - 1.5.10 + 1.5.11-SNAPSHOT pom UTF-8 1.7.29 - 1.5.10 + 1.5.11-SNAPSHOT @@ -175,43 +175,6 @@ deploy - - org.apache.maven.plugins - maven-pmd-plugin - - - rulesets/java/ali-comment.xml - rulesets/java/ali-concurrent.xml - rulesets/java/ali-constant.xml - rulesets/java/ali-exception.xml - rulesets/java/ali-flowcontrol.xml - rulesets/java/ali-naming.xml - rulesets/java/ali-oop.xml - rulesets/java/ali-orm.xml - rulesets/java/ali-other.xml - rulesets/java/ali-set.xml - - 1 - true - - - - validate - validate - - check - - - - - - - com.alibaba.p3c - p3c-pmd - 1.3.6 - - - maven-checkstyle-plugin 3.0.0 -- Gitee From 2a37ad704226691b33267453fcbac32bd8031163 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Thu, 29 Jul 2021 20:42:52 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E9=9B=86=E6=88=90aio-enhance?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aio-core/pom.xml | 28 +- .../socket/transport/AioQuickServer.java | 11 +- .../socket/transport/IoServerConfig.java | 12 + aio-enhance/pom.xml | 24 + .../socket/enhance/ByteBufferArray.java | 40 ++ .../EnhanceAsynchronousChannelGroup.java | 351 +++++++++++++ .../EnhanceAsynchronousChannelProvider.java | 58 +++ ...nhanceAsynchronousServerSocketChannel.java | 160 ++++++ .../EnhanceAsynchronousSocketChannel.java | 487 ++++++++++++++++++ .../enhance/FutureCompletionHandler.java | 111 ++++ smart-socket-parent/pom.xml | 6 + 11 files changed, 1262 insertions(+), 26 deletions(-) create mode 100644 aio-enhance/pom.xml create mode 100644 aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java create mode 100644 aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java create mode 100644 aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java create mode 100644 aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java create mode 100644 aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java create mode 100644 aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java diff --git a/aio-core/pom.xml b/aio-core/pom.xml index a95379ee..6dfbe9e7 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -22,26 +22,10 @@ 1.5.11-SNAPSHOT ../smart-socket-parent - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - true - public - -Xdoclint:none - - - - attach-javadocs - - jar - - - - - - + + + org.smartboot.socket + aio-enhance + + diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index 4ae81023..16a9af38 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -15,6 +15,7 @@ import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.VirtualBufferFactory; import org.smartboot.socket.buffer.BufferFactory; import org.smartboot.socket.buffer.BufferPagePool; +import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; import java.io.IOException; import java.net.InetSocketAddress; @@ -24,6 +25,7 @@ import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; +import java.nio.channels.spi.AsynchronousChannelProvider; import java.security.InvalidParameterException; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; @@ -52,8 +54,6 @@ import java.util.function.Function; * @version V1.0.0 */ public final class AioQuickServer { - private static final String AIO_ENHANCE_PROVIDER = "org.smartboot.aio.EnhanceAsynchronousChannelProvider"; - private static final String ASYNCHRONOUS_CHANNEL_PROVIDER = "java.nio.channels.spi.AsynchronousChannelProvider"; /** * Server端服务配置。 *

调用AioQuickServer的各setXX()方法,都是为了设置config的各配置项

@@ -144,14 +144,17 @@ public final class AioQuickServer { this.innerBufferPool = bufferPool; } this.aioSessionFunction = aioSessionFunction; - if (AIO_ENHANCE_PROVIDER.equals(System.getProperty(ASYNCHRONOUS_CHANNEL_PROVIDER))) { + AsynchronousChannelProvider provider; + if (config.isAioEnhance()) { aioReadCompletionHandler = new ReadCompletionHandler(); + provider = new EnhanceAsynchronousChannelProvider(); } else { concurrentReadCompletionHandlerExecutor = new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); aioReadCompletionHandler = new ConcurrentReadCompletionHandler(new Semaphore(config.getThreadNum() - 1), concurrentReadCompletionHandlerExecutor); + provider = AsynchronousChannelProvider.provider(); } - asynchronousChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(config.getThreadNum(), new ThreadFactory() { + asynchronousChannelGroup = provider.openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() { private byte index = 0; @Override diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index 4cc19825..c7a4d615 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -99,6 +99,10 @@ final class IoServerConfig { */ private BufferFactory bufferFactory = BufferFactory.DISABLED_BUFFER_FACTORY; + /** + * 启用 aio 增强 + */ + private boolean aioEnhance = true; /** * 获取默认内存块大小 @@ -234,6 +238,14 @@ final class IoServerConfig { this.backlog = backlog; } + public boolean isAioEnhance() { + return aioEnhance; + } + + public void setAioEnhance(boolean aioEnhance) { + this.aioEnhance = aioEnhance; + } + @Override public String toString() { return "IoServerConfig{" + diff --git a/aio-enhance/pom.xml b/aio-enhance/pom.xml new file mode 100644 index 00000000..e7347eeb --- /dev/null +++ b/aio-enhance/pom.xml @@ -0,0 +1,24 @@ + + + + + + smart-socket-parent + org.smartboot.socket + 1.5.11-SNAPSHOT + ../smart-socket-parent + + 4.0.0 + + aio-enhance + + \ No newline at end of file diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java new file mode 100644 index 00000000..1c8821df --- /dev/null +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java @@ -0,0 +1,40 @@ +/******************************************************************************* + * Copyright (c) 2017-2021, org.smartboot. All rights reserved. + * project name: smart-socket + * file name: ByteBufferArray.java + * Date: 2021-07-29 + * Author: sandao (zhengjunweimail@163.com) + * + ******************************************************************************/ + +package org.smartboot.socket.enhance; + +import java.nio.ByteBuffer; + + +/** + * @author 三刀 + */ +final class ByteBufferArray { + private final ByteBuffer[] buffers; + private final int offset; + private final int length; + + public ByteBufferArray(ByteBuffer[] buffers, int offset, int length) { + this.buffers = buffers; + this.offset = offset; + this.length = length; + } + + public ByteBuffer[] getBuffers() { + return buffers; + } + + public int getOffset() { + return offset; + } + + public int getLength() { + return length; + } +} \ No newline at end of file diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java new file mode 100644 index 00000000..b940f595 --- /dev/null +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -0,0 +1,351 @@ +/******************************************************************************* + * Copyright (c) 2017-2021, org.smartboot. All rights reserved. + * project name: smart-socket + * file name: EnhanceAsynchronousChannelGroup.java + * Date: 2021-07-29 + * Author: sandao (zhengjunweimail@163.com) + * + ******************************************************************************/ + +package org.smartboot.socket.enhance; + +import java.io.IOException; +import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.AsynchronousChannelProvider; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * @author 三刀 + * @version V1.0 , 2020/5/25 + */ +class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { + /** + * 递归回调次数上限 + */ + public static final int MAX_INVOKER = 8; + /** + * 写线程数 + */ + private static final String WRITE_THREAD_NUM = "org.smartboot.aio.writeThreadNum"; + /** + * accept线程数,该线程数只可少于等于进程内启用的服务端个数,多出无效 + */ + private static final String ACCEPT_THREAD_NUM = "org.smartboot.aio.acceptThreadNum"; + /** + * 读回调处理线程池,可用于业务处理 + */ + private final ExecutorService readExecutorService; + /** + * 写回调线程池 + */ + private final ExecutorService writeExecutorService; + /** + * write工作组 + */ + private final Worker[] writeWorkers; + /** + * read工作组 + */ + private final Worker[] readWorkers; + /** + * 线程池分配索引 + */ + private final AtomicInteger readIndex = new AtomicInteger(0); + private final AtomicInteger writeIndex = new AtomicInteger(0); + /** + * 定时任务线程池 + */ + private final ScheduledThreadPoolExecutor scheduledExecutor; + /** + * 服务端accept线程池 + */ + private final ExecutorService acceptExecutorService; + /** + * accept工作组 + */ + private final Worker[] acceptWorkers; + private Worker futureWorker; + /** + * 同步IO线程池 + */ + private ExecutorService futureExecutorService; + /** + * group运行状态 + */ + private boolean running = true; + + /** + * Initialize a new instance of this class. + * + * @param provider The asynchronous channel provider for this group + */ + protected EnhanceAsynchronousChannelGroup(AsynchronousChannelProvider provider, ExecutorService readExecutorService, int threadNum) throws IOException { + super(provider); + //init threadPool for read + this.readExecutorService = readExecutorService; + this.readWorkers = new Worker[threadNum]; + for (int i = 0; i < threadNum; i++) { + readWorkers[i] = new Worker(selectionKey -> { + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + asynchronousSocketChannel.doRead(true); + }); + this.readExecutorService.execute(readWorkers[i]); + } + + //init threadPool for write and connect + final int writeThreadNum = getIntSystemProperty(WRITE_THREAD_NUM, 1); + final int acceptThreadNum = getIntSystemProperty(ACCEPT_THREAD_NUM, 1); + writeExecutorService = getThreadPoolExecutor("smart-socket:write-", writeThreadNum); + this.writeWorkers = new Worker[writeThreadNum]; + + for (int i = 0; i < writeThreadNum; i++) { + writeWorkers[i] = new Worker(selectionKey -> { + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + asynchronousSocketChannel.doWrite(); + }); + writeExecutorService.execute(writeWorkers[i]); + } + + //init threadPool for accept + acceptExecutorService = getThreadPoolExecutor("smart-socket:connect-", acceptThreadNum); + acceptWorkers = new Worker[acceptThreadNum]; + for (int i = 0; i < acceptThreadNum; i++) { + acceptWorkers[i] = new Worker(selectionKey -> { + if (selectionKey.isAcceptable()) { + EnhanceAsynchronousServerSocketChannel serverSocketChannel = (EnhanceAsynchronousServerSocketChannel) selectionKey.attachment(); + serverSocketChannel.doAccept(); + } else if (selectionKey.isConnectable()) { + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + asynchronousSocketChannel.doConnect(); + } + }); + acceptExecutorService.execute(acceptWorkers[i]); + } + + scheduledExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "smart-socket:scheduled")); + } + + /** + * 同步IO注册异步线程,防止主IO线程阻塞 + * + * @param register + * @throws IOException + */ + public synchronized void registerFuture(Consumer register, int opType) throws IOException { + if (futureWorker == null) { + futureExecutorService = getThreadPoolExecutor("smart-socket:future-", 1); + futureWorker = new Worker(selectionKey -> { + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + switch (opType) { + case SelectionKey.OP_READ: + removeOps(selectionKey, SelectionKey.OP_READ); + asynchronousSocketChannel.doRead(true); + break; + case SelectionKey.OP_WRITE: + removeOps(selectionKey, SelectionKey.OP_WRITE); + asynchronousSocketChannel.doWrite(); + break; + default: + throw new UnsupportedOperationException("unSupport opType: " + opType); + } + + }); + futureExecutorService.execute(futureWorker); + } + futureWorker.addRegister(register); + } + + private ThreadPoolExecutor getThreadPoolExecutor(final String prefix, int threadNum) { + return new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new ThreadFactory() { + private final AtomicInteger atomicInteger = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, prefix + atomicInteger.getAndIncrement()); + } + }); + } + + private int getIntSystemProperty(String key, int defaultValue) { + String value = System.getProperty(key); + if (value == null || value.length() == 0) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (Exception e) { + e.printStackTrace(); + } + return defaultValue; + } + + + /** + * 移除关注事件 + * + * @param selectionKey 待操作的selectionKey + * @param opt 移除的事件 + */ + public void removeOps(SelectionKey selectionKey, int opt) { + if (selectionKey.isValid() && (selectionKey.interestOps() & opt) != 0) { + selectionKey.interestOps(selectionKey.interestOps() & ~opt); + } + } + + public Worker getReadWorker() { + return readWorkers[(readIndex.getAndIncrement() & Integer.MAX_VALUE) % readWorkers.length]; + } + + public Worker getWriteWorker() { + return writeWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % writeWorkers.length]; + } + + public Worker getAcceptWorker() { + return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length]; + } + + public Worker getConnectWorker() { + return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length]; + } + + public ScheduledThreadPoolExecutor getScheduledExecutor() { + return scheduledExecutor; + } + + @Override + public boolean isShutdown() { + return readExecutorService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return readExecutorService.isTerminated(); + } + + @Override + public void shutdown() { + running = false; + readExecutorService.shutdown(); + writeExecutorService.shutdown(); + if (acceptExecutorService != null) { + acceptExecutorService.shutdown(); + } + if (futureExecutorService != null) { + futureExecutorService.shutdown(); + } + scheduledExecutor.shutdown(); + } + + @Override + public void shutdownNow() { + running = false; + readExecutorService.shutdownNow(); + writeExecutorService.shutdownNow(); + if (acceptExecutorService != null) { + acceptExecutorService.shutdownNow(); + } + if (futureExecutorService != null) { + futureExecutorService.shutdownNow(); + } + scheduledExecutor.shutdownNow(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return readExecutorService.awaitTermination(timeout, unit); + } + + public void interestOps(Worker worker, SelectionKey selectionKey, int opt) { + if ((selectionKey.interestOps() & opt) != 0) { + return; + } + selectionKey.interestOps(selectionKey.interestOps() | opt); + //Worker线程无需wakeup + if (worker.getWorkerThread() != Thread.currentThread()) { + selectionKey.selector().wakeup(); + } + } + + class Worker implements Runnable { + /** + * 当前Worker绑定的Selector + */ + private final Selector selector; + /** + * 待注册的事件 + */ + private final ConcurrentLinkedQueue> registers = new ConcurrentLinkedQueue<>(); + private final Consumer consumer; + int invoker = 0; + int modCount; + private Thread workerThread; + + Worker(Consumer consumer) throws IOException { + this.selector = Selector.open(); + this.consumer = consumer; + } + + /** + * 注册事件 + */ + final void addRegister(Consumer register) { + registers.offer(register); + modCount++; + selector.wakeup(); + } + + public final Thread getWorkerThread() { + return workerThread; + } + + @Override + public final void run() { + workerThread = Thread.currentThread(); + // 优先获取SelectionKey,若无关注事件触发则阻塞在selector.select(),减少select被调用次数 + Set keySet = selector.selectedKeys(); + try { + int v = 0; + while (running) { + Consumer register; + if (v != modCount) { + v = modCount; + while ((register = registers.poll()) != null) { + register.accept(selector); + } + } + + selector.select(); + Iterator keyIterator = keySet.iterator(); + // 执行本次已触发待处理的事件 + while (keyIterator.hasNext()) { + SelectionKey key = keyIterator.next(); + invoker = 0; + keyIterator.remove(); + consumer.accept(key); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + selector.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } +} diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java new file mode 100644 index 00000000..1be645e2 --- /dev/null +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelProvider.java @@ -0,0 +1,58 @@ +/******************************************************************************* + * Copyright (c) 2017-2021, org.smartboot. All rights reserved. + * project name: smart-socket + * file name: EnhanceAsynchronousChannelProvider.java + * Date: 2021-07-29 + * Author: sandao (zhengjunweimail@163.com) + * + ******************************************************************************/ + +package org.smartboot.socket.enhance; + +import java.io.IOException; +import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.AsynchronousChannelProvider; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @author 三刀 + * @version V1.0 , 2020/5/25 + */ +public final class EnhanceAsynchronousChannelProvider extends AsynchronousChannelProvider { + @Override + public AsynchronousChannelGroup openAsynchronousChannelGroup(int nThreads, ThreadFactory threadFactory) throws IOException { + return new EnhanceAsynchronousChannelGroup(this, new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue(nThreads), + threadFactory), nThreads); + } + + @Override + public AsynchronousChannelGroup openAsynchronousChannelGroup(ExecutorService executor, int initialSize) throws IOException { + return new EnhanceAsynchronousChannelGroup(this, executor, initialSize); + } + + @Override + public AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(AsynchronousChannelGroup group) throws IOException { + return new EnhanceAsynchronousServerSocketChannel(checkAndGet(group)); + } + + @Override + public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group) throws IOException { + return new EnhanceAsynchronousSocketChannel(checkAndGet(group), SocketChannel.open()); + } + + private EnhanceAsynchronousChannelGroup checkAndGet(AsynchronousChannelGroup group) { + if (!(group instanceof EnhanceAsynchronousChannelGroup)) { + throw new RuntimeException("invalid class"); + } + return (EnhanceAsynchronousChannelGroup) group; + } +} diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java new file mode 100644 index 00000000..b6e8fedd --- /dev/null +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -0,0 +1,160 @@ +/******************************************************************************* + * Copyright (c) 2017-2021, org.smartboot. All rights reserved. + * project name: smart-socket + * file name: EnhanceAsynchronousServerSocketChannel.java + * Date: 2021-07-29 + * Author: sandao (zhengjunweimail@163.com) + * + ******************************************************************************/ + +package org.smartboot.socket.enhance; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.SocketOption; +import java.nio.channels.AcceptPendingException; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.CompletionHandler; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; +import java.util.concurrent.Future; + +/** + * @author 三刀 + * @version V1.0 , 2020/5/25 + */ +final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSocketChannel { + private static final String VERSION = "1.0.6"; + private final ServerSocketChannel serverSocketChannel; + private final EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup; + private final EnhanceAsynchronousChannelGroup.Worker acceptWorker; + private CompletionHandler acceptCompletionHandler; + private FutureCompletionHandler acceptFuture; + private Object attachment; + private SelectionKey selectionKey; + private boolean acceptPending; + + /** + * Initializes a new instance of this class. + */ + protected EnhanceAsynchronousServerSocketChannel(EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup) throws IOException { + super(enhanceAsynchronousChannelGroup.provider()); + this.enhanceAsynchronousChannelGroup = enhanceAsynchronousChannelGroup; + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + acceptWorker = enhanceAsynchronousChannelGroup.getAcceptWorker(); + System.out.println("enhance[" + VERSION + "]..."); + } + + @Override + public AsynchronousServerSocketChannel bind(SocketAddress local, int backlog) throws IOException { + serverSocketChannel.bind(local, backlog); + return this; + } + + @Override + public AsynchronousServerSocketChannel setOption(SocketOption name, T value) throws IOException { + serverSocketChannel.setOption(name, value); + return this; + } + + @Override + public T getOption(SocketOption name) throws IOException { + return serverSocketChannel.getOption(name); + } + + @Override + public Set> supportedOptions() { + return serverSocketChannel.supportedOptions(); + } + + @Override + public void accept(A attachment, CompletionHandler handler) { + if (acceptPending) { + throw new AcceptPendingException(); + } + acceptPending = true; + this.acceptCompletionHandler = (CompletionHandler) handler; + this.attachment = attachment; + doAccept(); + } + + public void doAccept() { + try { + //此前通过Future调用,且触发了cancel + if (acceptFuture != null && acceptFuture.isDone()) { + resetAccept(); + enhanceAsynchronousChannelGroup.removeOps(selectionKey, SelectionKey.OP_ACCEPT); + return; + } + boolean directAccept = (acceptWorker.getWorkerThread() == Thread.currentThread() + && acceptWorker.invoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER); + SocketChannel socketChannel = null; + if (directAccept) { + socketChannel = serverSocketChannel.accept(); + } + if (socketChannel != null) { + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = new EnhanceAsynchronousSocketChannel(enhanceAsynchronousChannelGroup, socketChannel); + socketChannel.finishConnect(); + socketChannel.socket().setTcpNoDelay(true); + CompletionHandler completionHandler = acceptCompletionHandler; + Object attach = attachment; + resetAccept(); + completionHandler.completed(asynchronousSocketChannel, attach); + if (!acceptPending && selectionKey != null) { + enhanceAsynchronousChannelGroup.removeOps(selectionKey, SelectionKey.OP_ACCEPT); + } + } + //首次注册selector + else if (selectionKey == null) { + acceptWorker.addRegister(selector -> { + try { + selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + selectionKey.attach(EnhanceAsynchronousServerSocketChannel.this); + } catch (ClosedChannelException e) { + acceptCompletionHandler.failed(e, attachment); + } + }); + } else { + enhanceAsynchronousChannelGroup.interestOps(acceptWorker, selectionKey, SelectionKey.OP_ACCEPT); + } + } catch (IOException e) { + this.acceptCompletionHandler.failed(e, attachment); + } + + } + + private void resetAccept() { + acceptPending = false; + acceptFuture = null; + acceptCompletionHandler = null; + attachment = null; + } + + @Override + public Future accept() { + FutureCompletionHandler acceptFuture = new FutureCompletionHandler<>(); + accept(null, acceptFuture); + this.acceptFuture = acceptFuture; + return acceptFuture; + } + + @Override + public SocketAddress getLocalAddress() throws IOException { + return serverSocketChannel.getLocalAddress(); + } + + @Override + public boolean isOpen() { + return serverSocketChannel.isOpen(); + } + + @Override + public void close() throws IOException { + serverSocketChannel.close(); + } +} diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java new file mode 100644 index 00000000..ad98466b --- /dev/null +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -0,0 +1,487 @@ +/******************************************************************************* + * Copyright (c) 2017-2021, org.smartboot. All rights reserved. + * project name: smart-socket + * file name: EnhanceAsynchronousSocketChannel.java + * Date: 2021-07-29 + * Author: sandao (zhengjunweimail@163.com) + * + ******************************************************************************/ + +package org.smartboot.socket.enhance; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.SocketOption; +import java.nio.ByteBuffer; +import java.nio.channels.AlreadyConnectedException; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.CompletionHandler; +import java.nio.channels.ConnectionPendingException; +import java.nio.channels.ReadPendingException; +import java.nio.channels.SelectionKey; +import java.nio.channels.ShutdownChannelGroupException; +import java.nio.channels.SocketChannel; +import java.nio.channels.WritePendingException; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * 模拟JDK7的AIO处理方式 + * + * @author 三刀 + * @version V1.0 , 2018/5/24 + */ +final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { + private final SocketChannel channel; + private final EnhanceAsynchronousChannelGroup group; + private final EnhanceAsynchronousChannelGroup.Worker readWorker; + private final EnhanceAsynchronousChannelGroup.Worker writeWorker; + private final EnhanceAsynchronousChannelGroup.Worker connectWorker; + private ByteBuffer readBuffer; + private ByteBufferArray scatteringReadBuffer; + private ByteBuffer writeBuffer; + private ByteBufferArray gatheringWriteBuffer; + private CompletionHandler readCompletionHandler; + private CompletionHandler writeCompletionHandler; + private CompletionHandler connectCompletionHandler; + private FutureCompletionHandler connectFuture; + private FutureCompletionHandler readFuture; + private FutureCompletionHandler writeFuture; + private Object readAttachment; + private Object writeAttachment; + private Object connectAttachment; + private SelectionKey readSelectionKey; + private SelectionKey readFutureSelectionKey; + private SelectionKey writeSelectionKey; + private SelectionKey connectSelectionKey; + private boolean writePending; + private boolean readPending; + private boolean connectionPending; + private SocketAddress remote; + + public EnhanceAsynchronousSocketChannel(EnhanceAsynchronousChannelGroup group, SocketChannel channel) throws IOException { + super(group.provider()); + this.group = group; + this.channel = channel; + readWorker = group.getReadWorker(); + writeWorker = group.getWriteWorker(); + connectWorker = group.getConnectWorker(); + channel.configureBlocking(false); + } + + @Override + public void close() throws IOException { + IOException exception = null; + try { + if (channel != null && channel.isOpen()) { + channel.close(); + } + } catch (IOException e) { + exception = e; + } + if (readSelectionKey != null) { + readSelectionKey.cancel(); + readSelectionKey = null; + } + if (readFutureSelectionKey != null) { + readFutureSelectionKey.cancel(); + readFutureSelectionKey = null; + } + if (writeSelectionKey != null) { + writeSelectionKey.cancel(); + writeSelectionKey = null; + } + if (connectSelectionKey != null) { + connectSelectionKey.cancel(); + connectSelectionKey = null; + } + if (exception != null) { + throw exception; + } + } + + @Override + public AsynchronousSocketChannel bind(SocketAddress local) throws IOException { + channel.bind(local); + return this; + } + + @Override + public AsynchronousSocketChannel setOption(SocketOption name, T value) throws IOException { + channel.setOption(name, value); + return this; + } + + @Override + public T getOption(SocketOption name) throws IOException { + return channel.getOption(name); + } + + @Override + public Set> supportedOptions() { + return channel.supportedOptions(); + } + + @Override + public AsynchronousSocketChannel shutdownInput() throws IOException { + channel.shutdownInput(); + return this; + } + + @Override + public AsynchronousSocketChannel shutdownOutput() throws IOException { + channel.shutdownOutput(); + return this; + } + + @Override + public SocketAddress getRemoteAddress() throws IOException { + return channel.getRemoteAddress(); + } + + @Override + public void connect(SocketAddress remote, A attachment, CompletionHandler handler) { + if (group.isTerminated()) { + throw new ShutdownChannelGroupException(); + } + if (channel.isConnected()) { + throw new AlreadyConnectedException(); + } + if (connectionPending) { + throw new ConnectionPendingException(); + } + connectionPending = true; + this.connectAttachment = attachment; + this.connectCompletionHandler = (CompletionHandler) handler; + this.remote = remote; + doConnect(); + } + + @Override + public Future connect(SocketAddress remote) { + FutureCompletionHandler connectFuture = new FutureCompletionHandler<>(); + connect(remote, null, connectFuture); + this.connectFuture = connectFuture; + return connectFuture; + } + + @Override + public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read0(dst, null, timeout, unit, attachment, handler); + } + + private void read0(ByteBuffer readBuffer, ByteBufferArray scattering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + if (readPending) { + throw new ReadPendingException(); + } + readPending = true; + this.readBuffer = readBuffer; + this.scatteringReadBuffer = scattering; + this.readAttachment = attachment; + if (timeout > 0) { + readFuture = new FutureCompletionHandler<>((CompletionHandler) handler, readAttachment); + readCompletionHandler = (CompletionHandler) readFuture; + group.getScheduledExecutor().schedule(readFuture, timeout, unit); + } else { + this.readCompletionHandler = (CompletionHandler) handler; + } + doRead(readFuture != null); + } + + @Override + public Future read(ByteBuffer readBuffer) { + FutureCompletionHandler readFuture = new FutureCompletionHandler<>(); + this.readFuture = readFuture; + read(readBuffer, 0, TimeUnit.MILLISECONDS, null, readFuture); + return readFuture; + } + + @Override + public void read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + read0(null, new ByteBufferArray(dsts, offset, length), timeout, unit, attachment, handler); + } + + @Override + public void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + write0(src, null, timeout, unit, attachment, handler); + } + + private void write0(ByteBuffer writeBuffer, ByteBufferArray gathering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + if (writePending) { + throw new WritePendingException(); + } + + writePending = true; + this.writeBuffer = writeBuffer; + this.gatheringWriteBuffer = gathering; + this.writeAttachment = attachment; + if (timeout > 0) { + writeFuture = new FutureCompletionHandler<>((CompletionHandler) handler, writeAttachment); + writeCompletionHandler = (CompletionHandler) writeFuture; + group.getScheduledExecutor().schedule(writeFuture, timeout, unit); + } else { + this.writeCompletionHandler = (CompletionHandler) handler; + } + doWrite(); + } + + @Override + public Future write(ByteBuffer src) { + FutureCompletionHandler writeFuture = new FutureCompletionHandler<>(); + this.writeFuture = writeFuture; + write0(src, null, 0, TimeUnit.MILLISECONDS, null, writeFuture); + return writeFuture; + } + + @Override + public void write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + write0(null, new ByteBufferArray(srcs, offset, length), timeout, unit, attachment, handler); + } + + @Override + public SocketAddress getLocalAddress() throws IOException { + return channel.getLocalAddress(); + } + + public void doConnect() { + try { + //此前通过Future调用,且触发了cancel + if (connectFuture != null && connectFuture.isDone()) { + resetConnect(); + return; + } + boolean connected = channel.isConnectionPending(); + if (connected || channel.connect(remote)) { + connected = channel.finishConnect(); + } + if (connected) { + CompletionHandler completionHandler = connectCompletionHandler; + Object attach = connectAttachment; + resetConnect(); + completionHandler.completed(null, attach); + } else if (connectSelectionKey == null) { + connectWorker.addRegister(selector -> { + try { + connectSelectionKey = channel.register(selector, SelectionKey.OP_CONNECT); + connectSelectionKey.attach(EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + connectCompletionHandler.failed(e, connectAttachment); + } + }); + } else { + throw new IOException("unKnow exception"); + } + } catch (IOException e) { + connectCompletionHandler.failed(e, connectAttachment); + } + + } + + private void resetConnect() { + connectionPending = false; + connectFuture = null; + connectAttachment = null; + connectCompletionHandler = null; + } + + public void doRead(boolean direct) { + try { + //此前通过Future调用,且触发了cancel + if (readFuture != null && readFuture.isDone()) { + group.removeOps(readSelectionKey, SelectionKey.OP_READ); + resetRead(); + return; + } + boolean isReadWorkThread = Thread.currentThread() == readWorker.getWorkerThread(); + boolean directRead = direct || (isReadWorkThread && readWorker.invoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER); + + long readSize = 0; + boolean hasRemain = true; + if (directRead) { + if (scatteringReadBuffer != null) { + readSize = channel.read(scatteringReadBuffer.getBuffers(), scatteringReadBuffer.getOffset(), scatteringReadBuffer.getLength()); + hasRemain = hasRemaining(scatteringReadBuffer); + } else { + readSize = channel.read(readBuffer); + hasRemain = readBuffer.hasRemaining(); + } + //The read buffer is not full, there may be no readable data + if (hasRemain && isReadWorkThread) { + readWorker.invoker = EnhanceAsynchronousChannelGroup.MAX_INVOKER; + } + } + + //注册至异步线程 + if (readFuture != null && readSize == 0) { + group.removeOps(readSelectionKey, SelectionKey.OP_READ); + group.registerFuture(selector -> { + try { + readFutureSelectionKey = channel.register(selector, SelectionKey.OP_READ); + readFutureSelectionKey.attach(EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + e.printStackTrace(); + doRead(true); + } + }, SelectionKey.OP_READ); + return; + } + + if (readSize != 0 || !hasRemain) { + CompletionHandler completionHandler = readCompletionHandler; + Object attach = readAttachment; + ByteBufferArray scattering = scatteringReadBuffer; + resetRead(); + if (scattering == null) { + completionHandler.completed((int) readSize, attach); + } else { + completionHandler.completed(readSize, attach); + } + + if (!readPending && readSelectionKey != null) { + group.removeOps(readSelectionKey, SelectionKey.OP_READ); + } + } else if (readSelectionKey == null) { + readWorker.addRegister(selector -> { + try { + readSelectionKey = channel.register(selector, SelectionKey.OP_READ); + readSelectionKey.attach(EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + readCompletionHandler.failed(e, readAttachment); + } + }); + } else { + group.interestOps(readWorker, readSelectionKey, SelectionKey.OP_READ); + } + + } catch (Throwable e) { + if (readCompletionHandler == null) { + e.printStackTrace(); + try { + close(); + } catch (IOException ioException) { + ioException.printStackTrace(); + } + } else { + readCompletionHandler.failed(e, readAttachment); + } + } + } + + private void resetRead() { + readPending = false; + readFuture = null; + readCompletionHandler = null; + readAttachment = null; + readBuffer = null; + scatteringReadBuffer = null; + } + + public void doWrite() { + try { + //此前通过Future调用,且触发了cancel + if (writeFuture != null && writeFuture.isDone()) { + resetWrite(); + return; + } + boolean directWrite; + boolean isWriteWorkThread = Thread.currentThread() == writeWorker.getWorkerThread(); + if (isWriteWorkThread && writeFuture != null) { + directWrite = writeWorker.invoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER; + } else { + directWrite = true; + } + long writeSize = 0; + boolean hasRemain = true; + if (directWrite) { + if (gatheringWriteBuffer != null) { + writeSize = channel.write(gatheringWriteBuffer.getBuffers(), gatheringWriteBuffer.getOffset(), gatheringWriteBuffer.getLength()); + hasRemain = hasRemaining(gatheringWriteBuffer); + } else { + writeSize = channel.write(writeBuffer); + hasRemain = writeBuffer.hasRemaining(); + } + //The write buffer has not been emptied, there may be remaining data cannot be output + if (isWriteWorkThread && hasRemain) { + writeWorker.invoker = EnhanceAsynchronousChannelGroup.MAX_INVOKER; + } + } + + //注册至异步线程 + if (writeFuture != null && writeSize == 0) { + group.removeOps(writeSelectionKey, SelectionKey.OP_WRITE); + group.registerFuture(selector -> { + try { + SelectionKey readSelectionKey = channel.register(selector, SelectionKey.OP_WRITE); + readSelectionKey.attach(EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + e.printStackTrace(); + doWrite(); + } + }, SelectionKey.OP_WRITE); + return; + } + + if (writeSize != 0 || !hasRemain) { + CompletionHandler completionHandler = writeCompletionHandler; + Object attach = writeAttachment; + ByteBufferArray scattering = gatheringWriteBuffer; + resetWrite(); + if (scattering == null) { + completionHandler.completed((int) writeSize, attach); + } else { + completionHandler.completed(writeSize, attach); + } + if (!writePending && writeSelectionKey != null) { + group.removeOps(writeSelectionKey, SelectionKey.OP_WRITE); + } + } else if (writeSelectionKey == null) { + writeWorker.addRegister(selector -> { + try { + writeSelectionKey = channel.register(selector, SelectionKey.OP_WRITE); + writeSelectionKey.attach(EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + writeCompletionHandler.failed(e, writeAttachment); + } + }); + } else { + group.interestOps(writeWorker, writeSelectionKey, SelectionKey.OP_WRITE); + } + } catch (Throwable e) { + if (writeCompletionHandler == null) { + e.printStackTrace(); + try { + close(); + } catch (IOException ioException) { + ioException.printStackTrace(); + } + } else { + writeCompletionHandler.failed(e, writeAttachment); + } + } + } + + private boolean hasRemaining(ByteBufferArray scattering) { + for (int i = 0; i < scattering.getLength(); i++) { + if (scattering.getBuffers()[scattering.getOffset() + i].hasRemaining()) { + return true; + } + } + return false; + } + + private void resetWrite() { + writePending = false; + writeFuture = null; + writeAttachment = null; + writeCompletionHandler = null; + writeBuffer = null; + gatheringWriteBuffer = null; + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } +} diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java new file mode 100644 index 00000000..b24de780 --- /dev/null +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java @@ -0,0 +1,111 @@ +/******************************************************************************* + * Copyright (c) 2017-2021, org.smartboot. All rights reserved. + * project name: smart-socket + * file name: FutureCompletionHandler.java + * Date: 2021-07-29 + * Author: sandao (zhengjunweimail@163.com) + * + ******************************************************************************/ + +package org.smartboot.socket.enhance; + +import java.nio.channels.CompletionHandler; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +final class FutureCompletionHandler implements CompletionHandler, Future, Runnable { + private CompletionHandler completionHandler; + private A attach; + private V result; + private boolean done = false; + private boolean cancel = false; + private Throwable exception; + + public FutureCompletionHandler(CompletionHandler completionHandler, A attach) { + this.completionHandler = completionHandler; + this.attach = attach; + } + + public FutureCompletionHandler() { + } + + @Override + public void completed(V result, A selectionKey) { + this.result = result; + done = true; + synchronized (this) { + this.notify(); + } + if (completionHandler != null) { + completionHandler.completed(result, attach); + } + } + + @Override + public void failed(Throwable exc, A attachment) { + exception = exc; + done = true; + if (completionHandler != null) { + completionHandler.failed(exc, attachment); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (done || cancel) { + return false; + } + cancel = true; + done = true; + synchronized (this) { + notify(); + } + return true; + } + + @Override + public boolean isCancelled() { + return cancel; + } + + @Override + public boolean isDone() { + return done; + } + + @Override + public synchronized V get() throws InterruptedException, ExecutionException { + if (done) { + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } else { + wait(); + } + return get(); + } + + @Override + public synchronized V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (done) { + return get(); + } else { + wait(unit.toMillis(timeout)); + } + if (done) { + return get(); + } + throw new TimeoutException(); + } + + @Override + public synchronized void run() { + if (!done) { + cancel(true); + completionHandler.failed(new TimeoutException(), attach); + } + } +} diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index 6746a5d4..c5dae951 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -26,6 +26,11 @@
+ + org.smartboot.socket + aio-enhance + ${aio.version} + org.smartboot.socket aio-core @@ -260,5 +265,6 @@ ../aio-core ../aio-pro + ../aio-enhance \ No newline at end of file -- Gitee From fe573713203beb1bc7f34052f369cb5fb04d35e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 31 Jul 2021 09:04:38 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartboot/socket/transport/AioQuickServer.java | 14 ++++++++++++-- example/pom.xml | 13 ------------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index 16a9af38..dba5cedc 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -124,7 +124,7 @@ public final class AioQuickServer { */ public void start() throws IOException { if (config.isBannerEnabled()) { - System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket ::\t(" + IoServerConfig.VERSION + ")"); + System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + (config.isAioEnhance() ? "[enhance]" : "") + "::\t(" + IoServerConfig.VERSION + ")"); } start0(channel -> new TcpAioSession(channel, config, aioReadCompletionHandler, aioWriteCompletionHandler, bufferPool.allocateBufferPage())); } @@ -272,7 +272,7 @@ public final class AioQuickServer { /** * 停止服务端 */ - public final void shutdown() { + public void shutdown() { try { if (serverSocketChannel != null) { serverSocketChannel.close(); @@ -314,6 +314,16 @@ public final class AioQuickServer { return this; } + /** + * 是否启用 AIO 增强模式。默认:true + * + * @param enabled true:启用;false:禁用 + */ + public AioQuickServer setAioEnhance(boolean enabled) { + config.setAioEnhance(enabled); + return this; + } + /** * 是否启用控制台Banner打印 * diff --git a/example/pom.xml b/example/pom.xml index 38ac2ddd..19f73ba4 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,9 +5,6 @@ org.smartboot.socket 1.0.0 4.0.0 - - 1.0.4-SNAPSHOT - example @@ -28,11 +25,6 @@ aio-pro 1.5.11-SNAPSHOT - - org.smartboot.socket - aio-core - 1.5.11-SNAPSHOT - org.apache.commons commons-lang3 @@ -43,11 +35,6 @@ slf4j-simple 1.7.21 - - org.smartboot.aio - aio-enhance - ${aio.enhance.version} - com.alibaba fastjson -- Gitee From 7ae954cf3eaff96dad07fd0ed6b905d857958c04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 14 Aug 2021 09:08:58 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/socket/example/benchmark/StringClient.java | 4 ++-- .../org/smartboot/socket/example/benchmark/StringServer.java | 1 - .../java/org/smartboot/socket/example/netty/SmartServer.java | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/example/src/main/java/org/smartboot/socket/example/benchmark/StringClient.java b/example/src/main/java/org/smartboot/socket/example/benchmark/StringClient.java index 7b6ca800..97930c4b 100644 --- a/example/src/main/java/org/smartboot/socket/example/benchmark/StringClient.java +++ b/example/src/main/java/org/smartboot/socket/example/benchmark/StringClient.java @@ -4,6 +4,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.smartboot.socket.StateMachineEnum; import org.smartboot.socket.buffer.BufferPagePool; +import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider; import org.smartboot.socket.extension.plugins.MonitorPlugin; import org.smartboot.socket.extension.processor.AbstractMessageProcessor; import org.smartboot.socket.extension.protocol.StringProtocol; @@ -25,7 +26,6 @@ public class StringClient { public static void main(String[] args) throws IOException { - System.setProperty("java.nio.channels.spi.AsynchronousChannelProvider", "org.smartboot.aio.EnhanceAsynchronousChannelProvider"); BufferPagePool bufferPagePool = new BufferPagePool(1024 * 1024 * 32, 10, true); AbstractMessageProcessor processor = new AbstractMessageProcessor() { @Override @@ -41,7 +41,7 @@ public class StringClient { } }; processor.addPlugin(new MonitorPlugin(5)); - AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + AsynchronousChannelGroup asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider().openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "ClientGroup"); diff --git a/example/src/main/java/org/smartboot/socket/example/benchmark/StringServer.java b/example/src/main/java/org/smartboot/socket/example/benchmark/StringServer.java index 1d2e81bf..1592bcba 100644 --- a/example/src/main/java/org/smartboot/socket/example/benchmark/StringServer.java +++ b/example/src/main/java/org/smartboot/socket/example/benchmark/StringServer.java @@ -22,7 +22,6 @@ public class StringServer { private static final Logger LOGGER = LoggerFactory.getLogger(StringServer.class); public static void main(String[] args) throws IOException { - System.setProperty("java.nio.channels.spi.AsynchronousChannelProvider", "org.smartboot.aio.EnhanceAsynchronousChannelProvider"); AbstractMessageProcessor processor = new AbstractMessageProcessor() { @Override public void process0(AioSession session, String msg) { diff --git a/example/src/main/java/org/smartboot/socket/example/netty/SmartServer.java b/example/src/main/java/org/smartboot/socket/example/netty/SmartServer.java index 4d671adb..1a0e3c15 100644 --- a/example/src/main/java/org/smartboot/socket/example/netty/SmartServer.java +++ b/example/src/main/java/org/smartboot/socket/example/netty/SmartServer.java @@ -23,7 +23,6 @@ import java.io.IOException; */ public class SmartServer { public static void main(String[] args) throws Exception { - System.setProperty("java.nio.channels.spi.AsynchronousChannelProvider", "org.smartboot.aio.EnhanceAsynchronousChannelProvider"); AioQuickServer server = new AioQuickServer("localhost", 8080, new LongProtocol(), (session, msg) -> { long now = System.nanoTime(); // System.out.println("cost: " + (now - msg)); -- Gitee From 98746ef11a37454b5edc44da74886290aaf6e9fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 14 Aug 2021 09:20:54 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/smartboot/socket/transport/IoServerConfig.java | 7 +++++-- .../enhance/EnhanceAsynchronousServerSocketChannel.java | 7 +------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index c7a4d615..f8e881d5 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -250,16 +250,19 @@ final class IoServerConfig { public String toString() { return "IoServerConfig{" + "readBufferSize=" + readBufferSize + - ", writeQueueCapacity=" + writeBufferCapacity + + ", writeBufferSize=" + writeBufferSize + + ", writeBufferCapacity=" + writeBufferCapacity + ", host='" + host + '\'' + ", monitor=" + monitor + ", port=" + port + + ", backlog=" + backlog + ", processor=" + processor + ", protocol=" + protocol + ", bannerEnabled=" + bannerEnabled + ", socketOptions=" + socketOptions + ", threadNum=" + threadNum + - ", writeBufferSize=" + writeBufferSize + + ", bufferFactory=" + bufferFactory + + ", aioEnhance=" + aioEnhance + '}'; } } diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java index b6e8fedd..48ea208c 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -28,7 +28,6 @@ import java.util.concurrent.Future; * @version V1.0 , 2020/5/25 */ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSocketChannel { - private static final String VERSION = "1.0.6"; private final ServerSocketChannel serverSocketChannel; private final EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup; private final EnhanceAsynchronousChannelGroup.Worker acceptWorker; @@ -38,16 +37,12 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc private SelectionKey selectionKey; private boolean acceptPending; - /** - * Initializes a new instance of this class. - */ - protected EnhanceAsynchronousServerSocketChannel(EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup) throws IOException { + EnhanceAsynchronousServerSocketChannel(EnhanceAsynchronousChannelGroup enhanceAsynchronousChannelGroup) throws IOException { super(enhanceAsynchronousChannelGroup.provider()); this.enhanceAsynchronousChannelGroup = enhanceAsynchronousChannelGroup; serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); acceptWorker = enhanceAsynchronousChannelGroup.getAcceptWorker(); - System.out.println("enhance[" + VERSION + "]..."); } @Override -- Gitee From e7aa05370807d7c2efbb7db4c15991751ea8d162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Sat, 14 Aug 2021 09:51:20 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E5=8F=91=E5=B8=83=E6=AD=A3=E5=BC=8F?= =?UTF-8?q?=E5=8C=85=201.5.11?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aio-core/pom.xml | 2 +- .../java/org/smartboot/socket/transport/IoServerConfig.java | 2 +- aio-enhance/pom.xml | 2 +- aio-pro/pom.xml | 2 +- example/pom.xml | 2 +- pom.xml | 2 +- smart-socket-parent/pom.xml | 4 ++-- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/aio-core/pom.xml b/aio-core/pom.xml index 6dfbe9e7..8ab12a80 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.5.11-SNAPSHOT + 1.5.11 ../smart-socket-parent diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index f8e881d5..5b3b8de0 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -39,7 +39,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.5.11-SNAPSHOT"; + public static final String VERSION = "v1.5.11"; /** * 消息体缓存大小,字节 diff --git a/aio-enhance/pom.xml b/aio-enhance/pom.xml index e7347eeb..0894834b 100644 --- a/aio-enhance/pom.xml +++ b/aio-enhance/pom.xml @@ -14,7 +14,7 @@ smart-socket-parent org.smartboot.socket - 1.5.11-SNAPSHOT + 1.5.11 ../smart-socket-parent 4.0.0 diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index b45bdf41..0073e7d0 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.5.11-SNAPSHOT + 1.5.11 ../smart-socket-parent diff --git a/example/pom.xml b/example/pom.xml index 19f73ba4..0bc19380 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -23,7 +23,7 @@ org.smartboot.socket aio-pro - 1.5.11-SNAPSHOT + 1.5.11 org.apache.commons diff --git a/pom.xml b/pom.xml index 0883cd2a..c598a9c3 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.smartboot.socket smart-socket-parent - 1.5.11-SNAPSHOT + 1.5.11 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index c5dae951..1e9bafb3 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,14 +15,14 @@ 4.0.0 org.smartboot.socket smart-socket-parent - 1.5.11-SNAPSHOT + 1.5.11 pom UTF-8 1.7.29 - 1.5.11-SNAPSHOT + 1.5.11 -- Gitee