当前枚举的各状态机事件在发生后都会及时触发{@link MessageProcessor#stateEvent(AioSession, StateMachineEnum, Throwable)}方法。因此用户在实现的{@linkplain MessageProcessor}接口中可对自己关心的状态机事件进行处理。
+ * 当前枚举的各状态机事件在发生后都会及时触发{@link AioSession#stateEvent(StateMachineEnum, Throwable)}方法。因此用户在具体实现的{@linkplain AioSession}中可对自己关心的状态机事件进行处理。
*
* @author 三刀
* @version V1.0.0 2018/5/19
- * @see MessageProcessor
+ * @see AioSession
*/
public enum StateMachineEnum {
/**
@@ -28,6 +28,7 @@ public enum StateMachineEnum {
* 读通道已被关闭。
*
* - 对端主动关闭write通道,致使本通常满足了EOF条件
* - 当前AioSession处理完读操作后检测到自身正处于{@link StateMachineEnum#SESSION_CLOSING}状态
@@ -37,7 +38,7 @@ public enum StateMachineEnum {
INPUT_SHUTDOWN,
/**
* 业务处理异常。
- * 执行{@link MessageProcessor#process(AioSession, Object)}期间发生用户未捕获的异常。
+ * 执行{@link AioSession#process(Object)}期间发生用户未捕获的异常。
*/
PROCESS_EXCEPTION,
/**
diff --git a/aio-core/src/main/java/org/smartboot/socket/package-info.java b/aio-core/src/main/java/org/smartboot/socket/package-info.java
index 8cc5d60260da7ffe8c6631058a624e3e6e6cc5c8..ea91193f635c464c0f51762e7add33f192c237b5 100644
--- a/aio-core/src/main/java/org/smartboot/socket/package-info.java
+++ b/aio-core/src/main/java/org/smartboot/socket/package-info.java
@@ -4,8 +4,9 @@
*
*
*
- * 用户进行通信开发时需要实现该package中的接口,通常情况下仅需实现{@link org.smartboot.socket.Protocol}、{@link org.smartboot.socket.MessageProcessor}即可。
- * 如需进行整个服务级别的监控、维护,可选择性的使用{@link org.smartboot.socket.Filter}。
+ * 用户进行通信开发时需要实现该package中的接口,通常情况下仅需实现{@link org.smartboot.socket.Protocol}接口,
+ * {@link org.smartboot.socket.AioSession}即可。
+ * 如需进行整个服务级别的监控、维护,可选择性的使用{@link org.smartboot.socket.Plugin}。
*
*
*
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java
index 7311a447647fa9e9b61a6d63bd046acc69ad5800..5beab565465ba8e533fafe9b15672c3be7d9cdde 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickClient.java
@@ -9,10 +9,6 @@
package org.smartboot.socket.transport;
-import org.smartboot.socket.Filter;
-import org.smartboot.socket.MessageProcessor;
-import org.smartboot.socket.Protocol;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -23,6 +19,9 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
+import org.smartboot.socket.Plugin;
+import org.smartboot.socket.Protocol;
+
/**
* AIO实现的客户端服务。
*
@@ -72,13 +71,13 @@ public class AioQuickClient {
* @param host 远程服务器地址
* @param port 远程服务器端口号
* @param protocol 协议编解码
- * @param messageProcessor 消息处理器
+ * @param sessionFactory session工厂
*/
- public AioQuickClient(String host, int port, Protocol protocol, MessageProcessor messageProcessor) {
+ public AioQuickClient(String host, int port, Protocol protocol, SessionFactory sessionFactory) {
config.setHost(host);
config.setPort(port);
config.setProtocol(protocol);
- config.setProcessor(messageProcessor);
+ config.setSessionFactory(sessionFactory);
}
/**
@@ -104,7 +103,7 @@ public class AioQuickClient {
//bind host
socketChannel.connect(new InetSocketAddress(config.getHost(), config.getPort())).get();
//连接成功则构造AIOSession对象
- session = new AioSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), false);
+ session = config.getSessionFactory().newSession(socketChannel, config, new ReadCompletionHandler(), new WriteCompletionHandler(), false);
session.initSession();
}
@@ -146,12 +145,13 @@ public class AioQuickClient {
/**
- * 设置消息过滤器,执行顺序以数组中的顺序为准
+ * 设置插件,执行顺序以数组中的顺序为准
*
- * @param filters 过滤器数组
+ * @param plugins 插件数组
*/
- public final AioQuickClient setFilters(Filter[] filters) {
- this.config.setFilters(filters);
+ @SafeVarargs
+ public final AioQuickClient setPlugins(Plugin... plugins) {
+ this.config.setPlugins(plugins);
return this;
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
index bd48efd6d5733faf86a02c25b2fc9afed011cbb7..aaa1509af5f71ec58dcdaed9afd28f06b2a0db3d 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioQuickServer.java
@@ -8,12 +8,6 @@
package org.smartboot.socket.transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.smartboot.socket.Filter;
-import org.smartboot.socket.MessageProcessor;
-import org.smartboot.socket.Protocol;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketOption;
@@ -23,7 +17,11 @@ import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.socket.Plugin;
+import org.smartboot.socket.Protocol;
/**
* AIO服务端。
@@ -60,28 +58,28 @@ public class AioQuickServer {
protected WriteCompletionHandler aioWriteCompletionHandler = new WriteCompletionHandler<>();
private AsynchronousServerSocketChannel serverSocketChannel = null;
private AsynchronousChannelGroup asynchronousChannelGroup;
-
+
/**
* 设置服务端启动必要参数配置
*
* @param port 绑定服务端口号
* @param protocol 协议编解码
- * @param messageProcessor 消息处理器
+ * @param sessionFactory session工厂
*/
- public AioQuickServer(int port, Protocol protocol, MessageProcessor messageProcessor) {
+ public AioQuickServer(int port, Protocol protocol, SessionFactory sessionFactory) {
config.setPort(port);
config.setProtocol(protocol);
- config.setProcessor(messageProcessor);
+ config.setSessionFactory(sessionFactory);
}
/**
* @param host 绑定服务端Host地址
* @param port 绑定服务端口号
* @param protocol 协议编解码
- * @param messageProcessor 消息处理器
+ * @param sessionFactory session工厂
*/
- public AioQuickServer(String host, int port, Protocol protocol, MessageProcessor messageProcessor) {
- this(port, protocol, messageProcessor);
+ public AioQuickServer(String host, int port, Protocol protocol, SessionFactory sessionFactory) {
+ this(port, protocol, sessionFactory);
config.setHost(host);
}
@@ -141,6 +139,12 @@ public class AioQuickServer {
shutdown();
throw e;
}
+
+ //start the plugins
+ for(Plugin plugin : config.getPlugins()) {
+ plugin.start();
+ }
+
LOGGER.info("smart-socket server started on port {}", config.getPort());
LOGGER.info("smart-socket server config is {}", config);
}
@@ -152,7 +156,7 @@ public class AioQuickServer {
*/
protected void createSession(AsynchronousSocketChannel channel) {
//连接成功则构造AIOSession对象
- AioSession session = new AioSession(channel, config, aioReadCompletionHandler, aioWriteCompletionHandler, true);
+ AioSession session = config.getSessionFactory().newSession(channel, config, aioReadCompletionHandler, aioWriteCompletionHandler, true);
session.initSession();
}
@@ -168,21 +172,21 @@ public class AioQuickServer {
} catch (IOException e) {
LOGGER.warn(e.getMessage(), e);
}
- if (!asynchronousChannelGroup.isTerminated()) {
+ if (asynchronousChannelGroup != null) {
try {
- asynchronousChannelGroup.shutdownNow();
- } catch (IOException e) {
- LOGGER.error("shutdown exception", e);
- }
+ asynchronousChannelGroup.shutdownNow();
+ } catch (IOException e) {
+ LOGGER.warn(e.getMessage(), e);
+ }
+ asynchronousChannelGroup = null;
}
- try {
- asynchronousChannelGroup.awaitTermination(3, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.error("shutdown exception", e);
+
+ //close the plugins
+ for(Plugin plugin : config.getPlugins()) {
+ plugin.stop();
}
}
-
/**
* 设置处理线程数量
*
@@ -195,13 +199,13 @@ public class AioQuickServer {
/**
- * 设置消息过滤器,执行顺序以数组中的顺序为准
+ * 设置插件,执行顺序以数组中的顺序为准
*
- * @param filters 过滤器数组
+ * @param plugins 插件数组
*/
@SafeVarargs
- public final AioQuickServer setFilters(Filter... filters) {
- this.config.setFilters(filters);
+ public final AioQuickServer setPlugins(Plugin... plugins) {
+ this.config.setPlugins(plugins);
return this;
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java
index d73e8da37a0743fd075aa0e4310f59a16da8b73a..d40ea12d08efdbc0ed542ac84fd6ec2efbbd0999 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/AioSession.java
@@ -9,11 +9,6 @@
package org.smartboot.socket.transport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.smartboot.socket.Filter;
-import org.smartboot.socket.StateMachineEnum;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
@@ -21,6 +16,12 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.socket.Plugin;
+import org.smartboot.socket.StateMachineEnum;
/**
* AIO传输层会话。
@@ -50,7 +51,7 @@ import java.util.concurrent.Semaphore;
* @author 三刀
* @version V1.0.0
*/
-public class AioSession {
+public abstract class AioSession {
/**
* Session状态:已关闭
*/
@@ -110,7 +111,13 @@ public class AioSession {
private Semaphore semaphore = new Semaphore(1);
private IoServerConfig ioServerConfig;
private InputStream inputStream;
-
+
+ /**
+ * session id的生成器
+ */
+ private static final AtomicInteger idGen = new AtomicInteger(0);
+ private int sessionId;
+
/**
* @param channel
* @param config
@@ -118,7 +125,8 @@ public class AioSession {
* @param writeCompletionHandler
* @param serverSession 是否服务端Session
*/
- AioSession(AsynchronousSocketChannel channel, IoServerConfig config, ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler, boolean serverSession) {
+ protected AioSession(AsynchronousSocketChannel channel, IoServerConfig config, ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler, boolean serverSession) {
+ this.sessionId = idGen.incrementAndGet();
this.channel = channel;
this.readCompletionHandler = readCompletionHandler;
this.writeCompletionHandler = writeCompletionHandler;
@@ -128,10 +136,10 @@ public class AioSession {
this.ioServerConfig = config;
this.serverFlowLimit = serverSession && config.getWriteQueueSize() > 0 && config.isFlowControlEnabled() ? false : null;
//触发状态机
- config.getProcessor().stateEvent(this, StateMachineEnum.NEW_SESSION, null);
- this.readBuffer = allocateReadBuffer(config.getReadBufferSize());
- for (Filter filter : config.getFilters()) {
- filter.connected(this);
+ stateEvent(StateMachineEnum.NEW_SESSION, null);
+ this.readBuffer = newByteBuffer0(config.getReadBufferSize());
+ for (Plugin plugin : config.getPlugins()) {
+ plugin.connected(this);
}
}
@@ -171,7 +179,7 @@ public class AioSession {
writeBuffer = headBuffer;
} else {
if (writeBuffer == null || totalSize << 1 <= writeBuffer.capacity() || totalSize > writeBuffer.capacity()) {
- writeBuffer = allocateReadBuffer(totalSize);
+ writeBuffer = newByteBuffer0(totalSize);
} else {
writeBuffer.clear().limit(totalSize);
}
@@ -183,7 +191,7 @@ public class AioSession {
//如果存在流控并符合释放条件,则触发读操作
//一定要放在continueWrite之前
if (serverFlowLimit != null && serverFlowLimit && writeCacheQueue.size() < ioServerConfig.getReleaseLine()) {
- ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.RELEASE_FLOW_LIMIT, null);
+ stateEvent(StateMachineEnum.RELEASE_FLOW_LIMIT, null);
serverFlowLimit = false;
continueRead();
}
@@ -297,15 +305,15 @@ public class AioSession {
} catch (IOException e) {
logger.debug("close session exception", e);
}
- for (Filter filter : ioServerConfig.getFilters()) {
- filter.closed(this);
+ for (Plugin plugin : ioServerConfig.getPlugins()) {
+ plugin.closed(this);
}
- ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.SESSION_CLOSED, null);
+ stateEvent(StateMachineEnum.SESSION_CLOSED, null);
} else if ((writeBuffer == null || !writeBuffer.hasRemaining()) && (writeCacheQueue == null || writeCacheQueue.size() == 0) && semaphore.tryAcquire()) {
close(true);
semaphore.release();
} else {
- ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.SESSION_CLOSING, null);
+ stateEvent(StateMachineEnum.SESSION_CLOSING, null);
}
}
@@ -313,7 +321,7 @@ public class AioSession {
* 获取当前Session的唯一标识
*/
public final String getSessionID() {
- return "aioSession-" + hashCode();
+ return "aioSession-" + this.sessionId;
}
/**
@@ -323,6 +331,24 @@ public class AioSession {
return status != SESSION_STATUS_ENABLED;
}
+ /**
+ * 处理接收到的消息
+ *
+ * @param msg 待处理的业务消息
+ */
+ protected abstract void process(T msg) throws Exception;
+
+ /**
+ * 状态机事件,当枚举事件发生时由框架触发该方法
+ *
+ * {@link Plugin}属于通信级别的过滤器,监控全局系统服务状态;而状态机则是{@linkplain AioSession}内部的状态获取,相较于Plugin更加轻量灵活。
+ *
+ * @param stateMachineEnum 状态枚举
+ * @param throwable 异常对象,如果存在的话
+ * @see StateMachineEnum
+ */
+ protected abstract void stateEvent(StateMachineEnum stateMachineEnum, Throwable throwable);
+
/**
* 触发通道的读操作,当发现存在严重消息积压时,会触发流控
*/
@@ -333,13 +359,13 @@ public class AioSession {
while ((dataEntry = ioServerConfig.getProtocol().decode(readBuffer, this, eof)) != null) {
//处理消息
try {
- for (Filter h : ioServerConfig.getFilters()) {
- h.processFilter(this, dataEntry);
+ for (Plugin h : ioServerConfig.getPlugins()) {
+ h.process(this, dataEntry);
}
- ioServerConfig.getProcessor().process(this, dataEntry);
+ process(dataEntry);
} catch (Exception e) {
- ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.PROCESS_EXCEPTION, e);
- for (Filter h : ioServerConfig.getFilters()) {
+ stateEvent(StateMachineEnum.PROCESS_EXCEPTION, e);
+ for (Plugin h : ioServerConfig.getPlugins()) {
h.processFail(this, dataEntry, e);
}
}
@@ -348,7 +374,7 @@ public class AioSession {
if (eof || status == SESSION_STATUS_CLOSING) {
close(false);
- ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.INPUT_SHUTDOWN, null);
+ stateEvent(StateMachineEnum.INPUT_SHUTDOWN, null);
return;
}
if (status == SESSION_STATUS_CLOSED) {
@@ -369,7 +395,7 @@ public class AioSession {
//触发流控
if (serverFlowLimit != null && writeCacheQueue.size() > ioServerConfig.getFlowLimitLine()) {
serverFlowLimit = true;
- ioServerConfig.getProcessor().stateEvent(this, StateMachineEnum.FLOW_LIMIT, null);
+ stateEvent(StateMachineEnum.FLOW_LIMIT, null);
} else {
continueRead();
}
@@ -436,16 +462,7 @@ public class AioSession {
return this.ioServerConfig;
}
- /**
- * 申请新ReadBuffer。
- *
- * 重新申请readBuffer前请保证老的数据都正确处理
- *
- *
- * @param size
- * @return
- */
- private ByteBuffer allocateReadBuffer(int size) {
+ private ByteBuffer newByteBuffer0(int size) {
return ioServerConfig.isDirectBuffer() ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
}
@@ -520,4 +537,28 @@ public class AioSession {
}
}
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if(obj == null || !(obj instanceof AioSession)) {
+ return false;
+ }else {
+ return this.sessionId == ((AioSession)obj).sessionId;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return getSessionID().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return "{" + getSessionID() + ", " + getRemoteAddress().getAddress().getHostAddress() + ":" + getRemoteAddress().getPort() + "}";
+ } catch (IOException e) {
+ e.printStackTrace();
+ return "{" + getSessionID() + ", " + e.getMessage() + "}";
+ }
+ }
}
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
index bd33ba0810e1bc9fd08e7cd1bcbc0f2ae5fd39c7..53f5955e9919612db27975a86ed1b09bbed7348c 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/IoServerConfig.java
@@ -8,14 +8,20 @@
package org.smartboot.socket.transport;
-import org.smartboot.socket.Filter;
-import org.smartboot.socket.MessageProcessor;
-import org.smartboot.socket.Protocol;
-
import java.net.SocketOption;
+import java.nio.channels.AsynchronousSocketChannel;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.socket.Plugin;
+import org.smartboot.socket.Protocol;
+import org.smartboot.socket.StateMachineEnum;
/**
* Quickly服务端/客户端配置信息 T:解码后生成的对象类型
@@ -23,7 +29,7 @@ import java.util.Map;
* @author 三刀
* @version V1.0.0
*/
-final class IoServerConfig {
+public final class IoServerConfig {
public static final String BANNER = " _ _ _ \n" +
" ( )_ ( ) ( )_ \n" +
@@ -32,7 +38,7 @@ final class IoServerConfig {
"\\__, \\| ( ) ( ) |( (_| || | | |_ \\__, \\( (_) )( (___ | |\\`\\ ( ___/| |_ \n" +
"(____/(_) (_) (_)`\\__,_)(_) `\\__) (____/`\\___/'`\\____)(_) (_)`\\____)`\\__)";
- public static final String VERSION = "v1.3.12";
+ public static final String VERSION = "v1.3.11";
/**
* 消息队列缓存大小
*/
@@ -52,18 +58,13 @@ final class IoServerConfig {
/**
* 服务器消息拦截器
*/
- private Filter[] filters = new Filter[0];
+ private Set> plugins = new HashSet<>();
/**
* 服务器端口号
*/
private int port = 8888;
- /**
- * 消息处理器
- */
- private MessageProcessor processor;
-
/**
* 协议编解码
*/
@@ -104,6 +105,70 @@ final class IoServerConfig {
*/
private Map, Object> socketOptions;
+ /**
+ * Session 工厂
+ * @return
+ */
+ private SessionFactory sessionFactory;
+
+ /**
+ * 默认的Session工厂
+ */
+ private final SessionFactory defaultFactory = new SessionFactory() {
+
+ final Logger logger = LoggerFactory.getLogger(AioSession.class);
+
+ @Override
+ public AioSession newSession(AsynchronousSocketChannel channel, IoServerConfig config,
+ ReadCompletionHandler readCompletionHandler, WriteCompletionHandler writeCompletionHandler,
+ boolean serverSession) {
+
+
+
+ return new AioSession(channel, config, readCompletionHandler, writeCompletionHandler, serverSession) {
+
+ @Override
+ protected void process(T msg) throws Exception {
+ logger.info("process:{}, msg:{}", toString(), msg);
+ }
+
+ @Override
+ protected void stateEvent(StateMachineEnum stateMachineEnum, Throwable throwable) {
+ switch (stateMachineEnum) {
+ case NEW_SESSION:
+ logger.info("new session:{}, throwable:{}", toString(), throwable);
+ break;
+ case INPUT_SHUTDOWN:
+ logger.info("input shutdown:{}, throwable:{}", toString(), throwable);
+ break;
+ case INPUT_EXCEPTION:
+ logger.info("input exception:{}, throwable:{}", toString(), throwable);
+ break;
+ case OUTPUT_EXCEPTION:
+ logger.info("output exception:{}, throwable:{}", toString(), throwable);
+ break;
+ case SESSION_CLOSING:
+ logger.info("session closing:{}, throwable:{}", toString(), throwable);
+ break;
+ case SESSION_CLOSED:
+ logger.info("session closed:{{}}, throwable:{}", getSessionID(), throwable);
+ break;
+ case FLOW_LIMIT:
+ logger.info("flow limit:{}, throwable:{}", toString(), throwable);
+ break;
+ case RELEASE_FLOW_LIMIT:
+ logger.info("release flow limit:{}, throwable:{}", toString(), throwable);
+ break;
+ default:
+ break;
+ }
+ }
+
+ };
+ }
+
+ };
+
public final String getHost() {
return host;
}
@@ -128,14 +193,20 @@ final class IoServerConfig {
this.threadNum = threadNum;
}
-
- public final Filter[] getFilters() {
- return filters;
+ public final Set> getPlugins() {
+ return Collections.unmodifiableSet(plugins);
}
- public final void setFilters(Filter[] filters) {
- if (filters != null) {
- this.filters = filters;
+ public final void addPlugin(Plugin plugin) {
+ if(plugin != null) {
+ this.plugins.add(plugin);
+ }
+ }
+
+ public final void setPlugins(Plugin[] plugins) {
+ if (plugins != null) {
+ this.plugins.clear();
+ this.plugins.addAll(Arrays.asList(plugins));
}
}
@@ -147,14 +218,6 @@ final class IoServerConfig {
this.protocol = protocol;
}
- public final MessageProcessor getProcessor() {
- return processor;
- }
-
- public final void setProcessor(MessageProcessor processor) {
- this.processor = processor;
- }
-
public int getWriteQueueSize() {
return writeQueueSize;
}
@@ -216,15 +279,26 @@ final class IoServerConfig {
this.flowControlEnabled = flowControlEnabled;
}
+ public void setSessionFactory(SessionFactory factory) {
+ this.sessionFactory = factory;
+ }
+
+ public SessionFactory getSessionFactory() {
+ if(this.sessionFactory != null) {
+ return this.sessionFactory;
+ }else {
+ return this.defaultFactory;
+ }
+ }
+
@Override
public String toString() {
return "IoServerConfig{" +
"writeQueueSize=" + writeQueueSize +
", readBufferSize=" + readBufferSize +
", host='" + host + '\'' +
- ", filters=" + Arrays.toString(filters) +
+ ", plugins=" + Arrays.toString(plugins.toArray(new Plugin[plugins.size()])) +
", port=" + port +
- ", processor=" + processor +
", protocol=" + protocol +
", directBuffer=" + directBuffer +
", threadNum=" + threadNum +
diff --git a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java
index 31f14e0189723a319b1a7b3cfb4ff2e970859ec5..9b002b40e2a9b5a2b8dcd3c70e691ff94ab7f148 100644
--- a/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java
+++ b/aio-core/src/main/java/org/smartboot/socket/transport/ReadCompletionHandler.java
@@ -10,7 +10,7 @@ package org.smartboot.socket.transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.smartboot.socket.Filter;
+import org.smartboot.socket.Plugin;
import org.smartboot.socket.StateMachineEnum;
import java.io.IOException;
@@ -22,14 +22,14 @@ import java.nio.channels.CompletionHandler;
* @author 三刀
* @version V1.0.0
*/
-class ReadCompletionHandler implements CompletionHandler> {
+public final class ReadCompletionHandler implements CompletionHandler> {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadCompletionHandler.class);
@Override
public void completed(final Integer result, final AioSession aioSession) {
// 接收到的消息进行预处理
- for (Filter h : aioSession.getServerConfig().getFilters()) {
- h.readFilter(aioSession, result);
+ for (Plugin h : aioSession.getServerConfig().getPlugins()) {
+ h.readCompleted(aioSession, result);
}
aioSession.readFromChannel(result == -1);
}
@@ -47,7 +47,7 @@ class ReadCompletionHandler implements CompletionHandler implements CompletionHandler> {
+public final class WriteCompletionHandler implements CompletionHandler> {
private static final Logger LOGGER = LoggerFactory.getLogger(WriteCompletionHandler.class);
@Override
public void completed(final Integer result, final AioSession aioSession) {
// 接收到的消息进行预处理
- for (Filter h : aioSession.getServerConfig().getFilters()) {
- h.writeFilter(aioSession, result);
+ for (Plugin plugin : aioSession.getServerConfig().getPlugins()) {
+ plugin.writeCompleted(aioSession, result);
}
aioSession.writeToChannel();
}
@@ -39,7 +39,7 @@ class WriteCompletionHandler implements CompletionHandler{
+
+ @Override
+ public AioSession newSession(AsynchronousSocketChannel channel, IoServerConfig config,
+ ReadCompletionHandler readCompletionHandler,
+ WriteCompletionHandler writeCompletionHandler, boolean serverSession) {
+ return new IntegerClientSession(channel, config, readCompletionHandler, writeCompletionHandler, serverSession);
+ }
+
+ }
+
public static void main(String[] args) throws Exception {
- IntegerClientProcessor processor = new IntegerClientProcessor();
- AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8888, new IntegerProtocol(), processor);
+
+ AioQuickClient aioQuickClient = new AioQuickClient("localhost", 8899, new IntegerProtocol(), new ClientSessionFactory());
aioQuickClient.start();
- processor.getSession().write(1);
Thread.sleep(1000);
aioQuickClient.shutdown();
}
diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerClientProcessor.java b/aio-core/src/test/java/net/vinote/demo/IntegerClientProcessor.java
deleted file mode 100644
index 0775a8812f06f1b5edc33447a439c952bcf7fd3f..0000000000000000000000000000000000000000
--- a/aio-core/src/test/java/net/vinote/demo/IntegerClientProcessor.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package net.vinote.demo;
-
-import org.smartboot.socket.MessageProcessor;
-import org.smartboot.socket.transport.AioSession;
-import org.smartboot.socket.StateMachineEnum;
-
-/**
- * @author 三刀
- * @version V1.0 , 2017/8/23
- */
-public class IntegerClientProcessor implements MessageProcessor {
- private AioSession session;
-
- @Override
- public void process(AioSession session, Integer msg) {
- System.out.println("receive data from server:" + msg);
- }
-
- @Override
- public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) {
- switch (stateMachineEnum) {
- case NEW_SESSION:
- this.session = session;
- break;
- default:
- System.out.println("other state:" + stateMachineEnum);
- }
-
- }
-
- public AioSession getSession() {
- return session;
- }
-}
diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerServer.java b/aio-core/src/test/java/net/vinote/demo/IntegerServer.java
index 5201847c8cfd07cfb032db07a4caa9cb60041d60..d3ca202d86aa0ba6ab16eb20eea7fed8da56f73d 100644
--- a/aio-core/src/test/java/net/vinote/demo/IntegerServer.java
+++ b/aio-core/src/test/java/net/vinote/demo/IntegerServer.java
@@ -1,15 +1,32 @@
package net.vinote.demo;
+import org.smartboot.plugin.AioMonitor;
import org.smartboot.socket.transport.AioQuickServer;
+import org.smartboot.socket.transport.AioSession;
+import org.smartboot.socket.transport.IoServerConfig;
+import org.smartboot.socket.transport.ReadCompletionHandler;
+import org.smartboot.socket.transport.SessionFactory;
+import org.smartboot.socket.transport.WriteCompletionHandler;
import java.io.IOException;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.util.concurrent.TimeUnit;
/**
* Created by 三刀 on 2017/7/12.
*/
public class IntegerServer {
public static void main(String[] args) throws IOException {
- AioQuickServer server = new AioQuickServer(8888, new IntegerProtocol(), new IntegerServerProcessor());
+ AioQuickServer server = new AioQuickServer(8899, new IntegerProtocol(), new SessionFactory() {
+
+ @Override
+ public AioSession newSession(AsynchronousSocketChannel channel, IoServerConfig config,
+ ReadCompletionHandler readCompletionHandler,
+ WriteCompletionHandler writeCompletionHandler, boolean serverSession) {
+ return new IntegerServerSession(channel, config, readCompletionHandler, writeCompletionHandler, serverSession);
+ }
+
+ }).setPlugins(new AioMonitor<>(5, TimeUnit.SECONDS));
server.start();
}
}
diff --git a/aio-core/src/test/java/net/vinote/demo/IntegerServerProcessor.java b/aio-core/src/test/java/net/vinote/demo/IntegerServerProcessor.java
deleted file mode 100644
index cb8a1fa8a496f96a295a0477313054917fed66e9..0000000000000000000000000000000000000000
--- a/aio-core/src/test/java/net/vinote/demo/IntegerServerProcessor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package net.vinote.demo;
-
-import org.smartboot.socket.MessageProcessor;
-import org.smartboot.socket.transport.AioSession;
-import org.smartboot.socket.StateMachineEnum;
-
-import java.io.IOException;
-
-/**
- * @author 三刀
- * @version V1.0 , 2017/8/23
- */
-public class IntegerServerProcessor implements MessageProcessor {
- @Override
- public void process(AioSession session, Integer msg) {
- Integer respMsg = msg + 1;
- System.out.println("receive data from client: " + msg + " ,rsp:" + (respMsg));
- try {
- session.write(respMsg);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void stateEvent(AioSession session, StateMachineEnum stateMachineEnum, Throwable throwable) {
-
- }
-}