diff --git a/README.md b/README.md index 1faae78bd675f8e831d563ccb2f5e37e0e1647bf..2cfd2ac4a3e7caef353827d1b08600ba2abfa33f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,22 @@ ## smart-socket [English](README_EN.md) smart-socket是一款国产开源的Java AIO框架,追求代码量、性能、稳定性、接口设计各方面都达到极致。如果smart-socket对您有一丝帮助,请Star一下我们的项目并持续关注;如果您对smart-socket并不满意,那请多一些耐心,smart-socket一直在努力变得更好。 +## 版本说明 + +| 系列 | 最新版 | 文档 | 说明 | +| -- | -- | -- | -- | +| 1.3 | [1.3.23](https://mvnrepository.com/artifact/org.smartboot.socket/aio-core/1.3.23) | 《[smart-socket技术小册](https://smartboot.gitee.io/docs/smart-socket/)》 | 企业级,已稳定运行在众多企业的生产环境上 | +| 1.4 | 1.4.0-rc.1 | 暂无 |暂无| + +**特色:** +1. 代码量极少,可读性强 +2. 上手快,二次开发只需实现两个接口 +3. 性能爆表,充分压榨CPU、带宽 +4. 资源占用极低,IO线程0感知 +5. 自带流控、缓存压缩、流量/消息量监控等黑科技 +6. 文档齐全 + + **![smart\-socket](//pub.idqqimg.com/wpa/images/group.png):** | 群号 | 群类型 | 入群条件 | 福利 | @@ -23,50 +39,6 @@ smart-socket是一款国产开源的Java AIO框架,追求代码量、性能、 - smart-socket能处理半包、粘包吗? -**特色:** -1. 代码量极少,可读性强 -2. 上手快,二次开发只需实现两个接口 -3. 性能爆表,充分压榨CPU、带宽 -4. 资源占用极低,IO线程0感知 -5. 自带流控、缓存压缩、流量/消息量监控等黑科技 -6. 文档齐全《[smart-socket技术小册](https://smartboot.gitee.io/docs/smart-socket/)》 - -### Maven -smart-socket发布了两种类型的包供大家选用: - -1. aio-core,针对Socket的初级用户提供的开发包,仅提供基本的AIO通讯服务。 - - ```xml - - org.smartboot.socket - aio-core - 1.3.22 - - ``` - -2. aio-pro,面向资深用户提供的进阶版,不仅包含了aio-core的所有功能,还提供了TLS/SSL通讯功能,并提供一些用于辅助编解码的工具类。 - - ```xml - - org.smartboot.socket - aio-pro - 1.3.22 - - ``` - -## 性能测试报告 - -| 项目 | 结果 | -| --- | --- | -|CPU| i7-4790 3.60Ghz| -|内存| 8G| -|测试代码|服务端:P2PServer,客户端:P2PMultiClient| -|测试时长|大于两分钟(服务端与客户端启动后的第一分钟数据是无效的,因为实际未跑满一分钟) -|时间单位|1分钟| -|数据总流量|7064MB| -|消息大小|33B| -|消息数|224484842| - ## 标题党 - [《每秒处理 500W 条消息,人、机为之颤抖》](https://www.oschina.net/news/90988/smart-socket-1-2-0-beta) - [《再见,Netty!你好,smart-socket!》](https://my.oschina.net/u/2385344/blog/1603648) diff --git a/aio-core/pom.xml b/aio-core/pom.xml index 091d2a411c432557602c56c46f9161adc516eb8e..21acd65baba0e3a8f771acb8a6b85a357b01f393 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -11,7 +11,7 @@ org.smartboot.socket smart-socket-parent - 1.4.0.1231-beta + 1.4.0-rc.1 ../smart-socket-parent @@ -31,4 +31,26 @@ + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + true + public + -Xdoclint:none + + + + attach-javadocs + + jar + + + + + + diff --git a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java index b807716105fda806af811e401a331e7b12006777..d5e6174743092656e63c4a2f18de25f5394f7f00 100644 --- a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java +++ b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java @@ -74,4 +74,12 @@ public enum StateMachineEnum { *

AioSession关闭成功

*/ SESSION_CLOSED, + /** + * 释放流控 + */ + RELEASE_FLOW_CONTROL, + /** + * 流控 + */ + FLOW_CONTROL } diff --git a/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java b/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java index cddc5a56d838296a0826fe38c4c9c5b011f21228..f55ecf7efb0a3adf206cb71700d7208ed0bea251 100644 --- a/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java +++ b/aio-core/src/main/java/org/smartboot/socket/buffer/BufferPage.java @@ -67,22 +67,15 @@ public final class BufferPage { freeChunk.setParentPosition(buffer.limit()); } if (bufferChunk.buffer().remaining() != size) { - LOGGER.error(bufferChunk.buffer().remaining() + "aaaa" + size); throw new RuntimeException("allocate " + size + ", buffer:" + bufferChunk); } return bufferChunk; } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("bufferPage has no available space: " + size); - } + LOGGER.warn("bufferPage has no available space: " + size); return new VirtualBuffer(null, allocate0(size, false), 0, 0); } synchronized void clean(VirtualBuffer cleanBuffer) { - if (freeList.isEmpty()) { - freeList.add(cleanBuffer); - return; - } int index = 0; Iterator iterator = freeList.iterator(); while (iterator.hasNext()) { diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java index 222fae8dd7a1c8fc9a1cd56e14e936d34efdab90..fd3e5696ee0be301dbcddfde70ee709fe9b4b989 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java @@ -136,8 +136,19 @@ public class AioQuickClient { *

*/ public final void shutdown() { + showdown0(false); + } + + /** + * 立即关闭客户端 + */ + public final void shutdownNow() { + showdown0(true); + } + + private void showdown0(boolean flag) { if (session != null) { - session.close(); + session.close(flag); session = null; } //仅Client内部创建的ChannelGroup需要shutdown @@ -146,7 +157,6 @@ public class AioQuickClient { } } - /** * 设置读缓存区大小 * diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index 739e8c15084ec104ce23cc8176848e99a1541e0b..ef5b84d9adb9e69f2fd5576915d57b290114178e 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -74,6 +74,7 @@ public class AioQuickServer { config.setPort(port); config.setProtocol(protocol); config.setProcessor(messageProcessor); + config.setFlowControlEnabled(true); } /** diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java index c36b693bb1dcaea066c9dd93968d76836f01b6ba..df7c8f6e818b8071c31b4c2bbfdb06a1be36e1cf 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java @@ -88,20 +88,19 @@ public class AioSession { * @see AioSession#SESSION_STATUS_ENABLED */ protected byte status = SESSION_STATUS_ENABLED; + Semaphore readSemaphore = new Semaphore(1); /** * 输出信号量,防止并发write导致异常 */ private Semaphore semaphore = new Semaphore(1); - - /** - * 内存页,用于申请当前AioSession所需的VirtualBuffer - */ - private BufferPage bufferPage; /** * 附件对象 */ private Object attachment; - + /** + * 是否流控,客户端写流控,服务端读流控 + */ + private boolean flowControl; private ReadCompletionHandler readCompletionHandler; private WriteCompletionHandler writeCompletionHandler; private IoServerConfig ioServerConfig; @@ -117,7 +116,6 @@ public class AioSession { */ AioSession(AsynchronousSocketChannel channel, final IoServerConfig config, ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler, BufferPage bufferPage) { this.channel = channel; - this.bufferPage = bufferPage; this.readCompletionHandler = readCompletionHandler; this.writeCompletionHandler = writeCompletionHandler; this.ioServerConfig = config; @@ -126,6 +124,10 @@ public class AioSession { byteBuf = new WriteBuffer(bufferPage, new Function, Void>() { @Override public Void apply(BlockingQueue var) { + if (ioServerConfig.isFlowControlEnabled() && var.size() >= ioServerConfig.getFlowControlSize()) { + flowControl = true; + ioServerConfig.getProcessor().stateEvent(AioSession.this, StateMachineEnum.FLOW_CONTROL, null); + } if (!semaphore.tryAcquire()) { return null; } @@ -146,6 +148,7 @@ public class AioSession { * 初始化AioSession */ void initSession() { + readSemaphore.tryAcquire(); continueRead(); } @@ -162,6 +165,13 @@ public class AioSession { } if (writeBuffer != null) { + //如果存在流控并符合释放条件,则触发读操作 + //一定要放在continueWrite之前 + if (flowControl && byteBuf.bufList.size() < ioServerConfig.getReleaseFlowControlSize()) { + ioServerConfig.getProcessor().stateEvent(AioSession.this, StateMachineEnum.RELEASE_FLOW_CONTROL, null); + flowControl = false; + readFromChannel(false); + } continueWrite(writeBuffer); return; } @@ -275,9 +285,13 @@ public class AioSession { /** - * 触发通道的读操作,当发现存在严重消息积压时,会触发流控 + * 触发通道的读回调操作 */ void readFromChannel(boolean eof) { + //处于流控状态 + if (flowControl || !readSemaphore.tryAcquire()) { + return; + } final ByteBuffer readBuffer = this.readBuffer.buffer(); readBuffer.flip(); final MessageProcessor messageProcessor = ioServerConfig.getProcessor(); @@ -324,6 +338,13 @@ public class AioSession { readBuffer.position(readBuffer.limit()); readBuffer.limit(readBuffer.capacity()); } + + //读缓冲区已满 + if (!readBuffer.hasRemaining()) { + RuntimeException exception = new RuntimeException("readBuffer has no remaining"); + messageProcessor.stateEvent(this, StateMachineEnum.DECODE_EXCEPTION, exception); + throw exception; + } continueRead(); } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index e28e5501074178350b2130440fe21718c5a38b0e..dbe5e542894ed0f62469f1fa8c32b924e9af292d 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -32,7 +32,16 @@ final class IoServerConfig { "\\__, \\| ( ) ( ) |( (_| || | | |_ \\__, \\( (_) )( (___ | |\\`\\ ( ___/| |_ \n" + "(____/(_) (_) (_)`\\__,_)(_) `\\__) (____/`\\___/'`\\____)(_) (_)`\\____)`\\__)"; - public static final String VERSION = "v1.4.0.1231-beta"; + public static final String VERSION = "v1.4.0-rc.1"; + /** + * 释放流控阈值 + */ + private final int releaseFlowControlSize = getIntProperty(Property.SERVER_RELEASE_FLOW_CONTROL_SIZE, 10); + + /** + * 流控阈值 + */ + private final int flowControlSize = getIntProperty(Property.SERVER_FLOW_CONTROL_SIZE, 20); /** * 消息体缓存大小,字节 */ @@ -61,11 +70,14 @@ final class IoServerConfig { * 服务器处理线程数 */ private int threadNum = Runtime.getRuntime().availableProcessors() + 1; - /** * 是否启用控制台banner */ private boolean bannerEnabled = true; + /** + * 流控功能开关 + */ + private boolean flowControlEnabled = false; /** * Socket 配置 */ @@ -132,9 +144,7 @@ final class IoServerConfig { public final void setProcessor(MessageProcessor processor) { this.processor = processor; - if (processor instanceof NetMonitor) { - this.monitor = (NetMonitor) processor; - } + this.monitor = (processor instanceof NetMonitor) ? (NetMonitor) processor : null; } public int getReadBufferSize() { @@ -164,6 +174,22 @@ final class IoServerConfig { socketOptions.put(socketOption, f); } + public boolean isFlowControlEnabled() { + return flowControlEnabled; + } + + public void setFlowControlEnabled(boolean flowControlEnabled) { + this.flowControlEnabled = flowControlEnabled; + } + + public int getReleaseFlowControlSize() { + return releaseFlowControlSize; + } + + public int getFlowControlSize() { + return flowControlSize; + } + @Override public String toString() { return "IoServerConfig{" + @@ -189,5 +215,7 @@ final class IoServerConfig { String SERVER_PAGE_SIZE = PROJECT_NAME + ".server.pageSize"; String CLIENT_PAGE_SIZE = PROJECT_NAME + ".client.pageSize"; String SERVER_PAGE_IS_DIRECT = PROJECT_NAME + ".server.page.isDirect"; + String SERVER_FLOW_CONTROL_SIZE = PROJECT_NAME + ".server.flowControlSize"; + String SERVER_RELEASE_FLOW_CONTROL_SIZE = PROJECT_NAME + ".server.releaseFlowControlSize"; } } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java index 3d461b654bd76bbc87cb55919c477b839dff75b4..17cb9bf815d3428f41fb72af9ae988db9789024f 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java @@ -32,6 +32,7 @@ class ReadCompletionHandler implements CompletionHandler bufList = new LinkedBlockingQueue<>(); + BlockingQueue bufList = new LinkedBlockingQueue<>(); private VirtualBuffer writeInBuf; private BufferPage bufferPage; private boolean closed = false; @@ -51,7 +51,7 @@ public final class WriteBuffer extends OutputStream { function.apply(bufList); } - public void writeInt(int v) throws IOException { + public synchronized void writeInt(int v) throws IOException { cacheByte[0] = (byte) ((v >>> 24) & 0xFF); cacheByte[1] = (byte) ((v >>> 16) & 0xFF); cacheByte[2] = (byte) ((v >>> 8) & 0xFF); @@ -105,7 +105,6 @@ public final class WriteBuffer extends OutputStream { } if (bufList.size() > 0) { function.apply(bufList); - return; } } @@ -129,11 +128,11 @@ public final class WriteBuffer extends OutputStream { } } - public boolean isClosed() { + boolean isClosed() { return closed; } - public boolean hasData() { + boolean hasData() { return bufList.size() > 0 || (writeInBuf != null && writeInBuf.buffer().position() > 0); } } \ No newline at end of file diff --git a/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java b/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java index c517282978955633e49146949b55d7e4c9a2dc94..3ad7538f353913f2897db6e93fbe841dbc3494f0 100644 --- a/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java +++ b/aio-core/src/test/java/org/smartboot/socket/test/IntegerClient.java @@ -8,13 +8,11 @@ import org.smartboot.socket.transport.AioSession; */ public class IntegerClient { public static void main(String[] args) throws Exception { - IntegerClientProcessor processor = new IntegerClientProcessor(); - AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8888, new IntegerProtocol(), processor); + AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8888, new IntegerProtocol(), new IntegerClientProcessor()); AioSession session = aioQuickClient.start(); session.writeBuffer().writeInt(1); -// session.getOutputStream().flush(); -// session.getOutputStream().close(); - Thread.sleep(1000); - aioQuickClient.shutdown(); +// session.writeBuffer().flush(); +// Thread.sleep(1000); + aioQuickClient.shutdownNow(); } } diff --git a/aio-pro/pom.xml b/aio-pro/pom.xml index f32fed9ea43f81a4cf3156fa5da60b7f380576cd..5c6998543c9f1c8b486900eac3af7b3a2b502dc6 100644 --- a/aio-pro/pom.xml +++ b/aio-pro/pom.xml @@ -19,7 +19,7 @@ org.smartboot.socket smart-socket-parent - 1.4.0.1231-beta + 1.4.0-rc.1 ../smart-socket-parent diff --git a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java index 9c6e65e67c7091e4910394c9d25079784b46c91b..89c22d99fa9671dccece81b1c55c3b1a16fbd18e 100644 --- a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java +++ b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringClient.java @@ -20,7 +20,7 @@ public class StringClient { public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { System.setProperty("smart-socket.server.pageSize", (1024 * 1024 * 32) + ""); - System.setProperty("smart-socket.session.writeChunkSize", "2048"); + System.setProperty("smart-socket.session.writeChunkSize", "1048"); for (int i = 0; i < 10; i++) { new Thread() { @Override diff --git a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java index 638bd7237f62437b20b92bc3238e81a1c25e016e..a5f7287586c57f4016ed1c879108ce943364b75b 100644 --- a/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java +++ b/example/benchmark/src/main/java/org/smartboot/socket/benchmark/StringServer.java @@ -41,6 +41,14 @@ public class StringServer { if (throwable != null) { throwable.printStackTrace(); } + switch (stateMachineEnum){ + case FLOW_CONTROL: +// System.out.println("流控"); + break; + case RELEASE_FLOW_CONTROL: +// System.out.println("释放流控"); + break; + } } }; processor.addPlugin(new MonitorPlugin()); diff --git a/example/benchmark2/bin/client.sh b/example/benchmark2/bin/client.sh new file mode 100644 index 0000000000000000000000000000000000000000..b7a00ad088e102886a2480d229bab841036b9fcb --- /dev/null +++ b/example/benchmark2/bin/client.sh @@ -0,0 +1,3 @@ +#!/bin/sh +HTTP_HOME=$(dirname $(pwd)) +java -Dlog4j.configurationFile=file:${HTTP_HOME}/conf/log4j2_client.xml -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${HTTP_HOME}/lib/ org.smartboot.socket.benchmark.StringClient diff --git a/example/benchmark2/bin/server.sh b/example/benchmark2/bin/server.sh new file mode 100644 index 0000000000000000000000000000000000000000..70b087ed98bcc711e78dc8fa66ee3cf5f02a0b6a --- /dev/null +++ b/example/benchmark2/bin/server.sh @@ -0,0 +1,3 @@ +#!/bin/sh +HTTP_HOME=$(dirname $(pwd)) +java -Dlog4j.configurationFile=file:${HTTP_HOME}/conf/log4j2_server.xml -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${HTTP_HOME}/lib/ org.smartboot.socket.benchmark.StringServer diff --git a/example/benchmark2/conf/log4j2_client.xml b/example/benchmark2/conf/log4j2_client.xml new file mode 100644 index 0000000000000000000000000000000000000000..353be8a4a4ec94c15aeb84a38efb47fe46eb0f99 --- /dev/null +++ b/example/benchmark2/conf/log4j2_client.xml @@ -0,0 +1,44 @@ + + + + + + + + ../logs/ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/example/benchmark2/conf/log4j2_server.xml b/example/benchmark2/conf/log4j2_server.xml new file mode 100644 index 0000000000000000000000000000000000000000..5651d519a9b4f002f0e10c3caf400e90baa6634e --- /dev/null +++ b/example/benchmark2/conf/log4j2_server.xml @@ -0,0 +1,44 @@ + + + + + + + + ../logs/ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/example/benchmark2/pom.xml b/example/benchmark2/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..d28defe742124f5d68f998740e676c3f62fe7209 --- /dev/null +++ b/example/benchmark2/pom.xml @@ -0,0 +1,72 @@ + + + + smart-socket-parent + org.smartboot.socket + 1.3.23 + + 4.0.0 +1.0 + benchmark2 + + + org.slf4j + slf4j-simple + 1.7.25 + + + org.smartboot.socket + aio-pro + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.11.0 + + + org.apache.logging.log4j + log4j-api + 2.11.0 + + + + + + maven-assembly-plugin + + + + + + release.xml + + + + + make-assembly + package + + single + + + + + + + + + alimaven + aliyun maven + http://maven.aliyun.com/nexus/content/groups/public/ + + + + + alimaven + aliyun maven + http://maven.aliyun.com/nexus/content/groups/public/ + + + \ No newline at end of file diff --git a/example/benchmark2/release.xml b/example/benchmark2/release.xml new file mode 100644 index 0000000000000000000000000000000000000000..b179f16168251131ffe2122033a13d3f4d6898b5 --- /dev/null +++ b/example/benchmark2/release.xml @@ -0,0 +1,48 @@ + + + + + + tar.gz + dir + + + + + /lib + + + + + + + bin/** + + 0755 + + + + + /conf/** + logs + + + + + + + + + + + + + \ No newline at end of file diff --git a/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringClient.java b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringClient.java new file mode 100644 index 0000000000000000000000000000000000000000..4e3909a663435511872073c18e05340cffaf0ead --- /dev/null +++ b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringClient.java @@ -0,0 +1,92 @@ +package org.smartboot.socket.benchmark; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.socket.MessageProcessor; +import org.smartboot.socket.StateMachineEnum; +import org.smartboot.socket.transport.AioQuickClient; +import org.smartboot.socket.transport.AioSession; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousChannelGroup; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadFactory; + +/** + * @author 三刀 + * @version V1.0 , 2018/11/23 + */ +public class StringClient { + private static final Logger LOGGER = LoggerFactory.getLogger(StringClient.class); + AsynchronousChannelGroup asynchronousChannelGroup; + + public StringClient(AsynchronousChannelGroup asynchronousChannelGroup) { + this.asynchronousChannelGroup = asynchronousChannelGroup; + } + + public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { + System.setProperty("smart-socket.server.pageSize", (1024 * 1024 * 32) + ""); + System.setProperty("smart-socket.session.writeChunkSize", "2048"); + final AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withFixedThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r); + } + }); + + for (int i = 0; i < 10; i++) { + new Thread() { + @Override + public void run() { + try { + new StringClient(asynchronousChannelGroup).test(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + }.start(); + } + + } + + public void test() throws InterruptedException, ExecutionException, IOException { + AioQuickClient client = new AioQuickClient<>("localhost", 8888, new StringProtocol(), new MessageProcessor() { + @Override + public void process(AioSession session, String msg) { +// LOGGER.info(msg); + } + + @Override + public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { + if (throwable != null) { + throwable.printStackTrace(); + } + } + }); + client.setWriteQueueSize(16384); + AioSession session = client.start(asynchronousChannelGroup); + + int i = 1; + while (true) { + int num = (int) (Math.random() * 10) + 1; + StringBuilder sb = new StringBuilder(); + while (num-- > 0) { + sb.append("smart-socket"); + } + byte[] bytes = sb.toString().getBytes(); + ByteBuffer buffer = ByteBuffer.allocate(bytes.length + 4); + buffer.putInt(bytes.length); + buffer.put(bytes); + buffer.flip(); + session.write(buffer); +// outputStream.writeInt(bytes.length); +// outputStream.write(bytes); + } + } +} diff --git a/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringProtocol.java b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringProtocol.java new file mode 100644 index 0000000000000000000000000000000000000000..86092367fcf78b52bf2483b1d4d47d275b9c52de --- /dev/null +++ b/example/benchmark2/src/main/java/org/smartboot/socket/benchmark/StringProtocol.java @@ -0,0 +1,35 @@ +package org.smartboot.socket.benchmark; + +import org.smartboot.socket.Protocol; +import org.smartboot.socket.transport.AioSession; + +import java.nio.ByteBuffer; + +/** + * @author 三刀 + * @version V1.0 , 2018/11/23 + */ +public class StringProtocol implements Protocol { + @Override + public String decode(ByteBuffer readBuffer, AioSession session) { + int remaining = readBuffer.remaining(); + if (remaining < 4) { + return null; + } + readBuffer.mark(); + int length = readBuffer.getInt(); + if (length > readBuffer.remaining()) { + readBuffer.reset(); + return null; + } + byte[] b = new byte[length]; + readBuffer.get(b); + readBuffer.mark(); + return new String(b); + } + + @Override + public ByteBuffer encode(String s, AioSession aioSession) { + return null; + } +} diff --git a/example/pom.xml b/example/pom.xml index 211c16ca4ba56c2d1249134e5a4d1202cba87108..dc172f1e935d3fc377ac5fb15ee6734b87467c0e 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ smart-socket-parent org.smartboot.socket - 1.4.0.1231-beta + 1.4.0-rc.1 pom 1.0.0 diff --git a/pom.xml b/pom.xml index b24550d4f0fabacc217f650b9189573c80d3c63b..23e835c30726893093d5e7285af1c3a992507967 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ org.smartboot.socket smart-socket-parent - 1.4.0.1231-beta + 1.4.0-rc.1 2.6 diff --git a/smart-socket-parent/pom.xml b/smart-socket-parent/pom.xml index dfb6c4a476449f7244bdc17b10c167d2eb1bdfdf..de614fc6c670055b3d4ff4e3d1fe0ba13f44f090 100644 --- a/smart-socket-parent/pom.xml +++ b/smart-socket-parent/pom.xml @@ -15,14 +15,14 @@ 4.0.0 org.smartboot.socket smart-socket-parent - 1.4.0.1231-beta + 1.4.0-rc.1 pom UTF-8 1.7.25 - 1.4.0.1231-beta + 1.4.0-rc.1 diff --git a/smart-socket-parent/uml/aio-core.puml b/smart-socket-parent/uml/aio-core.puml new file mode 100644 index 0000000000000000000000000000000000000000..3fe560b43ab94a98e211e35900027cbc786e2008 --- /dev/null +++ b/smart-socket-parent/uml/aio-core.puml @@ -0,0 +1,86 @@ +@startuml + +skinparam packageStyle rectangle +skinparam ClassFontSize 20 +skinparam ClassAttributeFontSize 20 +skinparam ClassStereotypeFontSize 18 +skinparam titleBorderRoundCorner 26 +skinparam PackageFontSize 18 +skinparam titleBorderThickness 2 +skinparam titleBorderColor red +skinparam titleBackgroundColor Cornsilk +skinparam LegendFontSize 8 +title smart-socket 基础通信 + +package org.smart.socket.transport <> { + +class AioQuickClient <<客户端AIO通信>>{ +#AioSession session +#BufferPagePool bufferPool +-AsynchronousChannelGroup asynchronousChannelGroup ++ AioSession start() ++ void shutdown() ++ AioQuickClient setReadBufferSize(int size) ++ AioQuickClient setOption(SocketOption, value) +} + +class AioQuickServer <<服务端AIO通信>>{ +#ReadCompletionHandler aioReadCompletionHandler +#WriteCompletionHandler aioWriteCompletionHandler +#BufferPagePool bufferPool +-AsynchronousServerSocketChannel serverSocketChannel +-AsynchronousChannelGroup asynchronousChannelGroup ++void start() ++void shutdown() ++AioQuickServer setThreadNum(int num) ++AioQuickServer setReadBufferSize(int size) ++AioQuickServer setBannerEnabled(boolean enabled) ++AioQuickServer setOption(SocketOption, value) +} + +class AioSession <<通信会话>>{ +#AsynchronousSocketChannel channel +#VirtualBuffer readBuffer +#VirtualBuffer writeBuffer +- Object attachment +-InputStream inputStream +-WriteBuffer writeBuffer ++WriteBuffer writeBuffer() ++void close() ++String getSessionID() ++boolean isInvalid() ++T getAttachment() ++void setAttachment(T attachment) ++InputStream getInputStream() ++InputStream getInputStream(int length) +} + +class WriteBuffer <<数据缓冲区>> { +BlockingQueue bufList +- VirtualBuffer writeInBuf +-BufferPage bufferPage ++void write() ++void flush() ++void close() ++boolean isClosed() ++boolean hasData() +} + + +AioSession o-- AioQuickClient +AioSession o-- AioQuickServer +WriteBuffer *-down- AioSession +} + +package org.smart.socket.buffer <> { +note "smart-socket内存池" as N +} + + +WriteBuffer o-up- org.smart.socket.buffer + +legend right + 三刀 +endlegend + +@enduml \ No newline at end of file diff --git a/smart-socket-parent/uml/buffer_pool.puml b/smart-socket-parent/uml/buffer_pool.puml new file mode 100644 index 0000000000000000000000000000000000000000..80fe0db8c6b70e7ca3140b4ae14dd8528e6c23d8 --- /dev/null +++ b/smart-socket-parent/uml/buffer_pool.puml @@ -0,0 +1,55 @@ +@startuml + +skinparam packageStyle rectangle +skinparam ClassFontSize 20 +skinparam ClassAttributeFontSize 18 +skinparam titleBorderRoundCorner 26 +'skinparam PackageFontSize 38 +skinparam titleBorderThickness 2 +skinparam titleBorderColor red +skinparam titleBackgroundColor Cornsilk +skinparam LegendFontSize 8 +title smart-socket 内存池 + +package org.smart.socket.buffer <> { + +note "提供堆外缓冲区 DirectByteBuffer 的池化服务,\n由内存池管理虚拟内存块 VirtualBuffer 的分配与回收" as tip + +class BufferPage { + - List freeList + - ByteBuffer buffer + + VirtualBuffer allocate(int size) + void clean(VirtualBuffer cleanBuffer) +} + + +class BufferPagePool { +- BufferPage[] bufferPageList +'- int cursor ++ allocateBufferPage() +} + + +class VirtualBuffer{ + BufferPage bufferPage + - ByteBuffer buffer + - boolean clean + - int parentPosition + - int parentLimit + + + ByteBuffer buffer() + + void clean() +} + +BufferPagePool "1" *-- "N" BufferPage : 缓存页池化 +BufferPage "1" *- "0..*" VirtualBuffer : 虚拟内存块 +} +'org.smart.socket +-- org.smart.socket.transport + +'org.smart.socket.transport +-- org.smart.socket.buffer + +legend right + 三刀 +endlegend + +@enduml \ No newline at end of file