diff --git a/aio-core/pom.xml b/aio-core/pom.xml
index 3d528b4b3454cbeab17e847f63c7f29d28e9ace4..b80ce55d11f3b56a51e3ee6dc2925d16d0ee7fa7 100644
--- a/aio-core/pom.xml
+++ b/aio-core/pom.xml
@@ -19,7 +19,7 @@
org.smartboot.socket
smart-socket-parent
- 1.6.2
+ 1.6.3
../smart-socket-parent
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
index b537cf526279c16e75c348d689af138c40679c20..83c1f384b4ef829092881c0d68857b3381874c5d 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
@@ -112,10 +112,19 @@ public final class AioQuickServer {
* @throws IOException IO异常
*/
public void start() throws IOException {
- if (config.isBannerEnabled()) {
- System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + "::\t(" + IoServerConfig.VERSION + ") [port: " + config.getPort() + ", threadNum:" + config.getThreadNum() + "]");
+ if (bufferPool == null) {
+ this.bufferPool = config.getBufferFactory().create();
+ this.innerBufferPool = bufferPool;
}
- start0();
+ asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(lowMemory).openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() {
+ private byte index = 0;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return bufferPool.newThread(r, "smart-socket:Thread-" + (++index));
+ }
+ });
+ start(asynchronousChannelGroup);
}
/**
@@ -123,20 +132,16 @@ public final class AioQuickServer {
*
* @throws IOException IO异常
*/
- private void start0() throws IOException {
+ public void start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException {
+ if (config.isBannerEnabled()) {
+ System.out.println(IoServerConfig.BANNER + "\r\n :: smart-socket " + "::\t(" + IoServerConfig.VERSION + ") [port: " + config.getPort() + ", threadNum:" + config.getThreadNum() + "]");
+ }
try {
if (bufferPool == null) {
this.bufferPool = config.getBufferFactory().create();
this.innerBufferPool = bufferPool;
}
- asynchronousChannelGroup = new EnhanceAsynchronousChannelProvider(lowMemory).openAsynchronousChannelGroup(config.getThreadNum(), new ThreadFactory() {
- private byte index = 0;
- @Override
- public Thread newThread(Runnable r) {
- return bufferPool.newThread(r, "smart-socket:Thread-" + (++index));
- }
- });
this.serverSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
//set socket options
if (config.getSocketOptions() != null) {
@@ -224,18 +229,20 @@ public final class AioQuickServer {
e.printStackTrace();
}
- if (!asynchronousChannelGroup.isTerminated()) {
+ if (asynchronousChannelGroup != null) {
+ if (!asynchronousChannelGroup.isTerminated()) {
+ try {
+ asynchronousChannelGroup.shutdownNow();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
try {
- asynchronousChannelGroup.shutdownNow();
- } catch (IOException e) {
+ asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
e.printStackTrace();
}
}
- try {
- asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
if (innerBufferPool != null) {
innerBufferPool.release();
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
index ef80c2687afad7f3a8ef690177d96e339c5369f9..08ea4e3ebf30f09c5013963465721ad46b91673c 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
@@ -39,7 +39,7 @@ final class IoServerConfig {
/**
* 当前smart-socket版本号
*/
- public static final String VERSION = "v1.6.2";
+ public static final String VERSION = "v1.6.3";
/**
* 消息体缓存大小,字节
diff --git a/aio-enhance/pom.xml b/aio-enhance/pom.xml
index 90e3d17b775fb6183b858c116555f4b9d3d9e44a..d4eb5cae7cda376a459f46ea1f043d99ae0e88c2 100644
--- a/aio-enhance/pom.xml
+++ b/aio-enhance/pom.xml
@@ -14,7 +14,7 @@
smart-socket-parent
org.smartboot.socket
- 1.6.2
+ 1.6.3
../smart-socket-parent
4.0.0
diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java
deleted file mode 100644
index 1c8821dfe08c67763e3e5df892614e57e19b9949..0000000000000000000000000000000000000000
--- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/ByteBufferArray.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2017-2021, org.smartboot. All rights reserved.
- * project name: smart-socket
- * file name: ByteBufferArray.java
- * Date: 2021-07-29
- * Author: sandao (zhengjunweimail@163.com)
- *
- ******************************************************************************/
-
-package org.smartboot.socket.enhance;
-
-import java.nio.ByteBuffer;
-
-
-/**
- * @author 三刀
- */
-final class ByteBufferArray {
- private final ByteBuffer[] buffers;
- private final int offset;
- private final int length;
-
- public ByteBufferArray(ByteBuffer[] buffers, int offset, int length) {
- this.buffers = buffers;
- this.offset = offset;
- this.length = length;
- }
-
- public ByteBuffer[] getBuffers() {
- return buffers;
- }
-
- public int getOffset() {
- return offset;
- }
-
- public int getLength() {
- return length;
- }
-}
\ No newline at end of file
diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java
index 1dc28913262412d32c270bcb579e598fecac33f3..6dddae16d857a3d0a555fc3eaae49fe77d1b3fb8 100644
--- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java
+++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousChannelGroup.java
@@ -17,7 +17,6 @@ import java.nio.channels.spi.AsynchronousChannelProvider;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,11 +39,11 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup {
/**
* 写回调线程池
*/
- private final ExecutorService writeExecutorService;
+ private final ExecutorService commonExecutorService;
/**
* write工作组
*/
- private final Worker[] writeWorkers;
+ private final Worker[] commonWorkers;
/**
* read工作组
*/
@@ -53,24 +52,8 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup {
* 线程池分配索引
*/
private final AtomicInteger readIndex = new AtomicInteger(0);
- private final AtomicInteger writeIndex = new AtomicInteger(0);
- /**
- * 定时任务线程池
- */
- private final ScheduledThreadPoolExecutor scheduledExecutor;
- /**
- * 服务端accept线程池
- */
- private final ExecutorService acceptExecutorService;
- /**
- * accept工作组
- */
- private final Worker[] acceptWorkers;
- private Worker futureWorker;
- /**
- * 同步IO线程池
- */
- private ExecutorService futureExecutorService;
+ private final AtomicInteger commonIndex = new AtomicInteger(0);
+
/**
* group运行状态
*/
@@ -95,65 +78,32 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup {
}
//init threadPool for write and connect
- final int writeThreadNum = 1;
- final int acceptThreadNum = 1;
- writeExecutorService = getSingleThreadExecutor("smart-socket:write");
- this.writeWorkers = new Worker[writeThreadNum];
+ final int commonThreadNum = 1;
+ commonExecutorService = getSingleThreadExecutor("smart-socket:common");
+ this.commonWorkers = new Worker[commonThreadNum];
- for (int i = 0; i < writeThreadNum; i++) {
- writeWorkers[i] = new Worker(Selector.open(), selectionKey -> {
- EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment();
- //直接调用interestOps的效果比 removeOps(selectionKey, SelectionKey.OP_WRITE) 更好
- selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
- asynchronousSocketChannel.doWrite();
- });
- writeExecutorService.execute(writeWorkers[i]);
- }
-
- //init threadPool for accept
- acceptExecutorService = getSingleThreadExecutor("smart-socket:connect");
- acceptWorkers = new Worker[acceptThreadNum];
- for (int i = 0; i < acceptThreadNum; i++) {
- acceptWorkers[i] = new Worker(Selector.open(), selectionKey -> {
- if (selectionKey.isAcceptable()) {
+ for (int i = 0; i < commonThreadNum; i++) {
+ commonWorkers[i] = new Worker(Selector.open(), selectionKey -> {
+ if (selectionKey.isWritable()) {
+ EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment();
+ //直接调用interestOps的效果比 removeOps(selectionKey, SelectionKey.OP_WRITE) 更好
+ selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
+ asynchronousSocketChannel.doWrite();
+ } else if (selectionKey.isAcceptable()) {
EnhanceAsynchronousServerSocketChannel serverSocketChannel = (EnhanceAsynchronousServerSocketChannel) selectionKey.attachment();
serverSocketChannel.doAccept();
} else if (selectionKey.isConnectable()) {
EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment();
- asynchronousSocketChannel.doConnect();
- }
- });
- acceptExecutorService.execute(acceptWorkers[i]);
- }
-
- scheduledExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "smart-socket:scheduled"));
- }
-
- /**
- * 同步IO注册异步线程,防止主IO线程阻塞
- */
- public synchronized void registerFuture(Consumer register, int opType) throws IOException {
- if (futureWorker == null) {
- futureExecutorService = getSingleThreadExecutor("smart-socket:future");
- futureWorker = new Worker(Selector.open(), selectionKey -> {
- EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment();
- switch (opType) {
- case SelectionKey.OP_READ:
- removeOps(selectionKey, SelectionKey.OP_READ);
- asynchronousSocketChannel.doRead(true);
- break;
- case SelectionKey.OP_WRITE:
- removeOps(selectionKey, SelectionKey.OP_WRITE);
- asynchronousSocketChannel.doWrite();
- break;
- default:
- throw new UnsupportedOperationException("unSupport opType: " + opType);
+ asynchronousSocketChannel.doConnect(null);
+ } else if (selectionKey.isReadable()) {
+ //仅同步read会用到此线程资源
+ EnhanceAsynchronousSocketChannel asynchronousSocketChannel = (EnhanceAsynchronousSocketChannel) selectionKey.attachment();
+ removeOps(selectionKey, SelectionKey.OP_READ);
+ asynchronousSocketChannel.doRead(true);
}
-
});
- futureExecutorService.execute(futureWorker);
+ commonExecutorService.execute(commonWorkers[i]);
}
- futureWorker.addRegister(register);
}
private ThreadPoolExecutor getSingleThreadExecutor(final String prefix) {
@@ -177,20 +127,8 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup {
return readWorkers[(readIndex.getAndIncrement() & Integer.MAX_VALUE) % readWorkers.length];
}
- public Worker getWriteWorker() {
- return writeWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % writeWorkers.length];
- }
-
- public Worker getAcceptWorker() {
- return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length];
- }
-
- public Worker getConnectWorker() {
- return acceptWorkers[(writeIndex.getAndIncrement() & Integer.MAX_VALUE) % acceptWorkers.length];
- }
-
- public ScheduledThreadPoolExecutor getScheduledExecutor() {
- return scheduledExecutor;
+ public Worker getCommonWorker() {
+ return commonWorkers[(commonIndex.getAndIncrement() & Integer.MAX_VALUE) % commonWorkers.length];
}
@Override
@@ -207,28 +145,14 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup {
public void shutdown() {
running = false;
readExecutorService.shutdown();
- writeExecutorService.shutdown();
- if (acceptExecutorService != null) {
- acceptExecutorService.shutdown();
- }
- if (futureExecutorService != null) {
- futureExecutorService.shutdown();
- }
- scheduledExecutor.shutdown();
+ commonExecutorService.shutdown();
}
@Override
public void shutdownNow() {
running = false;
readExecutorService.shutdownNow();
- writeExecutorService.shutdownNow();
- if (acceptExecutorService != null) {
- acceptExecutorService.shutdownNow();
- }
- if (futureExecutorService != null) {
- futureExecutorService.shutdownNow();
- }
- scheduledExecutor.shutdownNow();
+ commonExecutorService.shutdownNow();
}
@Override
@@ -251,7 +175,7 @@ class EnhanceAsynchronousChannelGroup extends AsynchronousChannelGroup {
/**
* 当前Worker绑定的Selector
*/
- private final Selector selector;
+ final Selector selector;
private final Consumer consumer;
private final ConcurrentLinkedQueue> consumers = new ConcurrentLinkedQueue<>();
int invoker = 0;
diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java
index 5f3986826b2337736910fea35f5e42a49a8e794b..b57b235c33799eca04b666222c3c69bfdd8e3359 100644
--- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java
+++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousServerSocketChannel.java
@@ -43,7 +43,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
this.enhanceAsynchronousChannelGroup = enhanceAsynchronousChannelGroup;
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
- acceptWorker = enhanceAsynchronousChannelGroup.getAcceptWorker();
+ acceptWorker = enhanceAsynchronousChannelGroup.getCommonWorker();
this.lowMemory = lowMemory;
}
@@ -96,6 +96,7 @@ final class EnhanceAsynchronousServerSocketChannel extends AsynchronousServerSoc
}
if (socketChannel != null) {
EnhanceAsynchronousSocketChannel asynchronousSocketChannel = new EnhanceAsynchronousSocketChannel(enhanceAsynchronousChannelGroup, socketChannel, lowMemory);
+ //这行代码不要乱动
socketChannel.configureBlocking(false);
socketChannel.finishConnect();
CompletionHandler completionHandler = acceptCompletionHandler;
diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java
index 95e80074e7776de284655c13bacc248c0f2fb927..ee1b4365e79e651bf1b4565daef09251f421ab78 100644
--- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java
+++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/EnhanceAsynchronousSocketChannel.java
@@ -49,27 +49,17 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
/**
* 处理 write 事件的线程资源
*/
- private final EnhanceAsynchronousChannelGroup.Worker writeWorker;
- /**
- * 处理 connect 事件的线程资源
- */
- private final EnhanceAsynchronousChannelGroup.Worker connectWorker;
+ private final EnhanceAsynchronousChannelGroup.Worker commonWorker;
+
/**
* 用于接收 read 通道数据的缓冲区,经解码后腾出缓冲区以供下一批数据的读取
*/
private ByteBuffer readBuffer;
- /**
- * 用于接收 read 通道数据的缓冲区集合
- */
- private ByteBufferArray scatteringReadBuffer;
/**
* 存放待输出数据的缓冲区
*/
private ByteBuffer writeBuffer;
- /**
- * 存放待输出数据的缓冲区集合
- */
- private ByteBufferArray gatheringWriteBuffer;
+
/**
* read 回调事件处理器
*/
@@ -84,7 +74,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
private CompletionHandler connectCompletionHandler;
private FutureCompletionHandler connectFuture;
private FutureCompletionHandler extends Number, Object> readFuture;
- private FutureCompletionHandler extends Number, Object> writeFuture;
/**
* read 回调事件关联绑定的附件对象
*/
@@ -98,10 +87,6 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
*/
private Object connectAttachment;
private SelectionKey readSelectionKey;
- private SelectionKey readFutureSelectionKey;
- private SelectionKey writeSelectionKey;
- private SelectionKey writeFutureSelectionKey;
- private SelectionKey connectSelectionKey;
/**
* 当前是否正在执行 write 操作
*/
@@ -114,21 +99,16 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
* 当前是否正在执行 connect 操作
*/
private boolean connectionPending;
- /**
- * 远程连接的地址
- */
- private SocketAddress remote;
private int writeInvoker;
- private boolean lowMemory;
+ private final boolean lowMemory;
public EnhanceAsynchronousSocketChannel(EnhanceAsynchronousChannelGroup group, SocketChannel channel, boolean lowMemory) throws IOException {
super(group.provider());
this.group = group;
this.channel = channel;
readWorker = group.getReadWorker();
- writeWorker = group.getWriteWorker();
- connectWorker = group.getConnectWorker();
+ commonWorker = group.getCommonWorker();
this.lowMemory = lowMemory;
}
@@ -136,7 +116,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
public void close() throws IOException {
IOException exception = null;
try {
- if (channel != null && channel.isOpen()) {
+ if (channel.isOpen()) {
channel.close();
}
} catch (IOException e) {
@@ -146,21 +126,9 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
readSelectionKey.cancel();
readSelectionKey = null;
}
- if (readFutureSelectionKey != null) {
- readFutureSelectionKey.cancel();
- readFutureSelectionKey = null;
- }
- if (writeSelectionKey != null) {
- writeSelectionKey.cancel();
- writeSelectionKey = null;
- }
- if (writeFutureSelectionKey != null) {
- writeFutureSelectionKey.cancel();
- writeFutureSelectionKey = null;
- }
- if (connectSelectionKey != null) {
- connectSelectionKey.cancel();
- connectSelectionKey = null;
+ SelectionKey key = channel.keyFor(commonWorker.selector);
+ if (key != null) {
+ key.cancel();
}
if (exception != null) {
throw exception;
@@ -220,8 +188,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
connectionPending = true;
this.connectAttachment = attachment;
this.connectCompletionHandler = (CompletionHandler) handler;
- this.remote = remote;
- doConnect();
+ doConnect(remote);
}
@Override
@@ -234,24 +201,20 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
@Override
public void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) {
- read0(dst, null, timeout, unit, attachment, handler);
+ if (timeout > 0) {
+ throw new UnsupportedOperationException();
+ }
+ read0(dst, attachment, handler);
}
- private void read0(ByteBuffer readBuffer, ByteBufferArray scattering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) {
+ private void read0(ByteBuffer readBuffer, A attachment, CompletionHandler handler) {
if (readPending) {
throw new ReadPendingException();
}
readPending = true;
this.readBuffer = readBuffer;
- this.scatteringReadBuffer = scattering;
this.readAttachment = attachment;
- if (timeout > 0) {
- readFuture = new FutureCompletionHandler<>((CompletionHandler) handler, readAttachment);
- readCompletionHandler = (CompletionHandler) readFuture;
- group.getScheduledExecutor().schedule(readFuture, timeout, unit);
- } else {
- this.readCompletionHandler = (CompletionHandler) handler;
- }
+ this.readCompletionHandler = (CompletionHandler) handler;
doRead(readFuture != null);
}
@@ -265,44 +228,37 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
@Override
public void read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) {
- read0(null, new ByteBufferArray(dsts, offset, length), timeout, unit, attachment, handler);
+ throw new UnsupportedOperationException();
}
@Override
public void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) {
- write0(src, null, timeout, unit, attachment, handler);
+ if (timeout > 0) {
+ throw new UnsupportedOperationException();
+ }
+ write0(src, attachment, handler);
}
- private void write0(ByteBuffer writeBuffer, ByteBufferArray gathering, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) {
+ private void write0(ByteBuffer writeBuffer, A attachment, CompletionHandler handler) {
if (writePending) {
throw new WritePendingException();
}
writePending = true;
this.writeBuffer = writeBuffer;
- this.gatheringWriteBuffer = gathering;
this.writeAttachment = attachment;
- if (timeout > 0) {
- writeFuture = new FutureCompletionHandler<>((CompletionHandler) handler, writeAttachment);
- writeCompletionHandler = (CompletionHandler) writeFuture;
- group.getScheduledExecutor().schedule(writeFuture, timeout, unit);
- } else {
- this.writeCompletionHandler = (CompletionHandler) handler;
- }
+ this.writeCompletionHandler = (CompletionHandler) handler;
doWrite();
}
@Override
public Future write(ByteBuffer src) {
- FutureCompletionHandler writeFuture = new FutureCompletionHandler<>();
- this.writeFuture = writeFuture;
- write0(src, null, 0, TimeUnit.MILLISECONDS, null, writeFuture);
- return writeFuture;
+ throw new UnsupportedOperationException();
}
@Override
public void write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler handler) {
- write0(null, new ByteBufferArray(srcs, offset, length), timeout, unit, attachment, handler);
+ throw new UnsupportedOperationException();
}
@Override
@@ -310,7 +266,7 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
return channel.getLocalAddress();
}
- public void doConnect() {
+ public void doConnect(SocketAddress remote) {
try {
//此前通过Future调用,且触发了cancel
if (connectFuture != null && connectFuture.isDone()) {
@@ -321,22 +277,21 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
if (connected || channel.connect(remote)) {
connected = channel.finishConnect();
}
+ //这行代码不要乱动
channel.configureBlocking(false);
if (connected) {
CompletionHandler completionHandler = connectCompletionHandler;
Object attach = connectAttachment;
resetConnect();
completionHandler.completed(null, attach);
- } else if (connectSelectionKey == null) {
- connectWorker.addRegister(selector -> {
+ } else {
+ commonWorker.addRegister(selector -> {
try {
- connectSelectionKey = channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this);
+ channel.register(selector, SelectionKey.OP_CONNECT, EnhanceAsynchronousSocketChannel.this);
} catch (ClosedChannelException e) {
connectCompletionHandler.failed(e, connectAttachment);
}
});
- } else {
- throw new IOException("unKnow exception");
}
} catch (IOException e) {
connectCompletionHandler.failed(e, connectAttachment);
@@ -371,26 +326,21 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
long readSize = 0;
boolean hasRemain = true;
if (directRead) {
- if (scatteringReadBuffer != null) {
- readSize = channel.read(scatteringReadBuffer.getBuffers(), scatteringReadBuffer.getOffset(), scatteringReadBuffer.getLength());
- hasRemain = hasRemaining(scatteringReadBuffer);
- } else {
- readSize = channel.read(readBuffer);
- hasRemain = readBuffer.hasRemaining();
- }
+ readSize = channel.read(readBuffer);
+ hasRemain = readBuffer.hasRemaining();
}
//注册至异步线程
if (readFuture != null && readSize == 0) {
group.removeOps(readSelectionKey, SelectionKey.OP_READ);
- group.registerFuture(selector -> {
+ commonWorker.addRegister(selector -> {
try {
- readFutureSelectionKey = channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this);
+ channel.register(selector, SelectionKey.OP_READ, EnhanceAsynchronousSocketChannel.this);
} catch (ClosedChannelException e) {
e.printStackTrace();
doRead(true);
}
- }, SelectionKey.OP_READ);
+ });
return;
}
//释放内存
@@ -402,13 +352,8 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
if (readSize != 0 || !hasRemain) {
CompletionHandler completionHandler = readCompletionHandler;
Object attach = readAttachment;
- ByteBufferArray scattering = scatteringReadBuffer;
resetRead();
- if (scattering == null) {
- completionHandler.completed((int) readSize, attach);
- } else {
- completionHandler.completed(readSize, attach);
- }
+ completionHandler.completed((int) readSize, attach);
if (!readPending && readSelectionKey != null) {
group.removeOps(readSelectionKey, SelectionKey.OP_READ);
@@ -445,66 +390,44 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
readCompletionHandler = null;
readAttachment = null;
readBuffer = null;
- scatteringReadBuffer = null;
}
public void doWrite() {
try {
- //此前通过Future调用,且触发了cancel
- if (writeFuture != null && writeFuture.isDone()) {
- resetWrite();
- return;
- }
int invoker = 0;
//防止无限递归导致堆栈溢出
- if (writeWorker.getWorkerThread() == Thread.currentThread()) {
- invoker = ++writeWorker.invoker;
+ if (commonWorker.getWorkerThread() == Thread.currentThread()) {
+ invoker = ++commonWorker.invoker;
} else if (readWorker.getWorkerThread() != Thread.currentThread()) {
invoker = ++writeInvoker;
}
int writeSize = 0;
boolean hasRemain = true;
if (invoker < EnhanceAsynchronousChannelGroup.MAX_INVOKER) {
- if (gatheringWriteBuffer != null) {
- writeSize = (int) channel.write(gatheringWriteBuffer.getBuffers(), gatheringWriteBuffer.getOffset(), gatheringWriteBuffer.getLength());
- hasRemain = hasRemaining(gatheringWriteBuffer);
- } else {
- writeSize = channel.write(writeBuffer);
- hasRemain = writeBuffer.hasRemaining();
- }
+ writeSize = channel.write(writeBuffer);
+ hasRemain = writeBuffer.hasRemaining();
} else {
writeInvoker = 0;
}
- //注册至异步线程
- if (writeFuture != null && writeSize == 0) {
- group.removeOps(writeSelectionKey, SelectionKey.OP_WRITE);
- group.registerFuture(selector -> {
- try {
- writeFutureSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this);
- } catch (ClosedChannelException e) {
- e.printStackTrace();
- doWrite();
- }
- }, SelectionKey.OP_WRITE);
- return;
- }
-
if (writeSize != 0 || !hasRemain) {
CompletionHandler completionHandler = writeCompletionHandler;
Object attach = writeAttachment;
resetWrite();
completionHandler.completed(writeSize, attach);
- } else if (writeSelectionKey == null) {
- writeWorker.addRegister(selector -> {
- try {
- writeSelectionKey = channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this);
- } catch (ClosedChannelException e) {
- writeCompletionHandler.failed(e, writeAttachment);
- }
- });
} else {
- group.interestOps(writeWorker, writeSelectionKey, SelectionKey.OP_WRITE);
+ SelectionKey commonSelectionKey = channel.keyFor(commonWorker.selector);
+ if (commonSelectionKey == null) {
+ commonWorker.addRegister(selector -> {
+ try {
+ channel.register(selector, SelectionKey.OP_WRITE, EnhanceAsynchronousSocketChannel.this);
+ } catch (ClosedChannelException e) {
+ writeCompletionHandler.failed(e, writeAttachment);
+ }
+ });
+ } else {
+ group.interestOps(commonWorker, commonSelectionKey, SelectionKey.OP_WRITE);
+ }
}
} catch (Throwable e) {
if (writeCompletionHandler == null) {
@@ -520,22 +443,11 @@ final class EnhanceAsynchronousSocketChannel extends AsynchronousSocketChannel {
}
}
- private boolean hasRemaining(ByteBufferArray scattering) {
- for (int i = 0; i < scattering.getLength(); i++) {
- if (scattering.getBuffers()[scattering.getOffset() + i].hasRemaining()) {
- return true;
- }
- }
- return false;
- }
-
private void resetWrite() {
writePending = false;
- writeFuture = null;
writeAttachment = null;
writeCompletionHandler = null;
writeBuffer = null;
- gatheringWriteBuffer = null;
}
@Override
diff --git a/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java
index b24de7807549eae6238fc730df2fb7a89803f641..0cb228bc6ebcdf3119eaf9514c296fbee3462eed 100644
--- a/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java
+++ b/aio-enhance/src/main/java/org/smartboot/socket/enhance/FutureCompletionHandler.java
@@ -15,22 +15,12 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-final class FutureCompletionHandler implements CompletionHandler, Future, Runnable {
- private CompletionHandler completionHandler;
- private A attach;
+final class FutureCompletionHandler implements CompletionHandler, Future {
private V result;
private boolean done = false;
private boolean cancel = false;
private Throwable exception;
- public FutureCompletionHandler(CompletionHandler completionHandler, A attach) {
- this.completionHandler = completionHandler;
- this.attach = attach;
- }
-
- public FutureCompletionHandler() {
- }
-
@Override
public void completed(V result, A selectionKey) {
this.result = result;
@@ -38,18 +28,12 @@ final class FutureCompletionHandler implements CompletionHandler, Fu
synchronized (this) {
this.notify();
}
- if (completionHandler != null) {
- completionHandler.completed(result, attach);
- }
}
@Override
public void failed(Throwable exc, A attachment) {
exception = exc;
done = true;
- if (completionHandler != null) {
- completionHandler.failed(exc, attachment);
- }
}
@Override
@@ -101,11 +85,4 @@ final class FutureCompletionHandler implements CompletionHandler, Fu
throw new TimeoutException();
}
- @Override
- public synchronized void run() {
- if (!done) {
- cancel(true);
- completionHandler.failed(new TimeoutException(), attach);
- }
- }
}
diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml
index facd8a13ad70a2e3074924b5ddf4b09edd0acf00..88e912163cd486f3670b3725457047d5e4dbc88a 100644
--- a/aio-pro/pom.xml
+++ b/aio-pro/pom.xml
@@ -19,7 +19,7 @@
org.smartboot.socket
smart-socket-parent
- 1.6.2
+ 1.6.3
../smart-socket-parent
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index 4d8856fc8163159d1012ed62941ddd587abe95c8..d9a24ce1a3c33592c3d598cd09d354e85f4c9ee2 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -18,7 +18,7 @@
org.smartboot.socket
aio-pro
- 1.6.2
+ 1.6.3
org.slf4j
diff --git a/example/pom.xml b/example/pom.xml
index b0928048658cdf3a06b4e72aa3fa3f8f4fc1cedd..e0f6555378e2e8229ce7e9c525a5d9833f108895 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -23,7 +23,7 @@
org.smartboot.socket
aio-pro
- 1.6.2
+ 1.6.3
org.apache.commons
diff --git a/pom.xml b/pom.xml
index 140bf49b6eee643edec2ac740c0f5de3f6e7ac33..9e9dc2076b14e6522c815399a2dc5ae9cdb73a98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
org.smartboot.socket
smart-socket-parent
- 1.6.2
+ 1.6.3
2.6
diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml
index d3d3ab57a5ba7d63b8914b91eb0d811adeb4e04a..d95f106f441eee227d554be770972f8960efa3c9 100644
--- a/smart-socket-parent/pom.xml
+++ b/smart-socket-parent/pom.xml
@@ -15,14 +15,14 @@
4.0.0
org.smartboot.socket
smart-socket-parent
- 1.6.2
+ 1.6.3
pom
UTF-8
1.7.36
- 1.6.2
+ 1.6.3
4.13.2