From 58705715ecd73c283595df3058bad0767ab61ba7 Mon Sep 17 00:00:00 2001 From: Seer Date: Mon, 12 Mar 2018 11:27:49 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E6=94=AF=E6=8C=81directByteBuffer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/socket/StateMachineEnum.java | 4 ++-- .../org/smartboot/socket/transport/AioQuickClient.java | 10 ++++++++++ .../org/smartboot/socket/transport/AioQuickServer.java | 10 ++++++++++ .../org/smartboot/socket/transport/AioSession.java | 7 +++++-- .../org/smartboot/socket/transport/IoServerConfig.java | 10 ++++++++++ .../socket/protocol/p2p/client/P2PMultiClient.java | 1 + .../socket/protocol/p2p/server/P2PServer.java | 1 + 7 files changed, 39 insertions(+), 4 deletions(-) diff --git a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java index 49737596..412ef38a 100644 --- a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java +++ b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java @@ -1,8 +1,8 @@ /* - * Copyright (c) 2017, org.smartboot. All rights reserved. + * Copyright (c) 2018, org.smartboot. All rights reserved. * project name: smart-socket * file name: StateMachineEnum.java - * Date: 2017-11-25 + * Date: 2018-03-12 * Author: sandao */ diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java index 3dbf85b6..3ed2e0d4 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java @@ -162,4 +162,14 @@ public class AioQuickClient { this.config.setWriteQueueSize(size); return this; } + + /** + * 是否启用DirectByteBuffer + * + * @param directBuffer + */ + public final AioQuickClient setDirectBuffer(boolean directBuffer) { + config.setDirectBuffer(directBuffer); + return this; + } } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index 7601e7e3..5f01b370 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -172,4 +172,14 @@ public class AioQuickServer { config.setBannerEnabled(bannerEnabled); return this; } + + /** + * 是否启用DirectByteBuffer + * + * @param directBuffer + */ + public final AioQuickServer setDirectBuffer(boolean directBuffer) { + config.setDirectBuffer(directBuffer); + return this; + } } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java index 4f55f34e..4995b069 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java @@ -96,7 +96,7 @@ public class AioSession { this.serverFlowLimit = serverSession ? false : null; //触发状态机 config.getProcessor().stateEvent(this, StateMachineEnum.NEW_SESSION, null); - this.readBuffer = ByteBuffer.allocate(config.getReadBufferSize()); + this.readBuffer = newByteBuffer0(config.getReadBufferSize()); for (Filter filter : config.getFilters()) { filter.connected(this); } @@ -143,7 +143,7 @@ public class AioSession { writeBuffer = headBuffer; } else { if (writeBuffer == null || totalSize * 2 <= writeBuffer.capacity() || totalSize > writeBuffer.capacity()) { - writeBuffer = ByteBuffer.allocate(totalSize); + writeBuffer = newByteBuffer0(totalSize); } else { writeBuffer.clear().limit(totalSize); } @@ -344,4 +344,7 @@ public class AioSession { return this.ioServerConfig; } + private ByteBuffer newByteBuffer0(int size) { + return ioServerConfig.isDirectBuffer() ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size); + } } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index 436cc3eb..e0591d74 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -64,6 +64,8 @@ final class IoServerConfig { */ private Protocol protocol; + private boolean directBuffer; + /** * 服务器处理线程数 */ @@ -171,4 +173,12 @@ final class IoServerConfig { public void setBannerEnabled(boolean bannerEnabled) { this.bannerEnabled = bannerEnabled; } + + public boolean isDirectBuffer() { + return directBuffer; + } + + public void setDirectBuffer(boolean directBuffer) { + this.directBuffer = directBuffer; + } } diff --git a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMultiClient.java b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMultiClient.java index 1920970e..230676a8 100644 --- a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMultiClient.java +++ b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMultiClient.java @@ -48,6 +48,7 @@ public class P2PMultiClient { // .setWriteQueueSize(16384); AioQuickClient client = new AioQuickClient("localhost", 8888, new P2PProtocol(messageFactory), processor); client.setFilters(new Filter[]{new QuickMonitorTimer()}) + .setDirectBuffer(true) .setWriteQueueSize(16384); try { client.start(asynchronousChannelGroup); diff --git a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/server/P2PServer.java b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/server/P2PServer.java index 6c590723..b4519d27 100644 --- a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/server/P2PServer.java +++ b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/server/P2PServer.java @@ -37,6 +37,7 @@ public class P2PServer { AioQuickServer server = new AioQuickServer(8888, new P2PProtocol(messageFactory), new P2PServerMessageProcessor(messageFactory)); server.setThreadNum(16) .setWriteQueueSize(16384) + .setDirectBuffer(true) .setFilters(new Filter[]{new QuickMonitorTimer()}); try { server.start(); -- Gitee From 92c4b2b278bf6f94510b115b8ac2d1533e726d5d Mon Sep 17 00:00:00 2001 From: Seer Date: Thu, 15 Mar 2018 16:42:01 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/transport/AioQuickClient.java | 52 ++++-------------- .../socket/transport/AioQuickServer.java | 29 ++-------- .../java/net/vinote/demo/IntegerClient.java | 5 +- .../java/net/vinote/demo/IntegerServer.java | 5 +- .../socket/transport/AioSSLQuickClient.java | 7 +-- .../socket/transport/AioSSLQuickServer.java | 2 - .../smartboot/socket/http/HttpBootstrap.java | 1 + .../socket/http/HttpContentDecoder.java | 20 ------- .../smartboot/socket/http/HttpDecodeUnit.java | 7 +-- .../smartboot/socket/http/HttpProtocol.java | 2 +- .../http/http11/Http11ContentDecoder.java | 35 ++++++++---- .../http/websocket/WebsocketDecoder.java | 53 ++++++++++++------- .../p2p/client/P2PDisconnectClient.java | 6 +-- .../p2p/client/P2PMaxConnectClient.java | 22 ++++---- .../src/main/java/Test.java | 13 ++--- 15 files changed, 98 insertions(+), 161 deletions(-) delete mode 100644 smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpContentDecoder.java diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java index 3ed2e0d4..19ae1e14 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java @@ -9,8 +9,6 @@ package org.smartboot.socket.transport; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.smartboot.socket.Filter; import org.smartboot.socket.MessageProcessor; import org.smartboot.socket.Protocol; @@ -29,7 +27,6 @@ import java.util.concurrent.ThreadFactory; * @version V1.0.0 */ public class AioQuickClient { - private static final Logger LOGGER = LogManager.getLogger(AioQuickClient.class); protected AsynchronousSocketChannel socketChannel = null; /** * IO事件处理线程组 @@ -40,8 +37,7 @@ public class AioQuickClient { */ protected IoServerConfig config = new IoServerConfig(); - public AioQuickClient() { - } + protected AioSession session; /** * @param host 远程服务器地址 @@ -50,7 +46,10 @@ public class AioQuickClient { * @param messageProcessor 消息处理器 */ public AioQuickClient(String host, int port, Protocol protocol, MessageProcessor messageProcessor) { - connect(host, port).setProtocol(protocol).setProcessor(messageProcessor); + config.setHost(host); + config.setPort(port); + config.setProtocol(protocol); + config.setProcessor(messageProcessor); } /** @@ -63,7 +62,7 @@ public class AioQuickClient { this.socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup); socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get(); //连接成功则构造AIOSession对象 - AioSession session = new AioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), false); + session = new AioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), false); session.initSession(); } @@ -88,12 +87,9 @@ public class AioQuickClient { * 停止客户端服务 */ public final void shutdown() { - if (socketChannel != null) { - try { - socketChannel.close(); - } catch (Exception e) { - LOGGER.catching(e); - } + if (session != null) { + session.close(); + session = null; } //仅Client内部创建的ChannelGroup需要shutdown if (asynchronousChannelGroup != null) { @@ -101,27 +97,6 @@ public class AioQuickClient { } } - /** - * 设置远程连接的地址、端口 - * - * @param host - * @param port - */ - public final AioQuickClient connect(String host, int port) { - this.config.setHost(host); - this.config.setPort(port); - return this; - } - - /** - * 设置协议对象 - * - * @param protocol - */ - public final AioQuickClient setProtocol(Protocol protocol) { - this.config.setProtocol(protocol); - return this; - } /** * 设置消息过滤器,执行顺序以数组中的顺序为准 @@ -133,15 +108,6 @@ public class AioQuickClient { return this; } - /** - * 设置消息处理器 - * - * @param processor - */ - public final AioQuickClient setProcessor(MessageProcessor processor) { - this.config.setProcessor(processor); - return this; - } /** * 设置读缓存区大小 diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index 5f01b370..d6f0f768 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -37,16 +37,15 @@ public class AioQuickServer { protected ReadCompletionHandler aioReadCompletionHandler = new ReadCompletionHandler<>(); protected WriteCompletionHandler aioWriteCompletionHandler = new WriteCompletionHandler<>(); - public AioQuickServer() { - } - /** * @param port 绑定服务端口号 * @param protocol 协议编解码 * @param messageProcessor 消息处理器 */ public AioQuickServer(int port, Protocol protocol, MessageProcessor messageProcessor) { - bind(port).setProtocol(protocol).setProcessor(messageProcessor); + config.setPort(port); + config.setProtocol(protocol); + config.setProcessor(messageProcessor); } public void start() throws IOException { @@ -98,15 +97,6 @@ public class AioQuickServer { asynchronousChannelGroup.shutdown(); } - /** - * 设置服务绑定的端口 - * - * @param port - */ - public final AioQuickServer bind(int port) { - this.config.setPort(port); - return this; - } /** * 设置处理线程数量 @@ -118,10 +108,6 @@ public class AioQuickServer { return this; } - public final AioQuickServer setProtocol(Protocol protocol) { - this.config.setProtocol(protocol); - return this; - } /** * 设置消息过滤器,执行顺序以数组中的顺序为准 @@ -133,15 +119,6 @@ public class AioQuickServer { return this; } - /** - * 设置消息处理器 - * - * @param processor - */ - public final AioQuickServer setProcessor(MessageProcessor processor) { - this.config.setProcessor(processor); - return this; - } /** * 设置输出队列缓冲区长度 diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerClient.java b/aio-core/src/test/java/net/vinote/demo/IntegerClient.java index 2db42cac..29674612 100644 --- a/aio-core/src/test/java/net/vinote/demo/IntegerClient.java +++ b/aio-core/src/test/java/net/vinote/demo/IntegerClient.java @@ -8,10 +8,7 @@ import org.smartboot.socket.transport.AioQuickClient; public class IntegerClient { public static void main(String[] args) throws Exception { IntegerClientProcessor processor = new IntegerClientProcessor(); - AioQuickClient aioQuickClient = new AioQuickClient() - .connect("localhost", 8888) - .setProtocol(new IntegerProtocol()) - .setProcessor(processor); + AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8888, new IntegerProtocol(), processor); aioQuickClient.start(); processor.getSession().write(1); Thread.sleep(1000); diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerServer.java b/aio-core/src/test/java/net/vinote/demo/IntegerServer.java index e306adf2..33e5dbb2 100644 --- a/aio-core/src/test/java/net/vinote/demo/IntegerServer.java +++ b/aio-core/src/test/java/net/vinote/demo/IntegerServer.java @@ -9,10 +9,7 @@ import java.io.IOException; */ public class IntegerServer { public static void main(String[] args) { - AioQuickServer server = new AioQuickServer() - .bind(8888) - .setProtocol(new IntegerProtocol()) - .setProcessor(new IntegerServerProcessor()); + AioQuickServer server = new AioQuickServer(8888, new IntegerProtocol(), new IntegerServerProcessor()); try { server.start(); } catch (IOException e) { diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java b/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java index 958c2e2f..526c17dd 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java @@ -26,15 +26,12 @@ import java.util.concurrent.ExecutionException; * AIO实现的客户端服务 * Created by 三刀 on 2017/6/28. */ -public class AioSSLQuickClient extends AioQuickClient { +public final class AioSSLQuickClient extends AioQuickClient { private static final Logger LOGGER = LogManager.getLogger(AioSSLQuickClient.class); private SSLService sslService; private SSLConfig sslConfig = new SSLConfig(); - public AioSSLQuickClient() { - } - /** * @param host 远程服务器地址 * @param port 远程服务器端口号 @@ -58,7 +55,7 @@ public class AioSSLQuickClient extends AioQuickClient { this.socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup); socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get(); //连接成功则构造AIOSession对象 - AioSession session = new SSLAioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), sslService); + session = new SSLAioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), sslService); session.initSession(); } diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickServer.java b/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickServer.java index c6a2577c..db0f0ef7 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickServer.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickServer.java @@ -27,8 +27,6 @@ public class AioSSLQuickServer extends AioQuickServer { private SSLService sslService; - public AioSSLQuickServer() { - } /** * @param port 绑定服务端口号 diff --git a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpBootstrap.java b/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpBootstrap.java index a212705a..1fec33cd 100755 --- a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpBootstrap.java +++ b/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpBootstrap.java @@ -36,6 +36,7 @@ public class HttpBootstrap { static void http(HttpMessageProcessor processor) { // 定义服务器接受的消息类型以及各类消息对应的处理器 AioQuickServer server = new AioQuickServer(8888, new HttpProtocol(), processor); + server.setDirectBuffer(true); server.setFilters(new Filter[]{new QuickMonitorTimer()}); try { server.start(); diff --git a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpContentDecoder.java b/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpContentDecoder.java deleted file mode 100644 index 71829a19..00000000 --- a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpContentDecoder.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2018, org.smartboot. All rights reserved. - * project name: smart-socket - * file name: HttpContentDecoder.java - * Date: 2018-02-16 - * Author: sandao - */ - -package org.smartboot.socket.http; - -import java.nio.ByteBuffer; - -/** - * @author 三刀 - * @version V1.0 , 2018/2/16 - */ -public abstract class HttpContentDecoder { - - public abstract void decode(HttpDecodeUnit decodeUnit, ByteBuffer buffer); -} diff --git a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpDecodeUnit.java b/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpDecodeUnit.java index 0ac191a2..cd5ead57 100644 --- a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpDecodeUnit.java +++ b/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpDecodeUnit.java @@ -8,6 +8,7 @@ package org.smartboot.socket.http; +import org.smartboot.socket.Protocol; import org.smartboot.socket.extension.decoder.DelimiterFrameDecoder; import org.smartboot.socket.extension.decoder.FixedLengthFrameDecoder; import org.smartboot.socket.extension.decoder.StreamFrameDecoder; @@ -21,7 +22,7 @@ import org.smartboot.socket.http.enums.HttpPartEnum; public class HttpDecodeUnit { HttpHeader header; HttpRequest entity; - HttpContentDecoder contentDecoder; + Protocol contentDecoder; /** * 当前解码阶段 */ @@ -62,11 +63,11 @@ public class HttpDecodeUnit { this.partFinished = partFinished; } - public HttpContentDecoder getContentDecoder() { + public Protocol getContentDecoder() { return contentDecoder; } - public void setContentDecoder(HttpContentDecoder contentDecoder) { + public void setContentDecoder(Protocol contentDecoder) { this.contentDecoder = contentDecoder; } diff --git a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpProtocol.java b/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpProtocol.java index f95c0e2e..21b6fd69 100755 --- a/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpProtocol.java +++ b/smart-protocol-http/src/main/java/org/smartboot/socket/http/HttpProtocol.java @@ -115,7 +115,7 @@ final class HttpProtocol implements Protocol { decodeUnit.setDecodePartEnum(HttpPartEnum.CONTENT); } case CONTENT: { - decodeUnit.contentDecoder.decode(decodeUnit, buffer); + decodeUnit.contentDecoder.decode(buffer, session, eof); break; } default: { diff --git a/smart-protocol-http/src/main/java/org/smartboot/socket/http/http11/Http11ContentDecoder.java b/smart-protocol-http/src/main/java/org/smartboot/socket/http/http11/Http11ContentDecoder.java index 2ebb2709..47d31b44 100644 --- a/smart-protocol-http/src/main/java/org/smartboot/socket/http/http11/Http11ContentDecoder.java +++ b/smart-protocol-http/src/main/java/org/smartboot/socket/http/http11/Http11ContentDecoder.java @@ -9,9 +9,11 @@ package org.smartboot.socket.http.http11; import org.apache.commons.lang.StringUtils; -import org.smartboot.socket.http.HttpContentDecoder; +import org.smartboot.socket.Protocol; import org.smartboot.socket.http.HttpDecodeUnit; +import org.smartboot.socket.http.HttpRequest; import org.smartboot.socket.http.enums.HttpPartEnum; +import org.smartboot.socket.transport.AioSession; import java.nio.ByteBuffer; @@ -19,9 +21,26 @@ import java.nio.ByteBuffer; * @author 三刀 * @version V1.0 , 2018/2/16 */ -public class Http11ContentDecoder extends HttpContentDecoder { +public class Http11ContentDecoder implements Protocol { + + private void decodeBodyForm(HttpDecodeUnit unit) { + ByteBuffer buffer = unit.getFormBodyDecoder().getBuffer(); + String[] paramArray = StringUtils.split(new String(buffer.array(), buffer.position(), buffer.remaining()), "&"); + for (int i = 0; i < paramArray.length; i++) { + ((Http11Request) unit.getEntity()).setParam(StringUtils.substringBefore(paramArray[i], "=").trim(), StringUtils.substringAfter(paramArray[i], "=").trim()); + } + } + @Override - public void decode(HttpDecodeUnit decodeUnit, ByteBuffer buffer) { + public HttpRequest decode(ByteBuffer buffer, AioSession session, boolean eof) { + HttpDecodeUnit decodeUnit = (HttpDecodeUnit) session.getAttachment(); + if (decodeUnit == null) { + throw new RuntimeException("decodeUnit is null"); + } + HttpRequest httpRequest = decodeUnit.getEntity(); + if (httpRequest == null) { + throw new RuntimeException("request is null"); + } switch (decodeUnit.getBodyTypeEnum()) { case FORM: if (decodeUnit.getFormBodyDecoder().decode(buffer)) { @@ -38,13 +57,11 @@ public class Http11ContentDecoder extends HttpContentDecoder { default: throw new UnsupportedOperationException(); } + return null; } - private void decodeBodyForm(HttpDecodeUnit unit) { - ByteBuffer buffer = unit.getFormBodyDecoder().getBuffer(); - String[] paramArray = StringUtils.split(new String(buffer.array(), buffer.position(), buffer.remaining()), "&"); - for (int i = 0; i < paramArray.length; i++) { - ((Http11Request) unit.getEntity()).setParam(StringUtils.substringBefore(paramArray[i], "=").trim(), StringUtils.substringAfter(paramArray[i], "=").trim()); - } + @Override + public ByteBuffer encode(HttpRequest msg, AioSession session) { + return null; } } diff --git a/smart-protocol-http/src/main/java/org/smartboot/socket/http/websocket/WebsocketDecoder.java b/smart-protocol-http/src/main/java/org/smartboot/socket/http/websocket/WebsocketDecoder.java index 558a6517..3bb2e4d5 100644 --- a/smart-protocol-http/src/main/java/org/smartboot/socket/http/websocket/WebsocketDecoder.java +++ b/smart-protocol-http/src/main/java/org/smartboot/socket/http/websocket/WebsocketDecoder.java @@ -8,9 +8,11 @@ package org.smartboot.socket.http.websocket; +import org.smartboot.socket.Protocol; import org.smartboot.socket.extension.decoder.FixedLengthFrameDecoder; -import org.smartboot.socket.http.HttpContentDecoder; import org.smartboot.socket.http.HttpDecodeUnit; +import org.smartboot.socket.http.HttpRequest; +import org.smartboot.socket.transport.AioSession; import java.nio.ByteBuffer; @@ -18,9 +20,30 @@ import java.nio.ByteBuffer; * @author 三刀 * @version V1.0 , 2018/2/11 */ -public class WebsocketDecoder extends HttpContentDecoder { +public class WebsocketDecoder implements Protocol { + + private void unmask(DataFraming framing, ByteBuffer payLoadBuffer) { + int i = payLoadBuffer.position(); + int end = payLoadBuffer.limit(); + int intMask = ((framing.getMaskingKey()[0] & 0xFF) << 24) + | ((framing.getMaskingKey()[1] & 0xFF) << 16) + | ((framing.getMaskingKey()[2] & 0xFF) << 8) + | (framing.getMaskingKey()[3] & 0xFF); + for (; i + 3 < end; i += 4) { + int unmasked = payLoadBuffer.getInt(i) ^ intMask; + payLoadBuffer.putInt(i, unmasked); + } + for (; i < end; i++) { + payLoadBuffer.put(i, (byte) (payLoadBuffer.get(i) ^ framing.getMaskingKey()[i % 4])); + } + } + @Override - public void decode(HttpDecodeUnit decodeUnit, ByteBuffer buffer) { + public HttpRequest decode(ByteBuffer buffer, AioSession session, boolean eof) { + HttpDecodeUnit decodeUnit = (HttpDecodeUnit) session.getAttachment(); + if (decodeUnit == null) { + throw new RuntimeException("decodeUnit is null"); + } DataFraming dataFraming = (DataFraming) decodeUnit.getEntity(); while (buffer.hasRemaining()) { switch (dataFraming.getState()) { @@ -43,7 +66,7 @@ public class WebsocketDecoder extends HttpContentDecoder { case READING_SIZE: { if (dataFraming.getFramePayloadLen1() == 126) { if (buffer.remaining() < 2) { - return; + return null; } int length = buffer.getShort() & 0xFFFF;//无符号整数 if (length < 126) { @@ -52,7 +75,7 @@ public class WebsocketDecoder extends HttpContentDecoder { dataFraming.setFramePayloadLength(length); } else if (dataFraming.getFramePayloadLen1() == 127) { if (buffer.remaining() < 8) { - return; + return null; } long length = buffer.getLong(); if (length < 65536) { @@ -73,7 +96,7 @@ public class WebsocketDecoder extends HttpContentDecoder { case MASKING_KEY: { if (dataFraming.isFrameMasked()) { if (buffer.remaining() < 4) { - return; + return null; } byte[] maskingKey = new byte[4]; buffer.get(maskingKey); @@ -96,21 +119,11 @@ public class WebsocketDecoder extends HttpContentDecoder { } } } + return null; } - private void unmask(DataFraming framing, ByteBuffer payLoadBuffer) { - int i = payLoadBuffer.position(); - int end = payLoadBuffer.limit(); - int intMask = ((framing.getMaskingKey()[0] & 0xFF) << 24) - | ((framing.getMaskingKey()[1] & 0xFF) << 16) - | ((framing.getMaskingKey()[2] & 0xFF) << 8) - | (framing.getMaskingKey()[3] & 0xFF); - for (; i + 3 < end; i += 4) { - int unmasked = payLoadBuffer.getInt(i) ^ intMask; - payLoadBuffer.putInt(i, unmasked); - } - for (; i < end; i++) { - payLoadBuffer.put(i, (byte) (payLoadBuffer.get(i) ^ framing.getMaskingKey()[i % 4])); - } + @Override + public ByteBuffer encode(HttpRequest msg, AioSession session) { + return null; } } diff --git a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java index e6bc1527..4d9c2d57 100644 --- a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java +++ b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java @@ -43,9 +43,7 @@ public class P2PDisconnectClient { AioQuickClient client = null; try { P2PClientMessageProcessor processor = new P2PClientMessageProcessor(messageFactory); - client = new AioQuickClient().connect("127.0.0.1", 8888) - .setProtocol(new P2PProtocol(messageFactory)) - .setProcessor(processor); + client = new AioQuickClient("127.0.0.1", 8888, new P2PProtocol(messageFactory), processor); client.start(asynchronousChannelGroup); long num = 0; while (num++ < 10) { @@ -54,7 +52,7 @@ public class P2PDisconnectClient { try { // processor.getSession().sendWithoutResponse(request); logger.info(processor.getSession().sendWithResponse(request, 0)); - Thread.sleep(100); +// Thread.sleep(100); } catch (Exception e) { System.out.println(num); e.printStackTrace(); diff --git a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMaxConnectClient.java b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMaxConnectClient.java index ac69b715..2b6239fe 100644 --- a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMaxConnectClient.java +++ b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PMaxConnectClient.java @@ -1,10 +1,10 @@ package org.smartboot.socket.protocol.p2p.client; -import org.smartboot.socket.protocol.p2p.message.BaseMessage; import org.smartboot.socket.MessageProcessor; +import org.smartboot.socket.StateMachineEnum; +import org.smartboot.socket.protocol.p2p.message.BaseMessage; import org.smartboot.socket.transport.AioQuickClient; import org.smartboot.socket.transport.AioSession; -import org.smartboot.socket.StateMachineEnum; import java.nio.channels.AsynchronousChannelGroup; import java.util.concurrent.ThreadFactory; @@ -20,19 +20,17 @@ public class P2PMaxConnectClient { int num = 0; try { while (true) { - final AioQuickClient client = new AioQuickClient() - .setProcessor(new MessageProcessor() { - @Override - public void process(AioSession session, BaseMessage msg) { + final AioQuickClient client = new AioQuickClient("127.0.0.1", 8888, null, new MessageProcessor() { + @Override + public void process(AioSession session, BaseMessage msg) { - } + } - @Override - public void stateEvent(AioSession session, StateMachineEnum stateEnum, Throwable throwable) { + @Override + public void stateEvent(AioSession session, StateMachineEnum stateEnum, Throwable throwable) { - } - }) - .connect("127.0.0.1", 8888); + } + }); client.start(asynchronousChannelGroup); num++; Thread.sleep(1); diff --git a/smart-protocol-servlet/src/main/java/Test.java b/smart-protocol-servlet/src/main/java/Test.java index 54037192..2dd3d7d9 100644 --- a/smart-protocol-servlet/src/main/java/Test.java +++ b/smart-protocol-servlet/src/main/java/Test.java @@ -1,5 +1,4 @@ import org.smartboot.socket.extension.timer.QuickMonitorTimer; -import org.smartboot.socket.protocol.http.HttpEntity; import org.smartboot.socket.protocol.http.HttpProtocol; import org.smartboot.socket.protocol.http.process.HttpServerMessageProcessor; import org.smartboot.socket.protocol.http.servlet.core.WinstoneRequest; @@ -14,15 +13,13 @@ public class Test { // 定义服务器接受的消息类型以及各类消息对应的处理器 // config.setFilters(new SmartFilter[] { new QuickMonitorTimer() }); - HashMap arg=new HashMap(); - arg.put("warfile","/Users/zhengjunwei/IdeaProjects/smart-platform/smart-dms/target/smart-dms.war"); - arg.put("useInvoker","true"); + HashMap arg = new HashMap(); + arg.put("warfile", "/Users/zhengjunwei/IdeaProjects/smart-platform/smart-dms/target/smart-dms.war"); + arg.put("useInvoker", "true"); HttpServerMessageProcessor processor = new HttpServerMessageProcessor(arg); - AioQuickServer server = new AioQuickServer() + AioQuickServer server = new AioQuickServer(8888, new HttpProtocol(), processor) .setThreadNum(8) - .setProtocol(new HttpProtocol()) - .setFilters(new QuickMonitorTimer()) - .setProcessor(processor); + .setFilters(new QuickMonitorTimer()); try { server.start(); } catch (IOException e) { -- Gitee From 1cd0a904261a1cd74f8036fb46e0f6866248e15b Mon Sep 17 00:00:00 2001 From: Seer Date: Thu, 15 Mar 2018 17:45:55 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/smartboot/socket/transport/AioQuickClient.java | 3 +-- .../org/smartboot/socket/transport/AioSSLQuickClient.java | 5 +---- .../socket/protocol/p2p/client/P2PDisconnectClient.java | 4 ++-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java index 19ae1e14..876232a7 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java @@ -27,7 +27,6 @@ import java.util.concurrent.ThreadFactory; * @version V1.0.0 */ public class AioQuickClient { - protected AsynchronousSocketChannel socketChannel = null; /** * IO事件处理线程组 */ @@ -59,7 +58,7 @@ public class AioQuickClient { * @throws InterruptedException */ public void start(AsynchronousChannelGroup asynchronousChannelGroup) throws IOException, ExecutionException, InterruptedException { - this.socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup); + AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup); socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get(); //连接成功则构造AIOSession对象 session = new AioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), false); diff --git a/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java b/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java index 526c17dd..8d0e8bad 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java +++ b/aio-pro/src/main/java/org/smartboot/socket/transport/AioSSLQuickClient.java @@ -9,8 +9,6 @@ package org.smartboot.socket.transport; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.smartboot.socket.MessageProcessor; import org.smartboot.socket.Protocol; import org.smartboot.socket.extension.ssl.SSLConfig; @@ -27,7 +25,6 @@ import java.util.concurrent.ExecutionException; * Created by 三刀 on 2017/6/28. */ public final class AioSSLQuickClient extends AioQuickClient { - private static final Logger LOGGER = LogManager.getLogger(AioSSLQuickClient.class); private SSLService sslService; private SSLConfig sslConfig = new SSLConfig(); @@ -52,7 +49,7 @@ public final class AioSSLQuickClient extends AioQuickClient { //启动SSL服务 sslConfig.setClientMode(true); sslService = new SSLService(sslConfig); - this.socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup); + AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(asynchronousChannelGroup); socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get(); //连接成功则构造AIOSession对象 session = new SSLAioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), sslService); diff --git a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java index 4d9c2d57..ede32768 100644 --- a/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java +++ b/smart-protocol-p2p/src/main/java/org/smartboot/socket/protocol/p2p/client/P2PDisconnectClient.java @@ -52,7 +52,7 @@ public class P2PDisconnectClient { try { // processor.getSession().sendWithoutResponse(request); logger.info(processor.getSession().sendWithResponse(request, 0)); -// Thread.sleep(100); + Thread.sleep(10); } catch (Exception e) { System.out.println(num); e.printStackTrace(); @@ -104,7 +104,7 @@ public class P2PDisconnectClient { request.setDetect("台州人在杭州:" + num); try { logger.info(processor.getSession().sendWithResponse(request, 0)); - Thread.sleep(100); + Thread.sleep(10); } catch (Exception e) { System.out.println(num); e.printStackTrace(); -- Gitee From 83374cee824697dc17f53e5014270e0a9f02eadc Mon Sep 17 00:00:00 2001 From: Seer Date: Fri, 16 Mar 2018 15:23:35 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- smart-protocol-p2p/conf/log4j2_client.xml | 1 + smart-protocol-p2p/conf/log4j2_server.xml | 1 + 2 files changed, 2 insertions(+) diff --git a/smart-protocol-p2p/conf/log4j2_client.xml b/smart-protocol-p2p/conf/log4j2_client.xml index 7e49f802..a6626459 100644 --- a/smart-protocol-p2p/conf/log4j2_client.xml +++ b/smart-protocol-p2p/conf/log4j2_client.xml @@ -28,6 +28,7 @@ + diff --git a/smart-protocol-p2p/conf/log4j2_server.xml b/smart-protocol-p2p/conf/log4j2_server.xml index a5dc7f4e..482baee5 100644 --- a/smart-protocol-p2p/conf/log4j2_server.xml +++ b/smart-protocol-p2p/conf/log4j2_server.xml @@ -28,6 +28,7 @@ + -- Gitee