diff --git a/aio-core/pom.xml b/aio-core/pom.xml index f173c2775787217cdce3b449ba14561ae9785b43..065b82d0ae1b9553663ac1a3588ac8c3dc1c4380 100644 --- a/aio-core/pom.xml +++ b/aio-core/pom.xml @@ -1,24 +1,43 @@ - aio-core - A high performance network application framework based on Java AIO - 4.0.0 - org.smartboot.socket - aio-core - - - org.smartboot.socket - smart-socket-parent - 1.3.12 - ../smart-socket-parent - - - - - org.slf4j - slf4j-api - - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + aio-core + A high performance network application framework based on Java AIO + 4.0.0 + org.smartboot.socket + aio-core + jar + + org.smartboot.socket + smart-socket-parent + 1.3.11 + ../smart-socket-parent + + + + + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + UTF-8 + + + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-simple + 1.7.25 + + diff --git a/aio-core/src/main/java/org/smartboot/socket/Filter.java b/aio-core/src/main/java/org/smartboot/socket/Filter.java deleted file mode 100644 index 3ede028a60407142db8f8579c37cdd7827904537..0000000000000000000000000000000000000000 --- a/aio-core/src/main/java/org/smartboot/socket/Filter.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2017, org.smartboot. All rights reserved. - * project name: smart-socket - * file name: Filter.java - * Date: 2017-11-25 - * Author: sandao - */ - -package org.smartboot.socket; - -import org.smartboot.socket.transport.AioSession; - -/** - * 消息/服务过滤器。 - *

- * smart-socket设计的{@code Filter}与状态机{@link StateMachineEnum}看上去比较类似,但两者是以不同的维度对发生的事件进行Filter处理。 - *

- *

- * 现阶段的Filter更像是事件监听器,只能被动接收事件通告,无法改变原定的处理流程。未来可能会重新设计一下,使其成为真正意义上的Filter。目前主要用于服务数据监控。 - *

- * {@code Filter}以系统服务级别实时过滤如下事件: - *
    - *
  1. connected
  2. - *
  3. closed
  4. - *
  5. processFail
  6. - *
  7. processFilter
  8. - *
  9. readFilter
  10. - *
  11. writeFilter
  12. - *
- * - * @author 三刀 - * @version V1.0.0 - */ -public interface Filter { - - /** - * 建立连接时触发过滤器 - * - * @param session 新建立的连接session - */ - void connected(AioSession session); - - /** - * 网络断链 - * - * @param session 当前以关闭的session - */ - void closed(AioSession session); - - /** - * 数据读取过滤,可用于统计流量 - * - * @param session 当前执行read的AioSession对象 - * @param readSize 本次解码读取的数据长度 - */ - void readFilter(AioSession session, int readSize); - - - /** - * 消息处理前置预处理 - * - * @param session 当前执行消息处理的session对象 - * @param msg 编解码后的消息实体 - */ - void processFilter(AioSession session, T msg); - - - /** - * 消息接受失败处理 - * - * @param session 消息处理异常的session对象 - * @param msg 编解码后的消息实体 - * @param e 本次处理异常对象 - */ - void processFail(AioSession session, T msg, Throwable e); - - /** - * 数据输出过滤,可用于统计流量 - * - * @param session 本次执行write回调的AIOSession对象 - * @param writeSize 本次输出的数据长度 - */ - void writeFilter(AioSession session, int writeSize); - -} diff --git a/aio-core/src/main/java/org/smartboot/socket/MessageProcessor.java b/aio-core/src/main/java/org/smartboot/socket/MessageProcessor.java deleted file mode 100644 index 56d65e75c8d9814d7e44e9eb82018c575c4105b7..0000000000000000000000000000000000000000 --- a/aio-core/src/main/java/org/smartboot/socket/MessageProcessor.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (c) 2017, org.smartboot. All rights reserved. - * project name: smart-socket - * file name: MessageProcessor.java - * Date: 2017-11-25 - * Author: sandao - */ - -package org.smartboot.socket; - -import org.smartboot.socket.transport.AioSession; - -/** - * 消息处理器。 - * - *

- * 通过实现该接口,对完成解码的消息进行业务处理。 - *

- *

示例:

- *

- *

- * public class IntegerServerProcessor implements MessageProcessor {
- *     public void process(AioSession session, Integer msg) {
- *         Integer respMsg = msg + 1;
- *         System.out.println("receive data from client: " + msg + " ,rsp:" + (respMsg));
- *         try {
- *             session.write(respMsg);
- *         } catch (IOException e) {
- *             e.printStackTrace();
- *         }
- *     }
- *
- *     public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) {
- *         switch (stateMachineEnum) {
- *             case NEW_SESSION:
- *                 ...
- *                 break;
- *             case SESSION_CLOSED:
- *                 ...
- *                 break;
- *             case PROCESS_EXCEPTION:
- *                 ...
- *                 break;
- *             default:
- *                 ...
- *         }
- *     }
- * }
- *     
- *

- * - * @author 三刀 - * @version V1.0.0 2018/5/19 - */ -public interface MessageProcessor { - - /** - * 处理接收到的消息 - * - * @param session 通信会话 - * @param msg 待处理的业务消息 - */ - void process(AioSession session, T msg); - - /** - * 状态机事件,当枚举事件发生时由框架触发该方法 - * - *

{@link Filter}属于通信级别的过滤器,监控全局系统服务状态;而状态机更侧重于应用级别的过滤器,相较于Filter更加轻量灵活。

- * - * @param session 本次触发状态机的AioSession对象 - * @param stateMachineEnum 状态枚举 - * @param throwable 异常对象,如果存在的话 - * @see StateMachineEnum - */ - void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable); -} diff --git a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java index 4e37e5ba9a944ad290a1eae8af8e212ef1d2891c..30b5c52928768944e0cc19d0fa1f03719870563c 100644 --- a/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java +++ b/aio-core/src/main/java/org/smartboot/socket/StateMachineEnum.java @@ -13,11 +13,11 @@ import org.smartboot.socket.transport.AioSession; /** * 列举了当前smart-socket所关注的各类状态枚举。 * - *

当前枚举的各状态机事件在发生后都会及时触发{@link MessageProcessor#stateEvent(AioSession, StateMachineEnum, Throwable)}方法。因此用户在实现的{@linkplain MessageProcessor}接口中可对自己关心的状态机事件进行处理。

+ *

当前枚举的各状态机事件在发生后都会及时触发{@link AioSession#stateEvent(StateMachineEnum, Throwable)}方法。因此用户在具体实现的{@linkplain AioSession}中可对自己关心的状态机事件进行处理。

* * @author 三刀 * @version V1.0.0 2018/5/19 - * @see MessageProcessor + * @see AioSession */ public enum StateMachineEnum { /** @@ -28,6 +28,7 @@ public enum StateMachineEnum { * 读通道已被关闭。 *

* 通常由以下几种情况会触发该状态: + * *

    *
  1. 对端主动关闭write通道,致使本通常满足了EOF条件
  2. *
  3. 当前AioSession处理完读操作后检测到自身正处于{@link StateMachineEnum#SESSION_CLOSING}状态
  4. @@ -37,7 +38,7 @@ public enum StateMachineEnum { INPUT_SHUTDOWN, /** * 业务处理异常。 - *

    执行{@link MessageProcessor#process(AioSession, Object)}期间发生用户未捕获的异常。

    + *

    执行{@link AioSession#process(Object)}期间发生用户未捕获的异常。

    */ PROCESS_EXCEPTION, /** diff --git a/aio-core/src/main/java/org/smartboot/socket/package-info.java b/aio-core/src/main/java/org/smartboot/socket/package-info.java index 8cc5d60260da7ffe8c6631058a624e3e6e6cc5c8..ea91193f635c464c0f51762e7add33f192c237b5 100644 --- a/aio-core/src/main/java/org/smartboot/socket/package-info.java +++ b/aio-core/src/main/java/org/smartboot/socket/package-info.java @@ -4,8 +4,9 @@ * * *

    - * 用户进行通信开发时需要实现该package中的接口,通常情况下仅需实现{@link org.smartboot.socket.Protocol}、{@link org.smartboot.socket.MessageProcessor}即可。 - * 如需进行整个服务级别的监控、维护,可选择性的使用{@link org.smartboot.socket.Filter}。 + * 用户进行通信开发时需要实现该package中的接口,通常情况下仅需实现{@link org.smartboot.socket.Protocol}接口, + * {@link org.smartboot.socket.AioSession}即可。 + * 如需进行整个服务级别的监控、维护,可选择性的使用{@link org.smartboot.socket.Plugin}。 *

    * *

    diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java index 7311a447647fa9e9b61a6d63bd046acc69ad5800..5beab565465ba8e533fafe9b15672c3be7d9cdde 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java @@ -9,10 +9,6 @@ package org.smartboot.socket.transport; -import org.smartboot.socket.Filter; -import org.smartboot.socket.MessageProcessor; -import org.smartboot.socket.Protocol; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -23,6 +19,9 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; +import org.smartboot.socket.Plugin; +import org.smartboot.socket.Protocol; + /** * AIO实现的客户端服务。 * @@ -72,13 +71,13 @@ public class AioQuickClient { * @param host 远程服务器地址 * @param port 远程服务器端口号 * @param protocol 协议编解码 - * @param messageProcessor 消息处理器 + * @param sessionFactory session工厂 */ - public AioQuickClient(String host, int port, Protocol protocol, MessageProcessor messageProcessor) { + public AioQuickClient(String host, int port, Protocol protocol, SessionFactory sessionFactory) { config.setHost(host); config.setPort(port); config.setProtocol(protocol); - config.setProcessor(messageProcessor); + config.setSessionFactory(sessionFactory); } /** @@ -104,7 +103,7 @@ public class AioQuickClient { //bind host socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get(); //连接成功则构造AIOSession对象 - session = new AioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), false); + session = config.getSessionFactory().newSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), false); session.initSession(); } @@ -146,12 +145,13 @@ public class AioQuickClient { /** - * 设置消息过滤器,执行顺序以数组中的顺序为准 + * 设置插件,执行顺序以数组中的顺序为准 * - * @param filters 过滤器数组 + * @param plugins 插件数组 */ - public final AioQuickClient setFilters(Filter[] filters) { - this.config.setFilters(filters); + @SafeVarargs + public final AioQuickClient setPlugins(Plugin... plugins) { + this.config.setPlugins(plugins); return this; } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java index bd48efd6d5733faf86a02c25b2fc9afed011cbb7..aaa1509af5f71ec58dcdaed9afd28f06b2a0db3d 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java @@ -8,12 +8,6 @@ package org.smartboot.socket.transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.smartboot.socket.Filter; -import org.smartboot.socket.MessageProcessor; -import org.smartboot.socket.Protocol; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketOption; @@ -23,7 +17,11 @@ import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.Map; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.socket.Plugin; +import org.smartboot.socket.Protocol; /** * AIO服务端。 @@ -60,28 +58,28 @@ public class AioQuickServer { protected WriteCompletionHandler aioWriteCompletionHandler = new WriteCompletionHandler<>(); private AsynchronousServerSocketChannel serverSocketChannel = null; private AsynchronousChannelGroup asynchronousChannelGroup; - + /** * 设置服务端启动必要参数配置 * * @param port 绑定服务端口号 * @param protocol 协议编解码 - * @param messageProcessor 消息处理器 + * @param sessionFactory session工厂 */ - public AioQuickServer(int port, Protocol protocol, MessageProcessor messageProcessor) { + public AioQuickServer(int port, Protocol protocol, SessionFactory sessionFactory) { config.setPort(port); config.setProtocol(protocol); - config.setProcessor(messageProcessor); + config.setSessionFactory(sessionFactory); } /** * @param host 绑定服务端Host地址 * @param port 绑定服务端口号 * @param protocol 协议编解码 - * @param messageProcessor 消息处理器 + * @param sessionFactory session工厂 */ - public AioQuickServer(String host, int port, Protocol protocol, MessageProcessor messageProcessor) { - this(port, protocol, messageProcessor); + public AioQuickServer(String host, int port, Protocol protocol, SessionFactory sessionFactory) { + this(port, protocol, sessionFactory); config.setHost(host); } @@ -141,6 +139,12 @@ public class AioQuickServer { shutdown(); throw e; } + + //start the plugins + for(Plugin plugin : config.getPlugins()) { + plugin.start(); + } + LOGGER.info("smart-socket server started on port {}", config.getPort()); LOGGER.info("smart-socket server config is {}", config); } @@ -152,7 +156,7 @@ public class AioQuickServer { */ protected void createSession(AsynchronousSocketChannel channel) { //连接成功则构造AIOSession对象 - AioSession session = new AioSession(channel, config, aioReadCompletionHandler, aioWriteCompletionHandler, true); + AioSession session = config.getSessionFactory().newSession(channel, config, aioReadCompletionHandler, aioWriteCompletionHandler, true); session.initSession(); } @@ -168,21 +172,21 @@ public class AioQuickServer { } catch (IOException e) { LOGGER.warn(e.getMessage(), e); } - if (!asynchronousChannelGroup.isTerminated()) { + if (asynchronousChannelGroup != null) { try { - asynchronousChannelGroup.shutdownNow(); - } catch (IOException e) { - LOGGER.error("shutdown exception", e); - } + asynchronousChannelGroup.shutdownNow(); + } catch (IOException e) { + LOGGER.warn(e.getMessage(), e); + } + asynchronousChannelGroup = null; } - try { - asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.error("shutdown exception", e); + + //close the plugins + for(Plugin plugin : config.getPlugins()) { + plugin.stop(); } } - /** * 设置处理线程数量 * @@ -195,13 +199,13 @@ public class AioQuickServer { /** - * 设置消息过滤器,执行顺序以数组中的顺序为准 + * 设置插件,执行顺序以数组中的顺序为准 * - * @param filters 过滤器数组 + * @param plugins 插件数组 */ @SafeVarargs - public final AioQuickServer setFilters(Filter... filters) { - this.config.setFilters(filters); + public final AioQuickServer setPlugins(Plugin... plugins) { + this.config.setPlugins(plugins); return this; } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java index d73e8da37a0743fd075aa0e4310f59a16da8b73a..d40ea12d08efdbc0ed542ac84fd6ec2efbbd0999 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java @@ -9,11 +9,6 @@ package org.smartboot.socket.transport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.smartboot.socket.Filter; -import org.smartboot.socket.StateMachineEnum; - import java.io.IOException; import java.io.InputStream; import java.io.InvalidObjectException; @@ -21,6 +16,12 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.socket.Plugin; +import org.smartboot.socket.StateMachineEnum; /** * AIO传输层会话。 @@ -50,7 +51,7 @@ import java.util.concurrent.Semaphore; * @author 三刀 * @version V1.0.0 */ -public class AioSession { +public abstract class AioSession { /** * Session状态:已关闭 */ @@ -110,7 +111,13 @@ public class AioSession { private Semaphore semaphore = new Semaphore(1); private IoServerConfig ioServerConfig; private InputStream inputStream; - + + /** + * session id的生成器 + */ + private static final AtomicInteger idGen = new AtomicInteger(0); + private int sessionId; + /** * @param channel * @param config @@ -118,7 +125,8 @@ public class AioSession { * @param writeCompletionHandler * @param serverSession 是否服务端Session */ - AioSession(AsynchronousSocketChannel channel, IoServerConfig config, ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler, boolean serverSession) { + protected AioSession(AsynchronousSocketChannel channel, IoServerConfig config, ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler, boolean serverSession) { + this.sessionId = idGen.incrementAndGet(); this.channel = channel; this.readCompletionHandler = readCompletionHandler; this.writeCompletionHandler = writeCompletionHandler; @@ -128,10 +136,10 @@ public class AioSession { this.ioServerConfig = config; this.serverFlowLimit = serverSession && config.getWriteQueueSize() > 0 && config.isFlowControlEnabled() ? false : null; //触发状态机 - config.getProcessor().stateEvent(this, StateMachineEnum.NEW_SESSION, null); - this.readBuffer = allocateReadBuffer(config.getReadBufferSize()); - for (Filter filter : config.getFilters()) { - filter.connected(this); + stateEvent(StateMachineEnum.NEW_SESSION, null); + this.readBuffer = newByteBuffer0(config.getReadBufferSize()); + for (Plugin plugin : config.getPlugins()) { + plugin.connected(this); } } @@ -171,7 +179,7 @@ public class AioSession { writeBuffer = headBuffer; } else { if (writeBuffer == null || totalSize << 1 <= writeBuffer.capacity() || totalSize > writeBuffer.capacity()) { - writeBuffer = allocateReadBuffer(totalSize); + writeBuffer = newByteBuffer0(totalSize); } else { writeBuffer.clear().limit(totalSize); } @@ -183,7 +191,7 @@ public class AioSession { //如果存在流控并符合释放条件,则触发读操作 //一定要放在continueWrite之前 if (serverFlowLimit != null && serverFlowLimit && writeCacheQueue.size() < ioServerConfig.getReleaseLine()) { - ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.RELEASE_FLOW_LIMIT, null); + stateEvent(StateMachineEnum.RELEASE_FLOW_LIMIT, null); serverFlowLimit = false; continueRead(); } @@ -297,15 +305,15 @@ public class AioSession { } catch (IOException e) { logger.debug("close session exception", e); } - for (Filter filter : ioServerConfig.getFilters()) { - filter.closed(this); + for (Plugin plugin : ioServerConfig.getPlugins()) { + plugin.closed(this); } - ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.SESSION_CLOSED, null); + stateEvent(StateMachineEnum.SESSION_CLOSED, null); } else if ((writeBuffer == null || !writeBuffer.hasRemaining()) && (writeCacheQueue == null || writeCacheQueue.size() == 0) && semaphore.tryAcquire()) { close(true); semaphore.release(); } else { - ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.SESSION_CLOSING, null); + stateEvent(StateMachineEnum.SESSION_CLOSING, null); } } @@ -313,7 +321,7 @@ public class AioSession { * 获取当前Session的唯一标识 */ public final String getSessionID() { - return "aioSession-" + hashCode(); + return "aioSession-" + this.sessionId; } /** @@ -323,6 +331,24 @@ public class AioSession { return status != SESSION_STATUS_ENABLED; } + /** + * 处理接收到的消息 + * + * @param msg 待处理的业务消息 + */ + protected abstract void process(T msg) throws Exception; + + /** + * 状态机事件,当枚举事件发生时由框架触发该方法 + * + *

    {@link Plugin}属于通信级别的过滤器,监控全局系统服务状态;而状态机则是{@linkplain AioSession}内部的状态获取,相较于Plugin更加轻量灵活。

    + * + * @param stateMachineEnum 状态枚举 + * @param throwable 异常对象,如果存在的话 + * @see StateMachineEnum + */ + protected abstract void stateEvent(StateMachineEnum stateMachineEnum, Throwable throwable); + /** * 触发通道的读操作,当发现存在严重消息积压时,会触发流控 */ @@ -333,13 +359,13 @@ public class AioSession { while ((dataEntry = ioServerConfig.getProtocol().decode(readBuffer, this, eof)) != null) { //处理消息 try { - for (Filter h : ioServerConfig.getFilters()) { - h.processFilter(this, dataEntry); + for (Plugin h : ioServerConfig.getPlugins()) { + h.process(this, dataEntry); } - ioServerConfig.getProcessor().process(this, dataEntry); + process(dataEntry); } catch (Exception e) { - ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.PROCESS_EXCEPTION, e); - for (Filter h : ioServerConfig.getFilters()) { + stateEvent(StateMachineEnum.PROCESS_EXCEPTION, e); + for (Plugin h : ioServerConfig.getPlugins()) { h.processFail(this, dataEntry, e); } } @@ -348,7 +374,7 @@ public class AioSession { if (eof || status == SESSION_STATUS_CLOSING) { close(false); - ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.INPUT_SHUTDOWN, null); + stateEvent(StateMachineEnum.INPUT_SHUTDOWN, null); return; } if (status == SESSION_STATUS_CLOSED) { @@ -369,7 +395,7 @@ public class AioSession { //触发流控 if (serverFlowLimit != null && writeCacheQueue.size() > ioServerConfig.getFlowLimitLine()) { serverFlowLimit = true; - ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.FLOW_LIMIT, null); + stateEvent(StateMachineEnum.FLOW_LIMIT, null); } else { continueRead(); } @@ -436,16 +462,7 @@ public class AioSession { return this.ioServerConfig; } - /** - * 申请新ReadBuffer。 - *

    - * 重新申请readBuffer前请保证老的数据都正确处理 - *

    - * - * @param size - * @return - */ - private ByteBuffer allocateReadBuffer(int size) { + private ByteBuffer newByteBuffer0(int size) { return ioServerConfig.isDirectBuffer() ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size); } @@ -520,4 +537,28 @@ public class AioSession { } } } + + @Override + public boolean equals(Object obj) { + if(obj == null || !(obj instanceof AioSession)) { + return false; + }else { + return this.sessionId == ((AioSession)obj).sessionId; + } + } + + @Override + public int hashCode() { + return getSessionID().hashCode(); + } + + @Override + public String toString() { + try { + return "{" + getSessionID() + ", " + getRemoteAddress().getAddress().getHostAddress() + ":" + getRemoteAddress().getPort() + "}"; + } catch (IOException e) { + e.printStackTrace(); + return "{" + getSessionID() + ", " + e.getMessage() + "}"; + } + } } diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java index bd33ba0810e1bc9fd08e7cd1bcbc0f2ae5fd39c7..53f5955e9919612db27975a86ed1b09bbed7348c 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java @@ -8,14 +8,20 @@ package org.smartboot.socket.transport; -import org.smartboot.socket.Filter; -import org.smartboot.socket.MessageProcessor; -import org.smartboot.socket.Protocol; - import java.net.SocketOption; +import java.nio.channels.AsynchronousSocketChannel; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.socket.Plugin; +import org.smartboot.socket.Protocol; +import org.smartboot.socket.StateMachineEnum; /** * Quickly服务端/客户端配置信息 T:解码后生成的对象类型 @@ -23,7 +29,7 @@ import java.util.Map; * @author 三刀 * @version V1.0.0 */ -final class IoServerConfig { +public final class IoServerConfig { public static final String BANNER = " _ _ _ \n" + " ( )_ ( ) ( )_ \n" + @@ -32,7 +38,7 @@ final class IoServerConfig { "\\__, \\| ( ) ( ) |( (_| || | | |_ \\__, \\( (_) )( (___ | |\\`\\ ( ___/| |_ \n" + "(____/(_) (_) (_)`\\__,_)(_) `\\__) (____/`\\___/'`\\____)(_) (_)`\\____)`\\__)"; - public static final String VERSION = "v1.3.12"; + public static final String VERSION = "v1.3.11"; /** * 消息队列缓存大小 */ @@ -52,18 +58,13 @@ final class IoServerConfig { /** * 服务器消息拦截器 */ - private Filter[] filters = new Filter[0]; + private Set> plugins = new HashSet<>(); /** * 服务器端口号 */ private int port = 8888; - /** - * 消息处理器 - */ - private MessageProcessor processor; - /** * 协议编解码 */ @@ -104,6 +105,70 @@ final class IoServerConfig { */ private Map, Object> socketOptions; + /** + * Session 工厂 + * @return + */ + private SessionFactory sessionFactory; + + /** + * 默认的Session工厂 + */ + private final SessionFactory defaultFactory = new SessionFactory() { + + final Logger logger = LoggerFactory.getLogger(AioSession.class); + + @Override + public AioSession newSession(AsynchronousSocketChannel channel, IoServerConfig config, + ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler, + boolean serverSession) { + + + + return new AioSession(channel, config, readCompletionHandler, writeCompletionHandler, serverSession) { + + @Override + protected void process(T msg) throws Exception { + logger.info("process:{}, msg:{}", toString(), msg); + } + + @Override + protected void stateEvent(StateMachineEnum stateMachineEnum, Throwable throwable) { + switch (stateMachineEnum) { + case NEW_SESSION: + logger.info("new session:{}, throwable:{}", toString(), throwable); + break; + case INPUT_SHUTDOWN: + logger.info("input shutdown:{}, throwable:{}", toString(), throwable); + break; + case INPUT_EXCEPTION: + logger.info("input exception:{}, throwable:{}", toString(), throwable); + break; + case OUTPUT_EXCEPTION: + logger.info("output exception:{}, throwable:{}", toString(), throwable); + break; + case SESSION_CLOSING: + logger.info("session closing:{}, throwable:{}", toString(), throwable); + break; + case SESSION_CLOSED: + logger.info("session closed:{{}}, throwable:{}", getSessionID(), throwable); + break; + case FLOW_LIMIT: + logger.info("flow limit:{}, throwable:{}", toString(), throwable); + break; + case RELEASE_FLOW_LIMIT: + logger.info("release flow limit:{}, throwable:{}", toString(), throwable); + break; + default: + break; + } + } + + }; + } + + }; + public final String getHost() { return host; } @@ -128,14 +193,20 @@ final class IoServerConfig { this.threadNum = threadNum; } - - public final Filter[] getFilters() { - return filters; + public final Set> getPlugins() { + return Collections.unmodifiableSet(plugins); } - public final void setFilters(Filter[] filters) { - if (filters != null) { - this.filters = filters; + public final void addPlugin(Plugin plugin) { + if(plugin != null) { + this.plugins.add(plugin); + } + } + + public final void setPlugins(Plugin[] plugins) { + if (plugins != null) { + this.plugins.clear(); + this.plugins.addAll(Arrays.asList(plugins)); } } @@ -147,14 +218,6 @@ final class IoServerConfig { this.protocol = protocol; } - public final MessageProcessor getProcessor() { - return processor; - } - - public final void setProcessor(MessageProcessor processor) { - this.processor = processor; - } - public int getWriteQueueSize() { return writeQueueSize; } @@ -216,15 +279,26 @@ final class IoServerConfig { this.flowControlEnabled = flowControlEnabled; } + public void setSessionFactory(SessionFactory factory) { + this.sessionFactory = factory; + } + + public SessionFactory getSessionFactory() { + if(this.sessionFactory != null) { + return this.sessionFactory; + }else { + return this.defaultFactory; + } + } + @Override public String toString() { return "IoServerConfig{" + "writeQueueSize=" + writeQueueSize + ", readBufferSize=" + readBufferSize + ", host='" + host + '\'' + - ", filters=" + Arrays.toString(filters) + + ", plugins=" + Arrays.toString(plugins.toArray(new Plugin[plugins.size()])) + ", port=" + port + - ", processor=" + processor + ", protocol=" + protocol + ", directBuffer=" + directBuffer + ", threadNum=" + threadNum + diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java index 31f14e0189723a319b1a7b3cfb4ff2e970859ec5..9b002b40e2a9b5a2b8dcd3c70e691ff94ab7f148 100644 --- a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java +++ b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java @@ -10,7 +10,7 @@ package org.smartboot.socket.transport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.smartboot.socket.Filter; +import org.smartboot.socket.Plugin; import org.smartboot.socket.StateMachineEnum; import java.io.IOException; @@ -22,14 +22,14 @@ import java.nio.channels.CompletionHandler; * @author 三刀 * @version V1.0.0 */ -class ReadCompletionHandler implements CompletionHandler> { +public final class ReadCompletionHandler implements CompletionHandler> { private static final Logger LOGGER = LoggerFactory.getLogger(ReadCompletionHandler.class); @Override public void completed(final Integer result, final AioSession aioSession) { // 接收到的消息进行预处理 - for (Filter h : aioSession.getServerConfig().getFilters()) { - h.readFilter(aioSession, result); + for (Plugin h : aioSession.getServerConfig().getPlugins()) { + h.readCompleted(aioSession, result); } aioSession.readFromChannel(result == -1); } @@ -47,7 +47,7 @@ class ReadCompletionHandler implements CompletionHandler implements CompletionHandler> { +public final class WriteCompletionHandler implements CompletionHandler> { private static final Logger LOGGER = LoggerFactory.getLogger(WriteCompletionHandler.class); @Override public void completed(final Integer result, final AioSession aioSession) { // 接收到的消息进行预处理 - for (Filter h : aioSession.getServerConfig().getFilters()) { - h.writeFilter(aioSession, result); + for (Plugin plugin : aioSession.getServerConfig().getPlugins()) { + plugin.writeCompleted(aioSession, result); } aioSession.writeToChannel(); } @@ -39,7 +39,7 @@ class WriteCompletionHandler implements CompletionHandler{ + + @Override + public AioSession newSession(AsynchronousSocketChannel channel, IoServerConfig config, + ReadCompletionHandler readCompletionHandler, + WriteCompletionHandler writeCompletionHandler, boolean serverSession) { + return new IntegerClientSession(channel, config, readCompletionHandler, writeCompletionHandler, serverSession); + } + + } + public static void main(String[] args) throws Exception { - IntegerClientProcessor processor = new IntegerClientProcessor(); - AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8888, new IntegerProtocol(), processor); + + AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8899, new IntegerProtocol(), new ClientSessionFactory()); aioQuickClient.start(); - processor.getSession().write(1); Thread.sleep(1000); aioQuickClient.shutdown(); } diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerClientProcessor.java b/aio-core/src/test/java/net/vinote/demo/IntegerClientProcessor.java deleted file mode 100644 index 0775a8812f06f1b5edc33447a439c952bcf7fd3f..0000000000000000000000000000000000000000 --- a/aio-core/src/test/java/net/vinote/demo/IntegerClientProcessor.java +++ /dev/null @@ -1,34 +0,0 @@ -package net.vinote.demo; - -import org.smartboot.socket.MessageProcessor; -import org.smartboot.socket.transport.AioSession; -import org.smartboot.socket.StateMachineEnum; - -/** - * @author 三刀 - * @version V1.0 , 2017/8/23 - */ -public class IntegerClientProcessor implements MessageProcessor { - private AioSession session; - - @Override - public void process(AioSession session, Integer msg) { - System.out.println("receive data from server:" + msg); - } - - @Override - public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { - switch (stateMachineEnum) { - case NEW_SESSION: - this.session = session; - break; - default: - System.out.println("other state:" + stateMachineEnum); - } - - } - - public AioSession getSession() { - return session; - } -} diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerServer.java b/aio-core/src/test/java/net/vinote/demo/IntegerServer.java index 5201847c8cfd07cfb032db07a4caa9cb60041d60..d3ca202d86aa0ba6ab16eb20eea7fed8da56f73d 100644 --- a/aio-core/src/test/java/net/vinote/demo/IntegerServer.java +++ b/aio-core/src/test/java/net/vinote/demo/IntegerServer.java @@ -1,15 +1,32 @@ package net.vinote.demo; +import org.smartboot.plugin.AioMonitor; import org.smartboot.socket.transport.AioQuickServer; +import org.smartboot.socket.transport.AioSession; +import org.smartboot.socket.transport.IoServerConfig; +import org.smartboot.socket.transport.ReadCompletionHandler; +import org.smartboot.socket.transport.SessionFactory; +import org.smartboot.socket.transport.WriteCompletionHandler; import java.io.IOException; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.TimeUnit; /** * Created by 三刀 on 2017/7/12. */ public class IntegerServer { public static void main(String[] args) throws IOException { - AioQuickServer server = new AioQuickServer(8888, new IntegerProtocol(), new IntegerServerProcessor()); + AioQuickServer server = new AioQuickServer(8899, new IntegerProtocol(), new SessionFactory() { + + @Override + public AioSession newSession(AsynchronousSocketChannel channel, IoServerConfig config, + ReadCompletionHandler readCompletionHandler, + WriteCompletionHandler writeCompletionHandler, boolean serverSession) { + return new IntegerServerSession(channel, config, readCompletionHandler, writeCompletionHandler, serverSession); + } + + }).setPlugins(new AioMonitor<>(5, TimeUnit.SECONDS)); server.start(); } } diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerServerProcessor.java b/aio-core/src/test/java/net/vinote/demo/IntegerServerProcessor.java deleted file mode 100644 index cb8a1fa8a496f96a295a0477313054917fed66e9..0000000000000000000000000000000000000000 --- a/aio-core/src/test/java/net/vinote/demo/IntegerServerProcessor.java +++ /dev/null @@ -1,29 +0,0 @@ -package net.vinote.demo; - -import org.smartboot.socket.MessageProcessor; -import org.smartboot.socket.transport.AioSession; -import org.smartboot.socket.StateMachineEnum; - -import java.io.IOException; - -/** - * @author 三刀 - * @version V1.0 , 2017/8/23 - */ -public class IntegerServerProcessor implements MessageProcessor { - @Override - public void process(AioSession session, Integer msg) { - Integer respMsg = msg + 1; - System.out.println("receive data from client: " + msg + " ,rsp:" + (respMsg)); - try { - session.write(respMsg); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) { - - } -}