From 5fee56a42ee04666d9b85824ee4ac7031aeb9468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 19:54:04 +0800 Subject: [PATCH 01/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EnhanceAsynchronousChannelGroup.java | 70 ++++++------------- ...nhanceAsynchronousServerSocketChannel.java | 2 +- .../EnhanceAsynchronousSocketChannel.java | 45 +++++------- 3 files changed, 39 insertions(+), 78 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index 1dc28913..956363fd 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -40,11 +40,11 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { /** * 写回调线程池 */ - private final ExecutorService writeExecutorService; + private final ExecutorService commonExecutorService; /** * write工作组 */ - private final Worker[] writeWorkers; + private final Worker[] commonWorkers; /** * read工作组 */ @@ -53,19 +53,12 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { * 线程池分配索引 */ private final AtomicInteger readIndex = new AtomicInteger(0); - private final AtomicInteger writeIndex = new AtomicInteger(0); + private final AtomicInteger commonIndex = new AtomicInteger(0); /** * 定时任务线程池 */ private final ScheduledThreadPoolExecutor scheduledExecutor; - /** - * 服务端accept线程池 - */ - private final ExecutorService acceptExecutorService; - /** - * accept工作组 - */ - private final Worker[] acceptWorkers; + private Worker futureWorker; /** * 同步IO线程池 @@ -95,27 +88,18 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { } //init threadPool for write and connect - final int writeThreadNum = 1; - final int acceptThreadNum = 1; - writeExecutorService = getSingleThreadExecutor("smart-socket:write"); - this.writeWorkers = new Worker[writeThreadNum]; - - for (int i = 0; i < writeThreadNum; i++) { - writeWorkers[i] = new Worker(Selector.open(), selectionKey -> { - EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); - //直接调用interestOps的效果比 removeOps(selectionKey, SelectionKey.OP_WRITE) 更好 - selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE); - asynchronousSocketChannel.doWrite(); - }); - writeExecutorService.execute(writeWorkers[i]); - } + final int commonThreadNum = 1; + commonExecutorService = getSingleThreadExecutor("smart-socket:common"); + this.commonWorkers = new Worker[commonThreadNum]; - //init threadPool for accept - acceptExecutorService = getSingleThreadExecutor("smart-socket:connect"); - acceptWorkers = new Worker[acceptThreadNum]; - for (int i = 0; i < acceptThreadNum; i++) { - acceptWorkers[i] = new Worker(Selector.open(), selectionKey -> { - if (selectionKey.isAcceptable()) { + for (int i = 0; i < commonThreadNum; i++) { + commonWorkers[i] = new Worker(Selector.open(), selectionKey -> { + if (selectionKey.isWritable()) { + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + //直接调用interestOps的效果比 removeOps(selectionKey, SelectionKey.OP_WRITE) 更好 + selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE); + asynchronousSocketChannel.doWrite(); + } else if (selectionKey.isAcceptable()) { EnhanceAsynchronousServerSocketChannel serverSocketChannel = (EnhanceAsynchronousServerSocketChannel) selectionKey.attachment(); serverSocketChannel.doAccept(); } else if (selectionKey.isConnectable()) { @@ -123,7 +107,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { asynchronousSocketChannel.doConnect(); } }); - acceptExecutorService.execute(acceptWorkers[i]); + commonExecutorService.execute(commonWorkers[i]); } scheduledExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "smart-socket:scheduled")); @@ -177,16 +161,8 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { return readWorkers[(readIndex.getAndIncrement() & Integer.MAX_VALUE) % readWorkers.length]; } - public Worker getWriteWorker() { - return writeWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % writeWorkers.length]; - } - - public Worker getAcceptWorker() { - return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length]; - } - - public Worker getConnectWorker() { - return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length]; + public Worker getCommonWorker() { + return commonWorkers[(commonIndex.getAndIncrement() & Integer.MAX_VALUE) % commonWorkers.length]; } public ScheduledThreadPoolExecutor getScheduledExecutor() { @@ -207,10 +183,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { public void shutdown() { running = false; readExecutorService.shutdown(); - writeExecutorService.shutdown(); - if (acceptExecutorService != null) { - acceptExecutorService.shutdown(); - } + commonExecutorService.shutdown(); if (futureExecutorService != null) { futureExecutorService.shutdown(); } @@ -221,10 +194,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { public void shutdownNow() { running = false; readExecutorService.shutdownNow(); - writeExecutorService.shutdownNow(); - if (acceptExecutorService != null) { - acceptExecutorService.shutdownNow(); - } + commonExecutorService.shutdownNow(); if (futureExecutorService != null) { futureExecutorService.shutdownNow(); } diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java index 5f398682..4516469e 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -43,7 +43,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc this.enhanceAsynchronousChannelGroup = enhanceAsynchronousChannelGroup; serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); - acceptWorker = enhanceAsynchronousChannelGroup.getAcceptWorker(); + acceptWorker = enhanceAsynchronousChannelGroup.getCommonWorker(); this.lowMemory = lowMemory; } diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index 95e80074..1134d952 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -49,11 +49,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { /** * 处理 write 事件的线程资源 */ - private final EnhanceAsynchronousChannelGroup.Worker writeWorker; - /** - * 处理 connect 事件的线程资源 - */ - private final EnhanceAsynchronousChannelGroup.Worker connectWorker; + private final EnhanceAsynchronousChannelGroup.Worker commonWorker; + /** * 用于接收 read 通道数据的缓冲区,经解码后腾出缓冲区以供下一批数据的读取 */ @@ -99,9 +96,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private Object connectAttachment; private SelectionKey readSelectionKey; private SelectionKey readFutureSelectionKey; - private SelectionKey writeSelectionKey; + private SelectionKey commonSelectionKey; private SelectionKey writeFutureSelectionKey; - private SelectionKey connectSelectionKey; /** * 当前是否正在执行 write 操作 */ @@ -120,15 +116,14 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private SocketAddress remote; private int writeInvoker; - private boolean lowMemory; + private final boolean lowMemory; public EnhanceAsynchronousSocketChannel(EnhanceAsynchronousChannelGroup group, SocketChannel channel, boolean lowMemory) throws IOException { super(group.provider()); this.group = group; this.channel = channel; readWorker = group.getReadWorker(); - writeWorker = group.getWriteWorker(); - connectWorker = group.getConnectWorker(); + commonWorker = group.getCommonWorker(); this.lowMemory = lowMemory; } @@ -150,18 +145,14 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readFutureSelectionKey.cancel(); readFutureSelectionKey = null; } - if (writeSelectionKey != null) { - writeSelectionKey.cancel(); - writeSelectionKey = null; + if (commonSelectionKey != null) { + commonSelectionKey.cancel(); + commonSelectionKey = null; } if (writeFutureSelectionKey != null) { writeFutureSelectionKey.cancel(); writeFutureSelectionKey = null; } - if (connectSelectionKey != null) { - connectSelectionKey.cancel(); - connectSelectionKey = null; - } if (exception != null) { throw exception; } @@ -327,10 +318,10 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { Object attach = connectAttachment; resetConnect(); completionHandler.completed(null, attach); - } else if (connectSelectionKey == null) { - connectWorker.addRegister(selector -> { + } else if (commonSelectionKey == null) { + commonWorker.addRegister(selector -> { try { - connectSelectionKey = channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this); + commonSelectionKey = channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this); } catch (ClosedChannelException e) { connectCompletionHandler.failed(e, connectAttachment); } @@ -457,8 +448,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { } int invoker = 0; //防止无限递归导致堆栈溢出 - if (writeWorker.getWorkerThread() == Thread.currentThread()) { - invoker = ++writeWorker.invoker; + if (commonWorker.getWorkerThread() == Thread.currentThread()) { + invoker = ++commonWorker.invoker; } else if (readWorker.getWorkerThread() != Thread.currentThread()) { invoker = ++writeInvoker; } @@ -478,7 +469,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { //注册至异步线程 if (writeFuture != null && writeSize == 0) { - group.removeOps(writeSelectionKey, SelectionKey.OP_WRITE); + group.removeOps(commonSelectionKey, SelectionKey.OP_WRITE); group.registerFuture(selector -> { try { writeFutureSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); @@ -495,16 +486,16 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { Object attach = writeAttachment; resetWrite(); completionHandler.completed(writeSize, attach); - } else if (writeSelectionKey == null) { - writeWorker.addRegister(selector -> { + } else if (commonSelectionKey == null) { + commonWorker.addRegister(selector -> { try { - writeSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); + commonSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); } catch (ClosedChannelException e) { writeCompletionHandler.failed(e, writeAttachment); } }); } else { - group.interestOps(writeWorker, writeSelectionKey, SelectionKey.OP_WRITE); + group.interestOps(commonWorker, commonSelectionKey, SelectionKey.OP_WRITE); } } catch (Throwable e) { if (writeCompletionHandler == null) { -- Gitee From f8c2fc693e7aeada537af18dc76d8dc3121065c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 20:21:50 +0800 Subject: [PATCH 02/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/enhance/ByteBufferArray.java | 40 ------------ .../EnhanceAsynchronousSocketChannel.java | 61 ++++--------------- 2 files changed, 13 insertions(+), 88 deletions(-) delete mode 100644 aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java deleted file mode 100644 index 1c8821df..00000000 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java +++ /dev/null @@ -1,40 +0,0 @@ -/******************************************************************************* - * Copyright (c) 2017-2021, org.smartboot. All rights reserved. - * project name: smart-socket - * file name: ByteBufferArray.java - * Date: 2021-07-29 - * Author: sandao (zhengjunweimail@163.com) - * - ******************************************************************************/ - -package org.smartboot.socket.enhance; - -import java.nio.ByteBuffer; - - -/** - * @author 三刀 - */ -final class ByteBufferArray { - private final ByteBuffer[] buffers; - private final int offset; - private final int length; - - public ByteBufferArray(ByteBuffer[] buffers, int offset, int length) { - this.buffers = buffers; - this.offset = offset; - this.length = length; - } - - public ByteBuffer[] getBuffers() { - return buffers; - } - - public int getOffset() { - return offset; - } - - public int getLength() { - return length; - } -} \ No newline at end of file diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index 1134d952..c1c7b1b3 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -55,18 +55,11 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { * 用于接收 read 通道数据的缓冲区,经解码后腾出缓冲区以供下一批数据的读取 */ private ByteBuffer readBuffer; - /** - * 用于接收 read 通道数据的缓冲区集合 - */ - private ByteBufferArray scatteringReadBuffer; /** * 存放待输出数据的缓冲区 */ private ByteBuffer writeBuffer; - /** - * 存放待输出数据的缓冲区集合 - */ - private ByteBufferArray gatheringWriteBuffer; + /** * read 回调事件处理器 */ @@ -225,16 +218,15 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read0(dst, null, timeout, unit, attachment, handler); + read0(dst, timeout, unit, attachment, handler); } - private void read0(ByteBuffer readBuffer, ByteBufferArray scattering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + private void read0(ByteBuffer readBuffer, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { if (readPending) { throw new ReadPendingException(); } readPending = true; this.readBuffer = readBuffer; - this.scatteringReadBuffer = scattering; this.readAttachment = attachment; if (timeout > 0) { readFuture = new FutureCompletionHandler<>((CompletionHandler) handler, readAttachment); @@ -256,22 +248,21 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public void read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read0(null, new ByteBufferArray(dsts, offset, length), timeout, unit, attachment, handler); + throw new UnsupportedOperationException(); } @Override public void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - write0(src, null, timeout, unit, attachment, handler); + write0(src, timeout, unit, attachment, handler); } - private void write0(ByteBuffer writeBuffer, ByteBufferArray gathering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + private void write0(ByteBuffer writeBuffer, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { if (writePending) { throw new WritePendingException(); } writePending = true; this.writeBuffer = writeBuffer; - this.gatheringWriteBuffer = gathering; this.writeAttachment = attachment; if (timeout > 0) { writeFuture = new FutureCompletionHandler<>((CompletionHandler) handler, writeAttachment); @@ -287,13 +278,13 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { public Future write(ByteBuffer src) { FutureCompletionHandler writeFuture = new FutureCompletionHandler<>(); this.writeFuture = writeFuture; - write0(src, null, 0, TimeUnit.MILLISECONDS, null, writeFuture); + write0(src, 0, TimeUnit.MILLISECONDS, null, writeFuture); return writeFuture; } @Override public void write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - write0(null, new ByteBufferArray(srcs, offset, length), timeout, unit, attachment, handler); + throw new UnsupportedOperationException(); } @Override @@ -362,13 +353,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { long readSize = 0; boolean hasRemain = true; if (directRead) { - if (scatteringReadBuffer != null) { - readSize = channel.read(scatteringReadBuffer.getBuffers(), scatteringReadBuffer.getOffset(), scatteringReadBuffer.getLength()); - hasRemain = hasRemaining(scatteringReadBuffer); - } else { - readSize = channel.read(readBuffer); - hasRemain = readBuffer.hasRemaining(); - } + readSize = channel.read(readBuffer); + hasRemain = readBuffer.hasRemaining(); } //注册至异步线程 @@ -393,13 +379,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { if (readSize != 0 || !hasRemain) { CompletionHandler completionHandler = readCompletionHandler; Object attach = readAttachment; - ByteBufferArray scattering = scatteringReadBuffer; resetRead(); - if (scattering == null) { - completionHandler.completed((int) readSize, attach); - } else { - completionHandler.completed(readSize, attach); - } + completionHandler.completed((int) readSize, attach); if (!readPending && readSelectionKey != null) { group.removeOps(readSelectionKey, SelectionKey.OP_READ); @@ -436,7 +417,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readCompletionHandler = null; readAttachment = null; readBuffer = null; - scatteringReadBuffer = null; } public void doWrite() { @@ -456,13 +436,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { int writeSize = 0; boolean hasRemain = true; if (invoker < EnhanceAsynchronousChannelGroup.MAX_INVOKER) { - if (gatheringWriteBuffer != null) { - writeSize = (int) channel.write(gatheringWriteBuffer.getBuffers(), gatheringWriteBuffer.getOffset(), gatheringWriteBuffer.getLength()); - hasRemain = hasRemaining(gatheringWriteBuffer); - } else { - writeSize = channel.write(writeBuffer); - hasRemain = writeBuffer.hasRemaining(); - } + writeSize = channel.write(writeBuffer); + hasRemain = writeBuffer.hasRemaining(); } else { writeInvoker = 0; } @@ -511,22 +486,12 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { } } - private boolean hasRemaining(ByteBufferArray scattering) { - for (int i = 0; i < scattering.getLength(); i++) { - if (scattering.getBuffers()[scattering.getOffset() + i].hasRemaining()) { - return true; - } - } - return false; - } - private void resetWrite() { writePending = false; writeFuture = null; writeAttachment = null; writeCompletionHandler = null; writeBuffer = null; - gatheringWriteBuffer = null; } @Override -- Gitee From c2cfa6dd800f6d2e6662b5565a53761e41453a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 20:37:26 +0800 Subject: [PATCH 03/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/enhance/EnhanceAsynchronousSocketChannel.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index c1c7b1b3..f752f038 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -351,10 +351,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { boolean directRead = direct || (Thread.currentThread() == readWorker.getWorkerThread() && readWorker.invoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER); long readSize = 0; - boolean hasRemain = true; if (directRead) { readSize = channel.read(readBuffer); - hasRemain = readBuffer.hasRemaining(); } //注册至异步线程 @@ -376,7 +374,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment); } - if (readSize != 0 || !hasRemain) { + if (readSize != 0 || !readBuffer.hasRemaining()) { CompletionHandler completionHandler = readCompletionHandler; Object attach = readAttachment; resetRead(); @@ -434,10 +432,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { invoker = ++writeInvoker; } int writeSize = 0; - boolean hasRemain = true; if (invoker < EnhanceAsynchronousChannelGroup.MAX_INVOKER) { writeSize = channel.write(writeBuffer); - hasRemain = writeBuffer.hasRemaining(); } else { writeInvoker = 0; } @@ -456,7 +452,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { return; } - if (writeSize != 0 || !hasRemain) { + if (writeSize != 0 || !writeBuffer.hasRemaining()) { CompletionHandler completionHandler = writeCompletionHandler; Object attach = writeAttachment; resetWrite(); -- Gitee From 001d4a1bb9872251c444bf65b79547db8ff0c05e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 21:00:36 +0800 Subject: [PATCH 04/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EnhanceAsynchronousChannelGroup.java | 2 +- .../EnhanceAsynchronousSocketChannel.java | 90 +++++++------------ 2 files changed, 32 insertions(+), 60 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index 956363fd..5f143e30 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -221,7 +221,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { /** * 当前Worker绑定的Selector */ - private final Selector selector; + final Selector selector; private final Consumer consumer; private final ConcurrentLinkedQueue> consumers = new ConcurrentLinkedQueue<>(); int invoker = 0; diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index f752f038..d9a16d57 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -74,7 +74,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private CompletionHandler connectCompletionHandler; private FutureCompletionHandler connectFuture; private FutureCompletionHandler readFuture; - private FutureCompletionHandler writeFuture; /** * read 回调事件关联绑定的附件对象 */ @@ -89,8 +88,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private Object connectAttachment; private SelectionKey readSelectionKey; private SelectionKey readFutureSelectionKey; - private SelectionKey commonSelectionKey; - private SelectionKey writeFutureSelectionKey; /** * 当前是否正在执行 write 操作 */ @@ -124,7 +121,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { public void close() throws IOException { IOException exception = null; try { - if (channel != null && channel.isOpen()) { + if (channel.isOpen()) { channel.close(); } } catch (IOException e) { @@ -138,13 +135,9 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readFutureSelectionKey.cancel(); readFutureSelectionKey = null; } - if (commonSelectionKey != null) { - commonSelectionKey.cancel(); - commonSelectionKey = null; - } - if (writeFutureSelectionKey != null) { - writeFutureSelectionKey.cancel(); - writeFutureSelectionKey = null; + SelectionKey key = channel.keyFor(commonWorker.selector); + if (key != null) { + key.cancel(); } if (exception != null) { throw exception; @@ -253,10 +246,13 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - write0(src, timeout, unit, attachment, handler); + if (timeout > 0) { + throw new UnsupportedOperationException(); + } + write0(src, attachment, handler); } - private void write0(ByteBuffer writeBuffer, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + private void write0(ByteBuffer writeBuffer, A attachment, CompletionHandler handler) { if (writePending) { throw new WritePendingException(); } @@ -264,22 +260,13 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { writePending = true; this.writeBuffer = writeBuffer; this.writeAttachment = attachment; - if (timeout > 0) { - writeFuture = new FutureCompletionHandler<>((CompletionHandler) handler, writeAttachment); - writeCompletionHandler = (CompletionHandler) writeFuture; - group.getScheduledExecutor().schedule(writeFuture, timeout, unit); - } else { - this.writeCompletionHandler = (CompletionHandler) handler; - } + this.writeCompletionHandler = (CompletionHandler) handler; doWrite(); } @Override public Future write(ByteBuffer src) { - FutureCompletionHandler writeFuture = new FutureCompletionHandler<>(); - this.writeFuture = writeFuture; - write0(src, 0, TimeUnit.MILLISECONDS, null, writeFuture); - return writeFuture; + throw new UnsupportedOperationException(); } @Override @@ -309,16 +296,14 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { Object attach = connectAttachment; resetConnect(); completionHandler.completed(null, attach); - } else if (commonSelectionKey == null) { + } else { commonWorker.addRegister(selector -> { try { - commonSelectionKey = channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this); + channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this); } catch (ClosedChannelException e) { connectCompletionHandler.failed(e, connectAttachment); } }); - } else { - throw new IOException("unKnow exception"); } } catch (IOException e) { connectCompletionHandler.failed(e, connectAttachment); @@ -351,8 +336,10 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { boolean directRead = direct || (Thread.currentThread() == readWorker.getWorkerThread() && readWorker.invoker++ < EnhanceAsynchronousChannelGroup.MAX_INVOKER); long readSize = 0; + boolean hasRemain = true; if (directRead) { readSize = channel.read(readBuffer); + hasRemain = readBuffer.hasRemaining(); } //注册至异步线程 @@ -374,7 +361,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readCompletionHandler.completed(EnhanceAsynchronousChannelProvider.READ_MONITOR_SIGNAL, readAttachment); } - if (readSize != 0 || !readBuffer.hasRemaining()) { + if (readSize != 0 || !hasRemain) { CompletionHandler completionHandler = readCompletionHandler; Object attach = readAttachment; resetRead(); @@ -419,11 +406,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { public void doWrite() { try { - //此前通过Future调用,且触发了cancel - if (writeFuture != null && writeFuture.isDone()) { - resetWrite(); - return; - } int invoker = 0; //防止无限递归导致堆栈溢出 if (commonWorker.getWorkerThread() == Thread.currentThread()) { @@ -432,41 +414,32 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { invoker = ++writeInvoker; } int writeSize = 0; + boolean hasRemain = true; if (invoker < EnhanceAsynchronousChannelGroup.MAX_INVOKER) { writeSize = channel.write(writeBuffer); + hasRemain = writeBuffer.hasRemaining(); } else { writeInvoker = 0; } - //注册至异步线程 - if (writeFuture != null && writeSize == 0) { - group.removeOps(commonSelectionKey, SelectionKey.OP_WRITE); - group.registerFuture(selector -> { - try { - writeFutureSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); - } catch (ClosedChannelException e) { - e.printStackTrace(); - doWrite(); - } - }, SelectionKey.OP_WRITE); - return; - } - - if (writeSize != 0 || !writeBuffer.hasRemaining()) { + if (writeSize != 0 || !hasRemain) { CompletionHandler completionHandler = writeCompletionHandler; Object attach = writeAttachment; resetWrite(); completionHandler.completed(writeSize, attach); - } else if (commonSelectionKey == null) { - commonWorker.addRegister(selector -> { - try { - commonSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); - } catch (ClosedChannelException e) { - writeCompletionHandler.failed(e, writeAttachment); - } - }); } else { - group.interestOps(commonWorker, commonSelectionKey, SelectionKey.OP_WRITE); + SelectionKey commonSelectionKey = channel.keyFor(commonWorker.selector); + if (commonSelectionKey == null) { + commonWorker.addRegister(selector -> { + try { + channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this); + } catch (ClosedChannelException e) { + writeCompletionHandler.failed(e, writeAttachment); + } + }); + } else { + group.interestOps(commonWorker, commonSelectionKey, SelectionKey.OP_WRITE); + } } } catch (Throwable e) { if (writeCompletionHandler == null) { @@ -484,7 +457,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { private void resetWrite() { writePending = false; - writeFuture = null; writeAttachment = null; writeCompletionHandler = null; writeBuffer = null; -- Gitee From 281d1d568faa31f53f9feec8ca00c01c2a1ac264 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 21:07:45 +0800 Subject: [PATCH 05/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/enhance/EnhanceAsynchronousChannelGroup.java | 2 +- .../socket/enhance/EnhanceAsynchronousSocketChannel.java | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index 5f143e30..aeabd2ef 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -104,7 +104,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { serverSocketChannel.doAccept(); } else if (selectionKey.isConnectable()) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); - asynchronousSocketChannel.doConnect(); + asynchronousSocketChannel.doConnect(null); } }); commonExecutorService.execute(commonWorkers[i]); diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index d9a16d57..fe6086fd 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -100,10 +100,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { * 当前是否正在执行 connect 操作 */ private boolean connectionPending; - /** - * 远程连接的地址 - */ - private SocketAddress remote; private int writeInvoker; private final boolean lowMemory; @@ -197,8 +193,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { connectionPending = true; this.connectAttachment = attachment; this.connectCompletionHandler = (CompletionHandler) handler; - this.remote = remote; - doConnect(); + doConnect(remote); } @Override @@ -279,7 +274,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { return channel.getLocalAddress(); } - public void doConnect() { + public void doConnect(SocketAddress remote) { try { //此前通过Future调用,且触发了cancel if (connectFuture != null && connectFuture.isDone()) { -- Gitee From 09e23f60b708f7e455aa2a5b1fb6bfcbc2b55979 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 21:17:04 +0800 Subject: [PATCH 06/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/enhance/EnhanceAsynchronousServerSocketChannel.java | 2 +- .../socket/enhance/EnhanceAsynchronousSocketChannel.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java index 4516469e..6d60c9e7 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -96,7 +96,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc } if (socketChannel != null) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = new EnhanceAsynchronousSocketChannel(enhanceAsynchronousChannelGroup, socketChannel, lowMemory); - socketChannel.configureBlocking(false); + socketChannel.finishConnect(); CompletionHandler completionHandler = acceptCompletionHandler; Object attach = attachment; diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index fe6086fd..8d1ed21e 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -111,6 +111,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readWorker = group.getReadWorker(); commonWorker = group.getCommonWorker(); this.lowMemory = lowMemory; + channel.configureBlocking(false); } @Override @@ -285,7 +286,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { if (connected || channel.connect(remote)) { connected = channel.finishConnect(); } - channel.configureBlocking(false); if (connected) { CompletionHandler completionHandler = connectCompletionHandler; Object attach = connectAttachment; -- Gitee From 92007a54229efb6eb9ff6073172c2ad6e246faa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 21:38:15 +0800 Subject: [PATCH 07/14] v1.6.3-SNAPSHOT --- aio-core/pom.xml | 2 +- .../java/org/smartboot/socket/transport/IoServerConfig.java | 2 +- aio-enhance/pom.xml | 2 +- aio-pro/pom.xml | 2 +- benchmark/pom.xml | 2 +- example/pom.xml | 2 +- pom.xml | 2 +- smart-socket-parent/pom.xml | 4 ++-- 8 files changed, 9 insertions(+), 9 deletions(-) diff --git a/aio-core/pom.xml b/aio-core/pom.xml index 3d528b4b..115ea3b1 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3-SNAPSHOT ../smart-socket-parent diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index ef80c268..7474327c 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -39,7 +39,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.6.2"; + public static final String VERSION = "v1.6.3-SNAPSHOT"; /** * 消息体缓存大小,字节 diff --git a/aio-enhance/pom.xml b/aio-enhance/pom.xml index 90e3d17b..00e6596a 100644 --- a/aio-enhance/pom.xml +++ b/aio-enhance/pom.xml @@ -14,7 +14,7 @@ smart-socket-parent org.smartboot.socket - 1.6.2 + 1.6.3-SNAPSHOT ../smart-socket-parent 4.0.0 diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index facd8a13..f71ec98b 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3-SNAPSHOT ../smart-socket-parent diff --git a/benchmark/pom.xml b/benchmark/pom.xml index 4d8856fc..8b96c8b6 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -18,7 +18,7 @@ org.smartboot.socket aio-pro - 1.6.2 + 1.6.3-SNAPSHOT org.slf4j diff --git a/example/pom.xml b/example/pom.xml index b0928048..c95e0dfa 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -23,7 +23,7 @@ org.smartboot.socket aio-pro - 1.6.2 + 1.6.3-SNAPSHOT org.apache.commons diff --git a/pom.xml b/pom.xml index 140bf49b..90e0e684 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3-SNAPSHOT 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index d3d3ab57..9ad0da1c 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,14 +15,14 @@ 4.0.0 org.smartboot.socket smart-socket-parent - 1.6.2 + 1.6.3-SNAPSHOT pom UTF-8 1.7.36 - 1.6.2 + 1.6.3-SNAPSHOT 4.13.2 -- Gitee From beef4496999641bb161e8d406e619764e478211d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Mon, 27 Feb 2023 22:36:45 +0800 Subject: [PATCH 08/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EnhanceAsynchronousChannelGroup.java | 18 ++++--------- .../EnhanceAsynchronousSocketChannel.java | 26 +++++++------------ .../enhance/FutureCompletionHandler.java | 25 +----------------- 3 files changed, 15 insertions(+), 54 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index aeabd2ef..228035dd 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -17,7 +17,6 @@ import java.nio.channels.spi.AsynchronousChannelProvider; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -54,10 +53,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { */ private final AtomicInteger readIndex = new AtomicInteger(0); private final AtomicInteger commonIndex = new AtomicInteger(0); - /** - * 定时任务线程池 - */ - private final ScheduledThreadPoolExecutor scheduledExecutor; private Worker futureWorker; /** @@ -105,12 +100,15 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { } else if (selectionKey.isConnectable()) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); asynchronousSocketChannel.doConnect(null); + } else if (selectionKey.isReadable()) { + //仅同步read会用到此线程资源 + EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); + removeOps(selectionKey, SelectionKey.OP_READ); + asynchronousSocketChannel.doRead(true); } }); commonExecutorService.execute(commonWorkers[i]); } - - scheduledExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "smart-socket:scheduled")); } /** @@ -165,10 +163,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { return commonWorkers[(commonIndex.getAndIncrement() & Integer.MAX_VALUE) % commonWorkers.length]; } - public ScheduledThreadPoolExecutor getScheduledExecutor() { - return scheduledExecutor; - } - @Override public boolean isShutdown() { return readExecutorService.isShutdown(); @@ -187,7 +181,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { if (futureExecutorService != null) { futureExecutorService.shutdown(); } - scheduledExecutor.shutdown(); } @Override @@ -198,7 +191,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { if (futureExecutorService != null) { futureExecutorService.shutdownNow(); } - scheduledExecutor.shutdownNow(); } @Override diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index 8d1ed21e..32158e84 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -87,7 +87,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { */ private Object connectAttachment; private SelectionKey readSelectionKey; - private SelectionKey readFutureSelectionKey; /** * 当前是否正在执行 write 操作 */ @@ -128,10 +127,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readSelectionKey.cancel(); readSelectionKey = null; } - if (readFutureSelectionKey != null) { - readFutureSelectionKey.cancel(); - readFutureSelectionKey = null; - } SelectionKey key = channel.keyFor(commonWorker.selector); if (key != null) { key.cancel(); @@ -207,23 +202,20 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { @Override public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { - read0(dst, timeout, unit, attachment, handler); + if (timeout > 0) { + throw new UnsupportedOperationException(); + } + read0(dst, attachment, handler); } - private void read0(ByteBuffer readBuffer, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) { + private void read0(ByteBuffer readBuffer, A attachment, CompletionHandler handler) { if (readPending) { throw new ReadPendingException(); } readPending = true; this.readBuffer = readBuffer; this.readAttachment = attachment; - if (timeout > 0) { - readFuture = new FutureCompletionHandler<>((CompletionHandler) handler, readAttachment); - readCompletionHandler = (CompletionHandler) readFuture; - group.getScheduledExecutor().schedule(readFuture, timeout, unit); - } else { - this.readCompletionHandler = (CompletionHandler) handler; - } + this.readCompletionHandler = (CompletionHandler) handler; doRead(readFuture != null); } @@ -340,14 +332,14 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { //注册至异步线程 if (readFuture != null && readSize == 0) { group.removeOps(readSelectionKey, SelectionKey.OP_READ); - group.registerFuture(selector -> { + commonWorker.addRegister(selector -> { try { - readFutureSelectionKey = channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); + channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this); } catch (ClosedChannelException e) { e.printStackTrace(); doRead(true); } - }, SelectionKey.OP_READ); + }); return; } //释放内存 diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java index b24de780..0cb228bc 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java @@ -15,22 +15,12 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -final class FutureCompletionHandler implements CompletionHandler, Future, Runnable { - private CompletionHandler completionHandler; - private A attach; +final class FutureCompletionHandler implements CompletionHandler, Future { private V result; private boolean done = false; private boolean cancel = false; private Throwable exception; - public FutureCompletionHandler(CompletionHandler completionHandler, A attach) { - this.completionHandler = completionHandler; - this.attach = attach; - } - - public FutureCompletionHandler() { - } - @Override public void completed(V result, A selectionKey) { this.result = result; @@ -38,18 +28,12 @@ final class FutureCompletionHandler implements CompletionHandler, Fu synchronized (this) { this.notify(); } - if (completionHandler != null) { - completionHandler.completed(result, attach); - } } @Override public void failed(Throwable exc, A attachment) { exception = exc; done = true; - if (completionHandler != null) { - completionHandler.failed(exc, attachment); - } } @Override @@ -101,11 +85,4 @@ final class FutureCompletionHandler implements CompletionHandler, Fu throw new TimeoutException(); } - @Override - public synchronized void run() { - if (!done) { - cancel(true); - completionHandler.failed(new TimeoutException(), attach); - } - } } -- Gitee From 9d5d0b83e0d1be3102e832d92aba46b31ddfc83c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 28 Feb 2023 09:24:23 +0800 Subject: [PATCH 09/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../EnhanceAsynchronousChannelGroup.java | 39 +------------------ 1 file changed, 1 insertion(+), 38 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index 228035dd..326e4385 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -54,11 +54,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { private final AtomicInteger readIndex = new AtomicInteger(0); private final AtomicInteger commonIndex = new AtomicInteger(0); - private Worker futureWorker; - /** - * 同步IO线程池 - */ - private ExecutorService futureExecutorService; /** * group运行状态 */ @@ -102,6 +97,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { asynchronousSocketChannel.doConnect(null); } else if (selectionKey.isReadable()) { //仅同步read会用到此线程资源 + System.out.println("........"); EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); removeOps(selectionKey, SelectionKey.OP_READ); asynchronousSocketChannel.doRead(true); @@ -111,33 +107,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { } } - /** - * 同步IO注册异步线程,防止主IO线程阻塞 - */ - public synchronized void registerFuture(Consumer register, int opType) throws IOException { - if (futureWorker == null) { - futureExecutorService = getSingleThreadExecutor("smart-socket:future"); - futureWorker = new Worker(Selector.open(), selectionKey -> { - EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); - switch (opType) { - case SelectionKey.OP_READ: - removeOps(selectionKey, SelectionKey.OP_READ); - asynchronousSocketChannel.doRead(true); - break; - case SelectionKey.OP_WRITE: - removeOps(selectionKey, SelectionKey.OP_WRITE); - asynchronousSocketChannel.doWrite(); - break; - default: - throw new UnsupportedOperationException("unSupport opType: " + opType); - } - - }); - futureExecutorService.execute(futureWorker); - } - futureWorker.addRegister(register); - } - private ThreadPoolExecutor getSingleThreadExecutor(final String prefix) { return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> new Thread(r, prefix)); @@ -178,9 +147,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { running = false; readExecutorService.shutdown(); commonExecutorService.shutdown(); - if (futureExecutorService != null) { - futureExecutorService.shutdown(); - } } @Override @@ -188,9 +154,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { running = false; readExecutorService.shutdownNow(); commonExecutorService.shutdownNow(); - if (futureExecutorService != null) { - futureExecutorService.shutdownNow(); - } } @Override -- Gitee From 3b8a0c5541cce99ac86b8b5e57fb61461c1a7b08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Tue, 28 Feb 2023 22:50:06 +0800 Subject: [PATCH 10/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/transport/AioQuickServer.java | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index b537cf52..15209135 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -112,10 +112,14 @@ public final class AioQuickServer { * @throws IOException IO异常 */ public void start() throws IOException { - if (config.isBannerEnabled()) { - System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + "::\t(" + IoServerConfig.VERSION + ") [port: " + config.getPort() + ", threadNum:" + config.getThreadNum() + "]"); - } - start0(); + asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(lowMemory).openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() { + private byte index = 0; + + @Override + public Thread newThread(Runnable r) { + return bufferPool.newThread(r, "smart-socket:Thread-" + (++index)); + } + }); } /** @@ -123,20 +127,16 @@ public final class AioQuickServer { * * @throws IOException IO异常 */ - private void start0() throws IOException { + public void start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException { + if (config.isBannerEnabled()) { + System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + "::\t(" + IoServerConfig.VERSION + ") [port: " + config.getPort() + ", threadNum:" + config.getThreadNum() + "]"); + } try { if (bufferPool == null) { this.bufferPool = config.getBufferFactory().create(); this.innerBufferPool = bufferPool; } - asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(lowMemory).openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() { - private byte index = 0; - @Override - public Thread newThread(Runnable r) { - return bufferPool.newThread(r, "smart-socket:Thread-" + (++index)); - } - }); this.serverSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup); //set socket options if (config.getSocketOptions() != null) { @@ -224,18 +224,20 @@ public final class AioQuickServer { e.printStackTrace(); } - if (!asynchronousChannelGroup.isTerminated()) { + if (asynchronousChannelGroup != null) { + if (!asynchronousChannelGroup.isTerminated()) { + try { + asynchronousChannelGroup.shutdownNow(); + } catch (IOException e) { + e.printStackTrace(); + } + } try { - asynchronousChannelGroup.shutdownNow(); - } catch (IOException e) { + asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { e.printStackTrace(); } } - try { - asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } if (innerBufferPool != null) { innerBufferPool.release(); } -- Gitee From 36ae73d4493841891b6f5901f7f3648a064d2415 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 3 Mar 2023 16:52:07 +0800 Subject: [PATCH 11/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/socket/transport/AioQuickServer.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index 15209135..83c1f384 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -112,6 +112,10 @@ public final class AioQuickServer { * @throws IOException IO异常 */ public void start() throws IOException { + if (bufferPool == null) { + this.bufferPool = config.getBufferFactory().create(); + this.innerBufferPool = bufferPool; + } asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(lowMemory).openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() { private byte index = 0; @@ -120,6 +124,7 @@ public final class AioQuickServer { return bufferPool.newThread(r, "smart-socket:Thread-" + (++index)); } }); + start(asynchronousChannelGroup); } /** -- Gitee From 94153c63540419a785c6408aafe223cbe32c902a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 3 Mar 2023 21:21:37 +0800 Subject: [PATCH 12/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/enhance/EnhanceAsynchronousServerSocketChannel.java | 3 ++- .../socket/enhance/EnhanceAsynchronousSocketChannel.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java index 6d60c9e7..b57b235c 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java @@ -96,7 +96,8 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc } if (socketChannel != null) { EnhanceAsynchronousSocketChannel asynchronousSocketChannel = new EnhanceAsynchronousSocketChannel(enhanceAsynchronousChannelGroup, socketChannel, lowMemory); - + //这行代码不要乱动 + socketChannel.configureBlocking(false); socketChannel.finishConnect(); CompletionHandler completionHandler = acceptCompletionHandler; Object attach = attachment; diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java index 32158e84..ee1b4365 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java @@ -110,7 +110,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { readWorker = group.getReadWorker(); commonWorker = group.getCommonWorker(); this.lowMemory = lowMemory; - channel.configureBlocking(false); } @Override @@ -278,6 +277,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel { if (connected || channel.connect(remote)) { connected = channel.finishConnect(); } + //这行代码不要乱动 + channel.configureBlocking(false); if (connected) { CompletionHandler completionHandler = connectCompletionHandler; Object attach = connectAttachment; -- Gitee From 4bd2aa7a806f4cb0646c2654040ee8f08be7088c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 3 Mar 2023 22:53:20 +0800 Subject: [PATCH 13/14] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/enhance/EnhanceAsynchronousChannelGroup.java | 1 - 1 file changed, 1 deletion(-) diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java index 326e4385..6dddae16 100644 --- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java +++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java @@ -97,7 +97,6 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup { asynchronousSocketChannel.doConnect(null); } else if (selectionKey.isReadable()) { //仅同步read会用到此线程资源 - System.out.println("........"); EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment(); removeOps(selectionKey, SelectionKey.OP_READ); asynchronousSocketChannel.doRead(true); -- Gitee From 1814f973fecdea6e4c57e5261f36a862388025ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=89=E5=88=80?= Date: Fri, 3 Mar 2023 22:56:14 +0800 Subject: [PATCH 14/14] =?UTF-8?q?=E5=8F=91=E5=B8=83=E6=AD=A3=E5=BC=8F?= =?UTF-8?q?=E7=89=881.6.3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aio-core/pom.xml | 2 +- .../java/org/smartboot/socket/transport/IoServerConfig.java | 2 +- aio-enhance/pom.xml | 2 +- aio-pro/pom.xml | 2 +- benchmark/pom.xml | 2 +- example/pom.xml | 2 +- pom.xml | 2 +- smart-socket-parent/pom.xml | 4 ++-- 8 files changed, 9 insertions(+), 9 deletions(-) diff --git a/aio-core/pom.xml b/aio-core/pom.xml index 115ea3b1..b80ce55d 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.6.3-SNAPSHOT + 1.6.3 ../smart-socket-parent diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index 7474327c..08ea4e3e 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -39,7 +39,7 @@ final class IoServerConfig { /** * 当前smart-socket版本号 */ - public static final String VERSION = "v1.6.3-SNAPSHOT"; + public static final String VERSION = "v1.6.3"; /** * 消息体缓存大小,字节 diff --git a/aio-enhance/pom.xml b/aio-enhance/pom.xml index 00e6596a..d4eb5cae 100644 --- a/aio-enhance/pom.xml +++ b/aio-enhance/pom.xml @@ -14,7 +14,7 @@ smart-socket-parent org.smartboot.socket - 1.6.3-SNAPSHOT + 1.6.3 ../smart-socket-parent 4.0.0 diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index f71ec98b..88e91216 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.6.3-SNAPSHOT + 1.6.3 ../smart-socket-parent diff --git a/benchmark/pom.xml b/benchmark/pom.xml index 8b96c8b6..d9a24ce1 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -18,7 +18,7 @@ org.smartboot.socket aio-pro - 1.6.3-SNAPSHOT + 1.6.3 org.slf4j diff --git a/example/pom.xml b/example/pom.xml index c95e0dfa..e0f65553 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -23,7 +23,7 @@ org.smartboot.socket aio-pro - 1.6.3-SNAPSHOT + 1.6.3 org.apache.commons diff --git a/pom.xml b/pom.xml index 90e0e684..9e9dc207 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ org.smartboot.socket smart-socket-parent - 1.6.3-SNAPSHOT + 1.6.3 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index 9ad0da1c..d95f106f 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,14 +15,14 @@ 4.0.0 org.smartboot.socket smart-socket-parent - 1.6.3-SNAPSHOT + 1.6.3 pom UTF-8 1.7.36 - 1.6.3-SNAPSHOT + 1.6.3 4.13.2 -- Gitee