diff --git a/README.md b/README.md
index 1faae78bd675f8e831d563ccb2f5e37e0e1647bf..2cfd2ac4a3e7caef353827d1b08600ba2abfa33f 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,22 @@
## smart-socket [English](README_EN.md)
smart-socket是一款国产开源的Java AIO框架,追求代码量、性能、稳定性、接口设计各方面都达到极致。如果smart-socket对您有一丝帮助,请Star一下我们的项目并持续关注;如果您对smart-socket并不满意,那请多一些耐心,smart-socket一直在努力变得更好。
+## 版本说明
+
+| 系列 | 最新版 | 文档 | 说明 |
+| -- | -- | -- | -- |
+| 1.3 | [1.3.23](https://mvnrepository.com/artifact/org.smartboot.socket/aio-core/1.3.23) | 《[smart-socket技术小册](https://smartboot.gitee.io/docs/smart-socket/)》 | 企业级,已稳定运行在众多企业的生产环境上 |
+| 1.4 | 1.4.0-rc.1 | 暂无 |暂无|
+
+**特色:**
+1. 代码量极少,可读性强
+2. 上手快,二次开发只需实现两个接口
+3. 性能爆表,充分压榨CPU、带宽
+4. 资源占用极低,IO线程0感知
+5. 自带流控、缓存压缩、流量/消息量监控等黑科技
+6. 文档齐全
+
+
**:**
| 群号 | 群类型 | 入群条件 | 福利 |
@@ -23,50 +39,6 @@ smart-socket是一款国产开源的Java AIO框架,追求代码量、性能、
- smart-socket能处理半包、粘包吗?
-**特色:**
-1. 代码量极少,可读性强
-2. 上手快,二次开发只需实现两个接口
-3. 性能爆表,充分压榨CPU、带宽
-4. 资源占用极低,IO线程0感知
-5. 自带流控、缓存压缩、流量/消息量监控等黑科技
-6. 文档齐全《[smart-socket技术小册](https://smartboot.gitee.io/docs/smart-socket/)》
-
-### Maven
-smart-socket发布了两种类型的包供大家选用:
-
-1. aio-core,针对Socket的初级用户提供的开发包,仅提供基本的AIO通讯服务。
-
- ```xml
-
- org.smartboot.socket
- aio-core
- 1.3.22
-
- ```
-
-2. aio-pro,面向资深用户提供的进阶版,不仅包含了aio-core的所有功能,还提供了TLS/SSL通讯功能,并提供一些用于辅助编解码的工具类。
-
- ```xml
-
- org.smartboot.socket
- aio-pro
- 1.3.22
-
- ```
-
-## 性能测试报告
-
-| 项目 | 结果 |
-| --- | --- |
-|CPU| i7-4790 3.60Ghz|
-|内存| 8G|
-|测试代码|服务端:P2PServer,客户端:P2PMultiClient|
-|测试时长|大于两分钟(服务端与客户端启动后的第一分钟数据是无效的,因为实际未跑满一分钟)
-|时间单位|1分钟|
-|数据总流量|7064MB|
-|消息大小|33B|
-|消息数|224484842|
-
## 标题党
- [《每秒处理 500W 条消息,人、机为之颤抖》](https://www.oschina.net/news/90988/smart-socket-1-2-0-beta)
- [《再见,Netty!你好,smart-socket!》](https://my.oschina.net/u/2385344/blog/1603648)
diff --git a/aio-core/pom.xml b/aio-core/pom.xml
index 091d2a411c432557602c56c46f9161adc516eb8e..21acd65baba0e3a8f771acb8a6b85a357b01f393 100644
--- a/aio-core/pom.xml
+++ b/aio-core/pom.xml
@@ -11,7 +11,7 @@
org.smartboot.socket
smart-socket-parent
- 1.4.0.1231-beta
+ 1.4.0-rc.1
../smart-socket-parent
@@ -31,4 +31,26 @@
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+
+
+ true
+ public
+ -Xdoclint:none
+
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+
diff --git a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java
index b807716105fda806af811e401a331e7b12006777..d5e6174743092656e63c4a2f18de25f5394f7f00 100644
--- a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java
+++ b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java
@@ -74,4 +74,12 @@ public enum StateMachineEnum {
*
AioSession关闭成功
*/
SESSION_CLOSED,
+ /**
+ * 释放流控
+ */
+ RELEASE_FLOW_CONTROL,
+ /**
+ * 流控
+ */
+ FLOW_CONTROL
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java b/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java
index cddc5a56d838296a0826fe38c4c9c5b011f21228..f55ecf7efb0a3adf206cb71700d7208ed0bea251 100644
--- a/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java
+++ b/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java
@@ -67,22 +67,15 @@ public final class BufferPage {
freeChunk.setParentPosition(buffer.limit());
}
if (bufferChunk.buffer().remaining() != size) {
- LOGGER.error(bufferChunk.buffer().remaining() + "aaaa" + size);
throw new RuntimeException("allocate " + size + ", buffer:" + bufferChunk);
}
return bufferChunk;
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("bufferPage has no available space: " + size);
- }
+ LOGGER.warn("bufferPage has no available space: " + size);
return new VirtualBuffer(null, allocate0(size, false), 0, 0);
}
synchronized void clean(VirtualBuffer cleanBuffer) {
- if (freeList.isEmpty()) {
- freeList.add(cleanBuffer);
- return;
- }
int index = 0;
Iterator iterator = freeList.iterator();
while (iterator.hasNext()) {
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java
index 222fae8dd7a1c8fc9a1cd56e14e936d34efdab90..fd3e5696ee0be301dbcddfde70ee709fe9b4b989 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java
@@ -136,8 +136,19 @@ public class AioQuickClient {
*
*/
public final void shutdown() {
+ showdown0(false);
+ }
+
+ /**
+ * 立即关闭客户端
+ */
+ public final void shutdownNow() {
+ showdown0(true);
+ }
+
+ private void showdown0(boolean flag) {
if (session != null) {
- session.close();
+ session.close(flag);
session = null;
}
//仅Client内部创建的ChannelGroup需要shutdown
@@ -146,7 +157,6 @@ public class AioQuickClient {
}
}
-
/**
* 设置读缓存区大小
*
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
index 739e8c15084ec104ce23cc8176848e99a1541e0b..ef5b84d9adb9e69f2fd5576915d57b290114178e 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
@@ -74,6 +74,7 @@ public class AioQuickServer {
config.setPort(port);
config.setProtocol(protocol);
config.setProcessor(messageProcessor);
+ config.setFlowControlEnabled(true);
}
/**
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java
index c36b693bb1dcaea066c9dd93968d76836f01b6ba..df7c8f6e818b8071c31b4c2bbfdb06a1be36e1cf 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java
@@ -88,20 +88,19 @@ public class AioSession {
* @see AioSession#SESSION_STATUS_ENABLED
*/
protected byte status = SESSION_STATUS_ENABLED;
+ Semaphore readSemaphore = new Semaphore(1);
/**
* 输出信号量,防止并发write导致异常
*/
private Semaphore semaphore = new Semaphore(1);
-
- /**
- * 内存页,用于申请当前AioSession所需的VirtualBuffer
- */
- private BufferPage bufferPage;
/**
* 附件对象
*/
private Object attachment;
-
+ /**
+ * 是否流控,客户端写流控,服务端读流控
+ */
+ private boolean flowControl;
private ReadCompletionHandler readCompletionHandler;
private WriteCompletionHandler writeCompletionHandler;
private IoServerConfig ioServerConfig;
@@ -117,7 +116,6 @@ public class AioSession {
*/
AioSession(AsynchronousSocketChannel channel, final IoServerConfig config, ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler, BufferPage bufferPage) {
this.channel = channel;
- this.bufferPage = bufferPage;
this.readCompletionHandler = readCompletionHandler;
this.writeCompletionHandler = writeCompletionHandler;
this.ioServerConfig = config;
@@ -126,6 +124,10 @@ public class AioSession {
byteBuf = new WriteBuffer(bufferPage, new Function, Void>() {
@Override
public Void apply(BlockingQueue var) {
+ if (ioServerConfig.isFlowControlEnabled() && var.size() >= ioServerConfig.getFlowControlSize()) {
+ flowControl = true;
+ ioServerConfig.getProcessor().stateEvent(AioSession.this, StateMachineEnum.FLOW_CONTROL, null);
+ }
if (!semaphore.tryAcquire()) {
return null;
}
@@ -146,6 +148,7 @@ public class AioSession {
* 初始化AioSession
*/
void initSession() {
+ readSemaphore.tryAcquire();
continueRead();
}
@@ -162,6 +165,13 @@ public class AioSession {
}
if (writeBuffer != null) {
+ //如果存在流控并符合释放条件,则触发读操作
+ //一定要放在continueWrite之前
+ if (flowControl && byteBuf.bufList.size() < ioServerConfig.getReleaseFlowControlSize()) {
+ ioServerConfig.getProcessor().stateEvent(AioSession.this, StateMachineEnum.RELEASE_FLOW_CONTROL, null);
+ flowControl = false;
+ readFromChannel(false);
+ }
continueWrite(writeBuffer);
return;
}
@@ -275,9 +285,13 @@ public class AioSession {
/**
- * 触发通道的读操作,当发现存在严重消息积压时,会触发流控
+ * 触发通道的读回调操作
*/
void readFromChannel(boolean eof) {
+ //处于流控状态
+ if (flowControl || !readSemaphore.tryAcquire()) {
+ return;
+ }
final ByteBuffer readBuffer = this.readBuffer.buffer();
readBuffer.flip();
final MessageProcessor messageProcessor = ioServerConfig.getProcessor();
@@ -324,6 +338,13 @@ public class AioSession {
readBuffer.position(readBuffer.limit());
readBuffer.limit(readBuffer.capacity());
}
+
+ //读缓冲区已满
+ if (!readBuffer.hasRemaining()) {
+ RuntimeException exception = new RuntimeException("readBuffer has no remaining");
+ messageProcessor.stateEvent(this, StateMachineEnum.DECODE_EXCEPTION, exception);
+ throw exception;
+ }
continueRead();
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
index e28e5501074178350b2130440fe21718c5a38b0e..dbe5e542894ed0f62469f1fa8c32b924e9af292d 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
@@ -32,7 +32,16 @@ final class IoServerConfig {
"\\__, \\| ( ) ( ) |( (_| || | | |_ \\__, \\( (_) )( (___ | |\\`\\ ( ___/| |_ \n" +
"(____/(_) (_) (_)`\\__,_)(_) `\\__) (____/`\\___/'`\\____)(_) (_)`\\____)`\\__)";
- public static final String VERSION = "v1.4.0.1231-beta";
+ public static final String VERSION = "v1.4.0-rc.1";
+ /**
+ * 释放流控阈值
+ */
+ private final int releaseFlowControlSize = getIntProperty(Property.SERVER_RELEASE_FLOW_CONTROL_SIZE, 10);
+
+ /**
+ * 流控阈值
+ */
+ private final int flowControlSize = getIntProperty(Property.SERVER_FLOW_CONTROL_SIZE, 20);
/**
* 消息体缓存大小,字节
*/
@@ -61,11 +70,14 @@ final class IoServerConfig {
* 服务器处理线程数
*/
private int threadNum = Runtime.getRuntime().availableProcessors() + 1;
-
/**
* 是否启用控制台banner
*/
private boolean bannerEnabled = true;
+ /**
+ * 流控功能开关
+ */
+ private boolean flowControlEnabled = false;
/**
* Socket 配置
*/
@@ -132,9 +144,7 @@ final class IoServerConfig {
public final void setProcessor(MessageProcessor processor) {
this.processor = processor;
- if (processor instanceof NetMonitor) {
- this.monitor = (NetMonitor) processor;
- }
+ this.monitor = (processor instanceof NetMonitor) ? (NetMonitor) processor : null;
}
public int getReadBufferSize() {
@@ -164,6 +174,22 @@ final class IoServerConfig {
socketOptions.put(socketOption, f);
}
+ public boolean isFlowControlEnabled() {
+ return flowControlEnabled;
+ }
+
+ public void setFlowControlEnabled(boolean flowControlEnabled) {
+ this.flowControlEnabled = flowControlEnabled;
+ }
+
+ public int getReleaseFlowControlSize() {
+ return releaseFlowControlSize;
+ }
+
+ public int getFlowControlSize() {
+ return flowControlSize;
+ }
+
@Override
public String toString() {
return "IoServerConfig{" +
@@ -189,5 +215,7 @@ final class IoServerConfig {
String SERVER_PAGE_SIZE = PROJECT_NAME + ".server.pageSize";
String CLIENT_PAGE_SIZE = PROJECT_NAME + ".client.pageSize";
String SERVER_PAGE_IS_DIRECT = PROJECT_NAME + ".server.page.isDirect";
+ String SERVER_FLOW_CONTROL_SIZE = PROJECT_NAME + ".server.flowControlSize";
+ String SERVER_RELEASE_FLOW_CONTROL_SIZE = PROJECT_NAME + ".server.releaseFlowControlSize";
}
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java
index 3d461b654bd76bbc87cb55919c477b839dff75b4..17cb9bf815d3428f41fb72af9ae988db9789024f 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java
@@ -32,6 +32,7 @@ class ReadCompletionHandler implements CompletionHandler bufList = new LinkedBlockingQueue<>();
+ BlockingQueue bufList = new LinkedBlockingQueue<>();
private VirtualBuffer writeInBuf;
private BufferPage bufferPage;
private boolean closed = false;
@@ -51,7 +51,7 @@ public final class WriteBuffer extends OutputStream {
function.apply(bufList);
}
- public void writeInt(int v) throws IOException {
+ public synchronized void writeInt(int v) throws IOException {
cacheByte[0] = (byte) ((v >>> 24) & 0xFF);
cacheByte[1] = (byte) ((v >>> 16) & 0xFF);
cacheByte[2] = (byte) ((v >>> 8) & 0xFF);
@@ -105,7 +105,6 @@ public final class WriteBuffer extends OutputStream {
}
if (bufList.size() > 0) {
function.apply(bufList);
- return;
}
}
@@ -129,11 +128,11 @@ public final class WriteBuffer extends OutputStream {
}
}
- public boolean isClosed() {
+ boolean isClosed() {
return closed;
}
- public boolean hasData() {
+ boolean hasData() {
return bufList.size() > 0 || (writeInBuf != null && writeInBuf.buffer().position() > 0);
}
}
\ No newline at end of file
diff --git a/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java b/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java
index c517282978955633e49146949b55d7e4c9a2dc94..3ad7538f353913f2897db6e93fbe841dbc3494f0 100644
--- a/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java
+++ b/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java
@@ -8,13 +8,11 @@ import org.smartboot.socket.transport.AioSession;
*/
public class IntegerClient {
public static void main(String[] args) throws Exception {
- IntegerClientProcessor processor = new IntegerClientProcessor();
- AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8888, new IntegerProtocol(), processor);
+ AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8888, new IntegerProtocol(), new IntegerClientProcessor());
AioSession session = aioQuickClient.start();
session.writeBuffer().writeInt(1);
-// session.getOutputStream().flush();
-// session.getOutputStream().close();
- Thread.sleep(1000);
- aioQuickClient.shutdown();
+// session.writeBuffer().flush();
+// Thread.sleep(1000);
+ aioQuickClient.shutdownNow();
}
}
diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml
index f32fed9ea43f81a4cf3156fa5da60b7f380576cd..5c6998543c9f1c8b486900eac3af7b3a2b502dc6 100644
--- a/aio-pro/pom.xml
+++ b/aio-pro/pom.xml
@@ -19,7 +19,7 @@
org.smartboot.socket
smart-socket-parent
- 1.4.0.1231-beta
+ 1.4.0-rc.1
../smart-socket-parent
diff --git a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java
index 9c6e65e67c7091e4910394c9d25079784b46c91b..89c22d99fa9671dccece81b1c55c3b1a16fbd18e 100644
--- a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java
+++ b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java
@@ -20,7 +20,7 @@ public class StringClient {
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
System.setProperty("smart-socket.server.pageSize", (1024 * 1024 * 32) + "");
- System.setProperty("smart-socket.session.writeChunkSize", "2048");
+ System.setProperty("smart-socket.session.writeChunkSize", "1048");
for (int i = 0; i < 10; i++) {
new Thread() {
@Override
diff --git a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java
index 638bd7237f62437b20b92bc3238e81a1c25e016e..a5f7287586c57f4016ed1c879108ce943364b75b 100644
--- a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java
+++ b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java
@@ -41,6 +41,14 @@ public class StringServer {
if (throwable != null) {
throwable.printStackTrace();
}
+ switch (stateMachineEnum){
+ case FLOW_CONTROL:
+// System.out.println("流控");
+ break;
+ case RELEASE_FLOW_CONTROL:
+// System.out.println("释放流控");
+ break;
+ }
}
};
processor.addPlugin(new MonitorPlugin());
diff --git a/example/benchmark2/bin/client.sh b/example/benchmark2/bin/client.sh
new file mode 100644
index 0000000000000000000000000000000000000000..b7a00ad088e102886a2480d229bab841036b9fcb
--- /dev/null
+++ b/example/benchmark2/bin/client.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+HTTP_HOME=$(dirname $(pwd))
+java -Dlog4j.configurationFile=file:${HTTP_HOME}/conf/log4j2_client.xml -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${HTTP_HOME}/lib/ org.smartboot.socket.benchmark.StringClient
diff --git a/example/benchmark2/bin/server.sh b/example/benchmark2/bin/server.sh
new file mode 100644
index 0000000000000000000000000000000000000000..70b087ed98bcc711e78dc8fa66ee3cf5f02a0b6a
--- /dev/null
+++ b/example/benchmark2/bin/server.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+HTTP_HOME=$(dirname $(pwd))
+java -Dlog4j.configurationFile=file:${HTTP_HOME}/conf/log4j2_server.xml -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${HTTP_HOME}/lib/ org.smartboot.socket.benchmark.StringServer
diff --git a/example/benchmark2/conf/log4j2_client.xml b/example/benchmark2/conf/log4j2_client.xml
new file mode 100644
index 0000000000000000000000000000000000000000..353be8a4a4ec94c15aeb84a38efb47fe46eb0f99
--- /dev/null
+++ b/example/benchmark2/conf/log4j2_client.xml
@@ -0,0 +1,44 @@
+
+
+
+
+
+
+
+ ../logs/
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/example/benchmark2/conf/log4j2_server.xml b/example/benchmark2/conf/log4j2_server.xml
new file mode 100644
index 0000000000000000000000000000000000000000..5651d519a9b4f002f0e10c3caf400e90baa6634e
--- /dev/null
+++ b/example/benchmark2/conf/log4j2_server.xml
@@ -0,0 +1,44 @@
+
+
+
+
+
+
+
+ ../logs/
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/example/benchmark2/pom.xml b/example/benchmark2/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..d28defe742124f5d68f998740e676c3f62fe7209
--- /dev/null
+++ b/example/benchmark2/pom.xml
@@ -0,0 +1,72 @@
+
+
+
+ smart-socket-parent
+ org.smartboot.socket
+ 1.3.23
+
+ 4.0.0
+1.0
+ benchmark2
+
+
+ org.slf4j
+ slf4j-simple
+ 1.7.25
+
+
+ org.smartboot.socket
+ aio-pro
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ 2.11.0
+
+
+ org.apache.logging.log4j
+ log4j-api
+ 2.11.0
+
+
+
+
+
+ maven-assembly-plugin
+
+
+
+
+
+ release.xml
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
+ alimaven
+ aliyun maven
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+
+
+
+ alimaven
+ aliyun maven
+ http://maven.aliyun.com/nexus/content/groups/public/
+
+
+
\ No newline at end of file
diff --git a/example/benchmark2/release.xml b/example/benchmark2/release.xml
new file mode 100644
index 0000000000000000000000000000000000000000..b179f16168251131ffe2122033a13d3f4d6898b5
--- /dev/null
+++ b/example/benchmark2/release.xml
@@ -0,0 +1,48 @@
+
+
+
+
+
+ tar.gz
+ dir
+
+
+
+
+ /lib
+
+
+
+
+
+
+ bin/**
+
+ 0755
+
+
+
+
+ /conf/**
+ logs
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringClient.java b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..4e3909a663435511872073c18e05340cffaf0ead
--- /dev/null
+++ b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringClient.java
@@ -0,0 +1,92 @@
+package org.smartboot.socket.benchmark;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.socket.MessageProcessor;
+import org.smartboot.socket.StateMachineEnum;
+import org.smartboot.socket.transport.AioQuickClient;
+import org.smartboot.socket.transport.AioSession;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * @author 三刀
+ * @version V1.0 , 2018/11/23
+ */
+public class StringClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StringClient.class);
+ AsynchronousChannelGroup asynchronousChannelGroup;
+
+ public StringClient(AsynchronousChannelGroup asynchronousChannelGroup) {
+ this.asynchronousChannelGroup = asynchronousChannelGroup;
+ }
+
+ public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
+ System.setProperty("smart-socket.server.pageSize", (1024 * 1024 * 32) + "");
+ System.setProperty("smart-socket.session.writeChunkSize", "2048");
+ final AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ });
+
+ for (int i = 0; i < 10; i++) {
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ new StringClient(asynchronousChannelGroup).test();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+ }.start();
+ }
+
+ }
+
+ public void test() throws InterruptedException, ExecutionException, IOException {
+ AioQuickClient client = new AioQuickClient<>("localhost", 8888, new StringProtocol(), new MessageProcessor() {
+ @Override
+ public void process(AioSession session, String msg) {
+// LOGGER.info(msg);
+ }
+
+ @Override
+ public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) {
+ if (throwable != null) {
+ throwable.printStackTrace();
+ }
+ }
+ });
+ client.setWriteQueueSize(16384);
+ AioSession session = client.start(asynchronousChannelGroup);
+
+ int i = 1;
+ while (true) {
+ int num = (int) (Math.random() * 10) + 1;
+ StringBuilder sb = new StringBuilder();
+ while (num-- > 0) {
+ sb.append("smart-socket");
+ }
+ byte[] bytes = sb.toString().getBytes();
+ ByteBuffer buffer = ByteBuffer.allocate(bytes.length + 4);
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ buffer.flip();
+ session.write(buffer);
+// outputStream.writeInt(bytes.length);
+// outputStream.write(bytes);
+ }
+ }
+}
diff --git a/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringProtocol.java b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringProtocol.java
new file mode 100644
index 0000000000000000000000000000000000000000..86092367fcf78b52bf2483b1d4d47d275b9c52de
--- /dev/null
+++ b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringProtocol.java
@@ -0,0 +1,35 @@
+package org.smartboot.socket.benchmark;
+
+import org.smartboot.socket.Protocol;
+import org.smartboot.socket.transport.AioSession;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author 三刀
+ * @version V1.0 , 2018/11/23
+ */
+public class StringProtocol implements Protocol {
+ @Override
+ public String decode(ByteBuffer readBuffer, AioSession session) {
+ int remaining = readBuffer.remaining();
+ if (remaining < 4) {
+ return null;
+ }
+ readBuffer.mark();
+ int length = readBuffer.getInt();
+ if (length > readBuffer.remaining()) {
+ readBuffer.reset();
+ return null;
+ }
+ byte[] b = new byte[length];
+ readBuffer.get(b);
+ readBuffer.mark();
+ return new String(b);
+ }
+
+ @Override
+ public ByteBuffer encode(String s, AioSession aioSession) {
+ return null;
+ }
+}
diff --git a/example/pom.xml b/example/pom.xml
index 211c16ca4ba56c2d1249134e5a4d1202cba87108..dc172f1e935d3fc377ac5fb15ee6734b87467c0e 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -5,7 +5,7 @@
smart-socket-parent
org.smartboot.socket
- 1.4.0.1231-beta
+ 1.4.0-rc.1
pom
1.0.0
diff --git a/pom.xml b/pom.xml
index b24550d4f0fabacc217f650b9189573c80d3c63b..23e835c30726893093d5e7285af1c3a992507967 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
org.smartboot.socket
smart-socket-parent
- 1.4.0.1231-beta
+ 1.4.0-rc.1
2.6
diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml
index dfb6c4a476449f7244bdc17b10c167d2eb1bdfdf..de614fc6c670055b3d4ff4e3d1fe0ba13f44f090 100644
--- a/smart-socket-parent/pom.xml
+++ b/smart-socket-parent/pom.xml
@@ -15,14 +15,14 @@
4.0.0
org.smartboot.socket
smart-socket-parent
- 1.4.0.1231-beta
+ 1.4.0-rc.1
pom
UTF-8
1.7.25
- 1.4.0.1231-beta
+ 1.4.0-rc.1
diff --git a/smart-socket-parent/uml/aio-core.puml b/smart-socket-parent/uml/aio-core.puml
new file mode 100644
index 0000000000000000000000000000000000000000..3fe560b43ab94a98e211e35900027cbc786e2008
--- /dev/null
+++ b/smart-socket-parent/uml/aio-core.puml
@@ -0,0 +1,86 @@
+@startuml
+
+skinparam packageStyle rectangle
+skinparam ClassFontSize 20
+skinparam ClassAttributeFontSize 20
+skinparam ClassStereotypeFontSize 18
+skinparam titleBorderRoundCorner 26
+skinparam PackageFontSize 18
+skinparam titleBorderThickness 2
+skinparam titleBorderColor red
+skinparam titleBackgroundColor Cornsilk
+skinparam LegendFontSize 8
+title smart-socket 基础通信
+
+package org.smart.socket.transport <> {
+
+class AioQuickClient <<客户端AIO通信>>{
+#AioSession session
+#BufferPagePool bufferPool
+-AsynchronousChannelGroup asynchronousChannelGroup
++ AioSession start()
++ void shutdown()
++ AioQuickClient setReadBufferSize(int size)
++ AioQuickClient setOption(SocketOption, value)
+}
+
+class AioQuickServer <<服务端AIO通信>>{
+#ReadCompletionHandler aioReadCompletionHandler
+#WriteCompletionHandler aioWriteCompletionHandler
+#BufferPagePool bufferPool
+-AsynchronousServerSocketChannel serverSocketChannel
+-AsynchronousChannelGroup asynchronousChannelGroup
++void start()
++void shutdown()
++AioQuickServer setThreadNum(int num)
++AioQuickServer setReadBufferSize(int size)
++AioQuickServer setBannerEnabled(boolean enabled)
++AioQuickServer setOption(SocketOption, value)
+}
+
+class AioSession <<通信会话>>{
+#AsynchronousSocketChannel channel
+#VirtualBuffer readBuffer
+#VirtualBuffer writeBuffer
+- Object attachment
+-InputStream inputStream
+-WriteBuffer writeBuffer
++WriteBuffer writeBuffer()
++void close()
++String getSessionID()
++boolean isInvalid()
++T getAttachment()
++void setAttachment(T attachment)
++InputStream getInputStream()
++InputStream getInputStream(int length)
+}
+
+class WriteBuffer <<数据缓冲区>> {
+BlockingQueue bufList
+- VirtualBuffer writeInBuf
+-BufferPage bufferPage
++void write()
++void flush()
++void close()
++boolean isClosed()
++boolean hasData()
+}
+
+
+AioSession o-- AioQuickClient
+AioSession o-- AioQuickServer
+WriteBuffer *-down- AioSession
+}
+
+package org.smart.socket.buffer <> {
+note "smart-socket内存池" as N
+}
+
+
+WriteBuffer o-up- org.smart.socket.buffer
+
+legend right
+ 三刀
+endlegend
+
+@enduml
\ No newline at end of file
diff --git a/smart-socket-parent/uml/buffer_pool.puml b/smart-socket-parent/uml/buffer_pool.puml
new file mode 100644
index 0000000000000000000000000000000000000000..80fe0db8c6b70e7ca3140b4ae14dd8528e6c23d8
--- /dev/null
+++ b/smart-socket-parent/uml/buffer_pool.puml
@@ -0,0 +1,55 @@
+@startuml
+
+skinparam packageStyle rectangle
+skinparam ClassFontSize 20
+skinparam ClassAttributeFontSize 18
+skinparam titleBorderRoundCorner 26
+'skinparam PackageFontSize 38
+skinparam titleBorderThickness 2
+skinparam titleBorderColor red
+skinparam titleBackgroundColor Cornsilk
+skinparam LegendFontSize 8
+title smart-socket 内存池
+
+package org.smart.socket.buffer <> {
+
+note "提供堆外缓冲区 DirectByteBuffer 的池化服务,\n由内存池管理虚拟内存块 VirtualBuffer 的分配与回收" as tip
+
+class BufferPage {
+ - List freeList
+ - ByteBuffer buffer
+ + VirtualBuffer allocate(int size)
+ void clean(VirtualBuffer cleanBuffer)
+}
+
+
+class BufferPagePool {
+- BufferPage[] bufferPageList
+'- int cursor
++ allocateBufferPage()
+}
+
+
+class VirtualBuffer{
+ BufferPage bufferPage
+ - ByteBuffer buffer
+ - boolean clean
+ - int parentPosition
+ - int parentLimit
+
+ + ByteBuffer buffer()
+ + void clean()
+}
+
+BufferPagePool "1" *-- "N" BufferPage : 缓存页池化
+BufferPage "1" *- "0..*" VirtualBuffer : 虚拟内存块
+}
+'org.smart.socket +-- org.smart.socket.transport
+
+'org.smart.socket.transport +-- org.smart.socket.buffer
+
+legend right
+ 三刀
+endlegend
+
+@enduml
\ No newline at end of file