diff --git a/aio-core/pom.xml b/aio-core/pom.xml index 3d528b4b3454cbeab17e847f63c7f29d28e9ace4..b80ce55d11f3b56a51e3ee6dc2925d16d0ee7fa7 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3 ../smart-socket-parent diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index b537cf526279c16e75c348d689af138c40679c20..83c1f384b4ef829092881c0d68857b3381874c5d 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -112,10 +112,19 @@ public final class AioQuickServer { * @throws IOException IO异常 */ public void start() throws IOException { - if (config.isBannerEnabled()) { - System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + "::\t(" + IoServerConfig.VERSION + ") [port: " + config.getPort() + ", threadNum:" + config.getThreadNum() + "]"); + if (bufferPool == null) { + this.bufferPool = config.getBufferFactory().create(); + this.innerBufferPool = bufferPool; } - start0(); + asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(lowMemory).openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() { + private byte index = 0; + + @Override + public Thread newThread(Runnable r) { + return bufferPool.newThread(r, "smart-socket:Thread-" + (++index)); + } + }); + start(asynchronousChannelGroup); } /** @@ -123,20 +132,16 @@ public final class AioQuickServer { * * @throws IOException IO异常 */ - private void start0() throws IOException { + public void start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException { + if (config.isBannerEnabled()) { + System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + "::\t(" + IoServerConfig.VERSION + ") [port: " + config.getPort() + ", threadNum:" + config.getThreadNum() + "]"); + } try { if (bufferPool == null) { this.bufferPool = config.getBufferFactory().create(); this.innerBufferPool = bufferPool; } - asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(lowMemory).openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() { - private byte index = 0; - @Override - public Thread newThread(Runnable r) { - return bufferPool.newThread(r, "smart-socket:Thread-" + (++index)); - } - }); this.serverSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup); //set socket options if (config.getSocketOptions() != null) { @@ -224,18 +229,20 @@ public final class AioQuickServer { e.printStackTrace(); } - if (!asynchronousChannelGroup.isTerminated()) { + if (asynchronousChannelGroup != null) { + if (!asynchronousChannelGroup.isTerminated()) { + try { + asynchronousChannelGroup.shutdownNow(); + } catch (IOException e) { + e.printStackTrace(); + } + } try { - asynchronousChannelGroup.shutdownNow(); - } catch (IOException e) { + asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { e.printStackTrace(); } } - try { - asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } if (innerBufferPool != null) { innerBufferPool.release(); } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index ef80c2687afad7f3a8ef690177d96e339c5369f9..08ea4e3ebf30f09c5013963465721ad46b91673c 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -39,7 +39,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.6.2"; + public static final String VERSION = "v1.6.3"; /** * 消息体缓存大小,字节 diff --git a/aio-enhance/pom.xml b/aio-enhance/pom.xml index 90e3d17b775fb6183b858c116555f4b9d3d9e44a..d4eb5cae7cda376a459f46ea1f043d99ae0e88c2 100644 --- a/aio-enhance/pom.xml +++ b/aio-enhance/pom.xml @@ -14,7 +14,7 @@ smart-socket-parent org.smartboot.socket - 1.6.2 + 1.6.3 ../smart-socket-parent 4.0.0 diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java deleted file mode 100644 index 1c8821dfe08c67763e3e5df892614e57e19b9949..0000000000000000000000000000000000000000 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java +++ /dev/null @@ -1,40 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2017-2021, org.smartboot. All rights reserved. - * project name: smart-socket - * file name: ByteBufferArray.java - * Date: 2021-07-29 - * Author: sandao (zhengjunweimail@163.com) - * - ******************************************************************************/ - -package org.smartboot.socket.enhance; - -import java.nio.ByteBuffer; - - -/** - * @author 三刀 - */ -final class ByteBufferArray { - private final ByteBuffer[] buffers; - private final int offset; - private final int length; - - public ByteBufferArray(ByteBuffer[] buffers, int offset, int length) { - this.buffers = buffers; - this.offset = offset; - this.length = length; - } - - public ByteBuffer[] getBuffers() { - return buffers; - } - - public int getOffset() { - return offset; - } - - public int getLength() { - return length; - } -} \ No newline at end of file diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index 1dc28913262412d32c270bcb579e598fecac33f3..6dddae16d857a3d0a555fc3eaae49fe77d1b3fb8 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -17,7 +17,6 @@ import java.nio.channels.spi.AsynchronousChannelProvider; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -40,11 +39,11 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { /** * 写回调线程池 */ - private final ExecutorService writeExecutorService; + private final ExecutorService commonExecutorService; /** * write工作组 */ - private final Worker[] writeWorkers; + private final Worker[] commonWorkers; /** * read工作组 */ @@ -53,24 +52,8 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { * 线程池分配索引 */ private final AtomicInteger readIndex = new AtomicInteger(0); - private final AtomicInteger writeIndex = new AtomicInteger(0); - /** - * 定时任务线程池 - */ - private final ScheduledThreadPoolExecutor scheduledExecutor; - /** - * 服务端accept线程池 - */ - private final ExecutorService acceptExecutorService; - /** - * accept工作组 - */ - private final Worker[] acceptWorkers; - private Worker futureWorker; - /** - * 同步IO线程池 - */ - private ExecutorService futureExecutorService; + private final AtomicInteger commonIndex = new AtomicInteger(0); + /** * group运行状态 */ @@ -95,65 +78,32 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { } //init threadPool for write and connect - final int writeThreadNum = 1; - final int acceptThreadNum = 1; - writeExecutorService = getSingleThreadExecutor("smart-socket:write"); - this.writeWorkers = new Worker[writeThreadNum]; + final int commonThreadNum = 1; + commonExecutorService = getSingleThreadExecutor("smart-socket:common"); + this.commonWorkers = new Worker[commonThreadNum]; - for (int i = 0; i < writeThreadNum; i++) { - writeWorkers[i] = new Worker(Selector.open(), selectionKey -> { - EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); - //直接调用interestOps的效果比 removeOps(selectionKey, SelectionKey.OP_WRITE) 更好 - selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE); - asynchronousSocketChannel.doWrite(); - }); - writeExecutorService.execute(writeWorkers[i]); - } - - //init threadPool for accept - acceptExecutorService = getSingleThreadExecutor("smart-socket:connect"); - acceptWorkers = new Worker[acceptThreadNum]; - for (int i = 0; i < acceptThreadNum; i++) { - acceptWorkers[i] = new Worker(Selector.open(), selectionKey -> { - if (selectionKey.isAcceptable()) { + for (int i = 0; i < commonThreadNum; i++) { + commonWorkers[i] = new Worker(Selector.open(), selectionKey -> { + if (selectionKey.isWritable()) { + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + //直接调用interestOps的效果比 removeOps(selectionKey, SelectionKey.OP_WRITE) 更好 + selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE); + asynchronousSocketChannel.doWrite(); + } else if (selectionKey.isAcceptable()) { EnhanceAsynchronousServerSocketChannel serverSocketChannel = (EnhanceAsynchronousServerSocketChannel) selectionKey.attachment(); serverSocketChannel.doAccept(); } else if (selectionKey.isConnectable()) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); - asynchronousSocketChannel.doConnect(); - } - }); - acceptExecutorService.execute(acceptWorkers[i]); - } - - scheduledExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "smart-socket:scheduled")); - } - - /** - * 同步IO注册异步线程,防止主IO线程阻塞 - */ - public synchronized void registerFuture(Consumer register, int opType) throws IOException { - if (futureWorker == null) { - futureExecutorService = getSingleThreadExecutor("smart-socket:future"); - futureWorker = new Worker(Selector.open(), selectionKey -> { - EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); - switch (opType) { - case SelectionKey.OP_READ: - removeOps(selectionKey, SelectionKey.OP_READ); - asynchronousSocketChannel.doRead(true); - break; - case SelectionKey.OP_WRITE: - removeOps(selectionKey, SelectionKey.OP_WRITE); - asynchronousSocketChannel.doWrite(); - break; - default: - throw new UnsupportedOperationException("unSupport opType: " + opType); + asynchronousSocketChannel.doConnect(null); + } else if (selectionKey.isReadable()) { + //仅同步read会用到此线程资源 + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + removeOps(selectionKey, SelectionKey.OP_READ); + asynchronousSocketChannel.doRead(true); } - }); - futureExecutorService.execute(futureWorker); + commonExecutorService.execute(commonWorkers[i]); } - futureWorker.addRegister(register); } private ThreadPoolExecutor getSingleThreadExecutor(final String prefix) { @@ -177,20 +127,8 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { return readWorkers[(readIndex.getAndIncrement() & Integer.MAX_VALUE) % readWorkers.length]; } - public Worker getWriteWorker() { - return writeWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % writeWorkers.length]; - } - - public Worker getAcceptWorker() { - return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length]; - } - - public Worker getConnectWorker() { - return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length]; - } - - public ScheduledThreadPoolExecutor getScheduledExecutor() { - return scheduledExecutor; + public Worker getCommonWorker() { + return commonWorkers[(commonIndex.getAndIncrement() & Integer.MAX_VALUE) % commonWorkers.length]; } @Override @@ -207,28 +145,14 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { public void shutdown() { running = false; readExecutorService.shutdown(); - writeExecutorService.shutdown(); - if (acceptExecutorService != null) { - acceptExecutorService.shutdown(); - } - if (futureExecutorService != null) { - futureExecutorService.shutdown(); - } - scheduledExecutor.shutdown(); + commonExecutorService.shutdown(); } @Override public void shutdownNow() { running = false; readExecutorService.shutdownNow(); - writeExecutorService.shutdownNow(); - if (acceptExecutorService != null) { - acceptExecutorService.shutdownNow(); - } - if (futureExecutorService != null) { - futureExecutorService.shutdownNow(); - } - scheduledExecutor.shutdownNow(); + commonExecutorService.shutdownNow(); } @Override @@ -251,7 +175,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { /** * 当前Worker绑定的Selector */ - private final Selector selector; + final Selector selector; private final Consumer consumer; private final ConcurrentLinkedQueue> consumers = new ConcurrentLinkedQueue<>(); int invoker = 0; diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java index 5f3986826b2337736910fea35f5e42a49a8e794b..b57b235c33799eca04b666222c3c69bfdd8e3359 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -43,7 +43,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc this.enhanceAsynchronousChannelGroup = enhanceAsynchronousChannelGroup; serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); - acceptWorker = enhanceAsynchronousChannelGroup.getAcceptWorker(); + acceptWorker = enhanceAsynchronousChannelGroup.getCommonWorker(); this.lowMemory = lowMemory; } @@ -96,6 +96,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc } if (socketChannel != null) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = new EnhanceAsynchronousSocketChannel(enhanceAsynchronousChannelGroup, socketChannel, lowMemory); + //这行代码不要乱动 socketChannel.configureBlocking(false); socketChannel.finishConnect(); CompletionHandler completionHandler = acceptCompletionHandler; diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index 95e80074e7776de284655c13bacc248c0f2fb927..ee1b4365e79e651bf1b4565daef09251f421ab78 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -49,27 +49,17 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { /** * 处理 write 事件的线程资源 */ - private final EnhanceAsynchronousChannelGroup.Worker writeWorker; - /** - * 处理 connect 事件的线程资源 - */ - private final EnhanceAsynchronousChannelGroup.Worker connectWorker; + private final EnhanceAsynchronousChannelGroup.Worker commonWorker; + /** * 用于接收 read 通道数据的缓冲区,经解码后腾出缓冲区以供下一批数据的读取 */ private ByteBuffer readBuffer; - /** - * 用于接收 read 通道数据的缓冲区集合 - */ - private ByteBufferArray scatteringReadBuffer; /** * 存放待输出数据的缓冲区 */ private ByteBuffer writeBuffer; - /** - * 存放待输出数据的缓冲区集合 - */ - private ByteBufferArray gatheringWriteBuffer; + /** * read 回调事件处理器 */ @@ -84,7 +74,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private CompletionHandler connectCompletionHandler; private FutureCompletionHandler connectFuture; private FutureCompletionHandler readFuture; - private FutureCompletionHandler writeFuture; /** * read 回调事件关联绑定的附件对象 */ @@ -98,10 +87,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { */ private Object connectAttachment; private SelectionKey readSelectionKey; - private SelectionKey readFutureSelectionKey; - private SelectionKey writeSelectionKey; - private SelectionKey writeFutureSelectionKey; - private SelectionKey connectSelectionKey; /** * 当前是否正在执行 write 操作 */ @@ -114,21 +99,16 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { * 当前是否正在执行 connect 操作 */ private boolean connectionPending; - /** - * 远程连接的地址 - */ - private SocketAddress remote; private int writeInvoker; - private boolean lowMemory; + private final boolean lowMemory; public EnhanceAsynchronousSocketChannel(EnhanceAsynchronousChannelGroup group, SocketChannel channel, boolean lowMemory) throws IOException { super(group.provider()); this.group = group; this.channel = channel; readWorker = group.getReadWorker(); - writeWorker = group.getWriteWorker(); - connectWorker = group.getConnectWorker(); + commonWorker = group.getCommonWorker(); this.lowMemory = lowMemory; } @@ -136,7 +116,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { public void close() throws IOException { IOException exception = null; try { - if (channel != null && channel.isOpen()) { + if (channel.isOpen()) { channel.close(); } } catch (IOException e) { @@ -146,21 +126,9 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readSelectionKey.cancel(); readSelectionKey = null; } - if (readFutureSelectionKey != null) { - readFutureSelectionKey.cancel(); - readFutureSelectionKey = null; - } - if (writeSelectionKey != null) { - writeSelectionKey.cancel(); - writeSelectionKey = null; - } - if (writeFutureSelectionKey != null) { - writeFutureSelectionKey.cancel(); - writeFutureSelectionKey = null; - } - if (connectSelectionKey != null) { - connectSelectionKey.cancel(); - connectSelectionKey = null; + SelectionKey key = channel.keyFor(commonWorker.selector); + if (key != null) { + key.cancel(); } if (exception != null) { throw exception; @@ -220,8 +188,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { connectionPending = true; this.connectAttachment = attachment; this.connectCompletionHandler = (CompletionHandler) handler; - this.remote = remote; - doConnect(); + doConnect(remote); } @Override @@ -234,24 +201,20 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read0(dst, null, timeout, unit, attachment, handler); + if (timeout > 0) { + throw new UnsupportedOperationException(); + } + read0(dst, attachment, handler); } - private void read0(ByteBuffer readBuffer, ByteBufferArray scattering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + private void read0(ByteBuffer readBuffer, A attachment, CompletionHandler handler) { if (readPending) { throw new ReadPendingException(); } readPending = true; this.readBuffer = readBuffer; - this.scatteringReadBuffer = scattering; this.readAttachment = attachment; - if (timeout > 0) { - readFuture = new FutureCompletionHandler<>((CompletionHandler) handler, readAttachment); - readCompletionHandler = (CompletionHandler) readFuture; - group.getScheduledExecutor().schedule(readFuture, timeout, unit); - } else { - this.readCompletionHandler = (CompletionHandler) handler; - } + this.readCompletionHandler = (CompletionHandler) handler; doRead(readFuture != null); } @@ -265,44 +228,37 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public void read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read0(null, new ByteBufferArray(dsts, offset, length), timeout, unit, attachment, handler); + throw new UnsupportedOperationException(); } @Override public void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - write0(src, null, timeout, unit, attachment, handler); + if (timeout > 0) { + throw new UnsupportedOperationException(); + } + write0(src, attachment, handler); } - private void write0(ByteBuffer writeBuffer, ByteBufferArray gathering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + private void write0(ByteBuffer writeBuffer, A attachment, CompletionHandler handler) { if (writePending) { throw new WritePendingException(); } writePending = true; this.writeBuffer = writeBuffer; - this.gatheringWriteBuffer = gathering; this.writeAttachment = attachment; - if (timeout > 0) { - writeFuture = new FutureCompletionHandler<>((CompletionHandler) handler, writeAttachment); - writeCompletionHandler = (CompletionHandler) writeFuture; - group.getScheduledExecutor().schedule(writeFuture, timeout, unit); - } else { - this.writeCompletionHandler = (CompletionHandler) handler; - } + this.writeCompletionHandler = (CompletionHandler) handler; doWrite(); } @Override public Future write(ByteBuffer src) { - FutureCompletionHandler writeFuture = new FutureCompletionHandler<>(); - this.writeFuture = writeFuture; - write0(src, null, 0, TimeUnit.MILLISECONDS, null, writeFuture); - return writeFuture; + throw new UnsupportedOperationException(); } @Override public void write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - write0(null, new ByteBufferArray(srcs, offset, length), timeout, unit, attachment, handler); + throw new UnsupportedOperationException(); } @Override @@ -310,7 +266,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { return channel.getLocalAddress(); } - public void doConnect() { + public void doConnect(SocketAddress remote) { try { //此前通过Future调用,且触发了cancel if (connectFuture != null && connectFuture.isDone()) { @@ -321,22 +277,21 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { if (connected || channel.connect(remote)) { connected = channel.finishConnect(); } + //这行代码不要乱动 channel.configureBlocking(false); if (connected) { CompletionHandler completionHandler = connectCompletionHandler; Object attach = connectAttachment; resetConnect(); completionHandler.completed(null, attach); - } else if (connectSelectionKey == null) { - connectWorker.addRegister(selector -> { + } else { + commonWorker.addRegister(selector -> { try { - connectSelectionKey = channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this); + channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this); } catch (ClosedChannelException e) { connectCompletionHandler.failed(e, connectAttachment); } }); - } else { - throw new IOException("unKnow exception"); } } catch (IOException e) { connectCompletionHandler.failed(e, connectAttachment); @@ -371,26 +326,21 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { long readSize = 0; boolean hasRemain = true; if (directRead) { - if (scatteringReadBuffer != null) { - readSize = channel.read(scatteringReadBuffer.getBuffers(), scatteringReadBuffer.getOffset(), scatteringReadBuffer.getLength()); - hasRemain = hasRemaining(scatteringReadBuffer); - } else { - readSize = channel.read(readBuffer); - hasRemain = readBuffer.hasRemaining(); - } + readSize = channel.read(readBuffer); + hasRemain = readBuffer.hasRemaining(); } //注册至异步线程 if (readFuture != null && readSize == 0) { group.removeOps(readSelectionKey, SelectionKey.OP_READ); - group.registerFuture(selector -> { + commonWorker.addRegister(selector -> { try { - readFutureSelectionKey = channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); + channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); } catch (ClosedChannelException e) { e.printStackTrace(); doRead(true); } - }, SelectionKey.OP_READ); + }); return; } //释放内存 @@ -402,13 +352,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { if (readSize != 0 || !hasRemain) { CompletionHandler completionHandler = readCompletionHandler; Object attach = readAttachment; - ByteBufferArray scattering = scatteringReadBuffer; resetRead(); - if (scattering == null) { - completionHandler.completed((int) readSize, attach); - } else { - completionHandler.completed(readSize, attach); - } + completionHandler.completed((int) readSize, attach); if (!readPending && readSelectionKey != null) { group.removeOps(readSelectionKey, SelectionKey.OP_READ); @@ -445,66 +390,44 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readCompletionHandler = null; readAttachment = null; readBuffer = null; - scatteringReadBuffer = null; } public void doWrite() { try { - //此前通过Future调用,且触发了cancel - if (writeFuture != null && writeFuture.isDone()) { - resetWrite(); - return; - } int invoker = 0; //防止无限递归导致堆栈溢出 - if (writeWorker.getWorkerThread() == Thread.currentThread()) { - invoker = ++writeWorker.invoker; + if (commonWorker.getWorkerThread() == Thread.currentThread()) { + invoker = ++commonWorker.invoker; } else if (readWorker.getWorkerThread() != Thread.currentThread()) { invoker = ++writeInvoker; } int writeSize = 0; boolean hasRemain = true; if (invoker < EnhanceAsynchronousChannelGroup.MAX_INVOKER) { - if (gatheringWriteBuffer != null) { - writeSize = (int) channel.write(gatheringWriteBuffer.getBuffers(), gatheringWriteBuffer.getOffset(), gatheringWriteBuffer.getLength()); - hasRemain = hasRemaining(gatheringWriteBuffer); - } else { - writeSize = channel.write(writeBuffer); - hasRemain = writeBuffer.hasRemaining(); - } + writeSize = channel.write(writeBuffer); + hasRemain = writeBuffer.hasRemaining(); } else { writeInvoker = 0; } - //注册至异步线程 - if (writeFuture != null && writeSize == 0) { - group.removeOps(writeSelectionKey, SelectionKey.OP_WRITE); - group.registerFuture(selector -> { - try { - writeFutureSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); - } catch (ClosedChannelException e) { - e.printStackTrace(); - doWrite(); - } - }, SelectionKey.OP_WRITE); - return; - } - if (writeSize != 0 || !hasRemain) { CompletionHandler completionHandler = writeCompletionHandler; Object attach = writeAttachment; resetWrite(); completionHandler.completed(writeSize, attach); - } else if (writeSelectionKey == null) { - writeWorker.addRegister(selector -> { - try { - writeSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); - } catch (ClosedChannelException e) { - writeCompletionHandler.failed(e, writeAttachment); - } - }); } else { - group.interestOps(writeWorker, writeSelectionKey, SelectionKey.OP_WRITE); + SelectionKey commonSelectionKey = channel.keyFor(commonWorker.selector); + if (commonSelectionKey == null) { + commonWorker.addRegister(selector -> { + try { + channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + writeCompletionHandler.failed(e, writeAttachment); + } + }); + } else { + group.interestOps(commonWorker, commonSelectionKey, SelectionKey.OP_WRITE); + } } } catch (Throwable e) { if (writeCompletionHandler == null) { @@ -520,22 +443,11 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { } } - private boolean hasRemaining(ByteBufferArray scattering) { - for (int i = 0; i < scattering.getLength(); i++) { - if (scattering.getBuffers()[scattering.getOffset() + i].hasRemaining()) { - return true; - } - } - return false; - } - private void resetWrite() { writePending = false; - writeFuture = null; writeAttachment = null; writeCompletionHandler = null; writeBuffer = null; - gatheringWriteBuffer = null; } @Override diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java index b24de7807549eae6238fc730df2fb7a89803f641..0cb228bc6ebcdf3119eaf9514c296fbee3462eed 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java @@ -15,22 +15,12 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -final class FutureCompletionHandler implements CompletionHandler, Future, Runnable { - private CompletionHandler completionHandler; - private A attach; +final class FutureCompletionHandler implements CompletionHandler, Future { private V result; private boolean done = false; private boolean cancel = false; private Throwable exception; - public FutureCompletionHandler(CompletionHandler completionHandler, A attach) { - this.completionHandler = completionHandler; - this.attach = attach; - } - - public FutureCompletionHandler() { - } - @Override public void completed(V result, A selectionKey) { this.result = result; @@ -38,18 +28,12 @@ final class FutureCompletionHandler implements CompletionHandler, Fu synchronized (this) { this.notify(); } - if (completionHandler != null) { - completionHandler.completed(result, attach); - } } @Override public void failed(Throwable exc, A attachment) { exception = exc; done = true; - if (completionHandler != null) { - completionHandler.failed(exc, attachment); - } } @Override @@ -101,11 +85,4 @@ final class FutureCompletionHandler implements CompletionHandler, Fu throw new TimeoutException(); } - @Override - public synchronized void run() { - if (!done) { - cancel(true); - completionHandler.failed(new TimeoutException(), attach); - } - } } diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index facd8a13ad70a2e3074924b5ddf4b09edd0acf00..88e912163cd486f3670b3725457047d5e4dbc88a 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3 ../smart-socket-parent diff --git a/benchmark/pom.xml b/benchmark/pom.xml index 4d8856fc8163159d1012ed62941ddd587abe95c8..d9a24ce1a3c33592c3d598cd09d354e85f4c9ee2 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -18,7 +18,7 @@ org.smartboot.socket aio-pro - 1.6.2 + 1.6.3 org.slf4j diff --git a/example/pom.xml b/example/pom.xml index b0928048658cdf3a06b4e72aa3fa3f8f4fc1cedd..e0f6555378e2e8229ce7e9c525a5d9833f108895 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -23,7 +23,7 @@ org.smartboot.socket aio-pro - 1.6.2 + 1.6.3 org.apache.commons diff --git a/pom.xml b/pom.xml index 140bf49b6eee643edec2ac740c0f5de3f6e7ac33..9e9dc2076b14e6522c815399a2dc5ae9cdb73a98 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index d3d3ab57a5ba7d63b8914b91eb0d811adeb4e04a..d95f106f441eee227d554be770972f8960efa3c9 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,14 +15,14 @@ 4.0.0 org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3 pom UTF-8 1.7.36 - 1.6.2 + 1.6.3 4.13.2