diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java index 7a3b0f5b5d14cc4aad4022582192fdb0f824d0fd..60d16f2012caef6fe6aa0255c53473e8129ccf34 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java @@ -14,9 +14,9 @@ import org.smartboot.socket.timer.HashedWheelTimer; import org.smartboot.socket.transport.AioSession; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -26,22 +26,26 @@ import java.util.concurrent.TimeUnit; * @version V1.0 , 2018/8/19 */ public abstract class HeartPlugin extends AbstractPlugin { - private static final TimeoutCallback DEFAULT_TIMEOUT_CALLBACK = new TimeoutCallback() { - @Override - public void callback(AioSession session, long lastTime) { - session.close(true); - } - }; - private Map sessionMap = new HashMap<>(); + /** + * 默认超时回调方法 + */ + private static final TimeoutCallback DEFAULT_TIMEOUT_CALLBACK = (session, lastTime) -> session.close(true); + /** + * 每个会话接收到最后一条消息的时间 + */ + private final Map sessionMap = new ConcurrentHashMap<>(); /** * 心跳频率 */ - private long heartRate; + private final long heartRate; /** * 在超时时间内未收到消息,关闭连接。 */ - private long timeout; - private TimeoutCallback timeoutCallback; + private final long timeout; + /** + * 超时回调 + */ + private final TimeoutCallback timeoutCallback; /** * 心跳插件 @@ -78,7 +82,7 @@ public abstract class HeartPlugin extends AbstractPlugin { */ public HeartPlugin(int heartRate, int timeout, TimeUnit timeUnit, TimeoutCallback timeoutCallback) { if (timeout > 0 && heartRate >= timeout) { - throw new IllegalArgumentException("heartRate must little then timeout"); + throw new IllegalArgumentException("heartRate must little than timeout"); } this.heartRate = timeUnit.toMillis(heartRate); this.timeout = timeUnit.toMillis(timeout); @@ -87,8 +91,8 @@ public abstract class HeartPlugin extends AbstractPlugin { @Override public final boolean preProcess(AioSession session, T t) { - sessionMap.put(session, System.currentTimeMillis()); - //是否心跳响应消息 + sessionMap.computeIfPresent(session, (k, v) -> System.currentTimeMillis()); + // 是否心跳响应消息 return !isHeartMessage(session, t); } @@ -97,11 +101,11 @@ public abstract class HeartPlugin extends AbstractPlugin { switch (stateMachineEnum) { case NEW_SESSION: sessionMap.put(session, System.currentTimeMillis()); + // 注册心跳监测 registerHeart(session, heartRate); - //注册心跳监测 break; case SESSION_CLOSED: - //移除心跳监测 + // 移除心跳监测 sessionMap.remove(session); break; default: @@ -112,8 +116,8 @@ public abstract class HeartPlugin extends AbstractPlugin { /** * 自定义心跳消息并发送 * - * @param session - * @throws IOException + * @param session 当前会话 + * @throws IOException 抛出异常 */ public abstract void sendHeartRequest(AioSession session) throws IOException; @@ -121,12 +125,18 @@ public abstract class HeartPlugin extends AbstractPlugin { * 判断当前收到的消息是否为心跳消息。 * 心跳请求消息与响应消息可能相同,也可能不同,因实际场景而异,故接口定义不做区分。 * - * @param session - * @param msg - * @return + * @param session 当前会话 + * @param msg 接收到的消息 + * @return true:是心跳消息,false:非心跳消息 */ public abstract boolean isHeartMessage(AioSession session, T msg); + /** + * 注册心跳任务 + * + * @param session 当前会话 + * @param heartRate 心跳频率 + */ private void registerHeart(final AioSession session, final long heartRate) { if (heartRate <= 0) { // LOGGER.info("session:{} 因心跳间隔为:{},终止启动心跳监测任务", session, heartRate); @@ -141,18 +151,13 @@ public abstract class HeartPlugin extends AbstractPlugin { // LOGGER.info("session:{} 已失效,移除心跳任务", session); return; } - Long lastTime = sessionMap.get(session); - if (lastTime == null) { -// LOGGER.warn("session:{} timeout is null", session); - lastTime = System.currentTimeMillis(); - sessionMap.put(session, lastTime); - } + long lastTime = sessionMap.computeIfAbsent(session, k -> System.currentTimeMillis()); long current = System.currentTimeMillis(); - //超时未收到消息,关闭连接 + // 超时未收到消息,关闭连接 if (timeout > 0 && (current - lastTime) > timeout) { timeoutCallback.callback(session, lastTime); } - //超时未收到消息,尝试发送心跳消息 + // 超时未收到消息,尝试发送心跳消息 else if (current - lastTime > heartRate) { try { sendHeartRequest(session); @@ -168,6 +173,6 @@ public abstract class HeartPlugin extends AbstractPlugin { } public interface TimeoutCallback { - public void callback(AioSession session, long lastTime); + void callback(AioSession session, long lastTime); } }