From b484ceafaea09232a5a341e51e4ebddc39388387 Mon Sep 17 00:00:00 2001 From: Tamako <1253594122@qq.com> Date: Thu, 24 Jul 2025 09:30:39 +0800 Subject: [PATCH 1/3] =?UTF-8?q?refactor(aio-pro):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E6=8F=92=E4=BB=B6=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 重构 HeartPlugin 类,优化心跳检测逻辑 - 使用 ConcurrentHashMap 替代 HashMap 提高线程安全 -采用 AtomicLong 替代 Long保证并发操作的原子性 - 优化代码结构,提高可读性和可维护性 --- .../socket/extension/plugins/HeartPlugin.java | 74 +++++++++++-------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java index 7a3b0f5b..dfb93865 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java @@ -14,10 +14,11 @@ import org.smartboot.socket.timer.HashedWheelTimer; import org.smartboot.socket.transport.AioSession; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * 心跳插件 @@ -26,22 +27,23 @@ import java.util.concurrent.TimeUnit; * @version V1.0 , 2018/8/19 */ public abstract class HeartPlugin extends AbstractPlugin { - private static final TimeoutCallback DEFAULT_TIMEOUT_CALLBACK = new TimeoutCallback() { - @Override - public void callback(AioSession session, long lastTime) { - session.close(true); - } - }; - private Map sessionMap = new HashMap<>(); + /** + * 默认超时回调 + */ + private static final TimeoutCallback DEFAULT_TIMEOUT_CALLBACK = (session, lastTime) -> session.close(true); + private final Map sessionMap = new ConcurrentHashMap<>(); /** * 心跳频率 */ - private long heartRate; + private final long heartRate; /** * 在超时时间内未收到消息,关闭连接。 */ - private long timeout; - private TimeoutCallback timeoutCallback; + private final long timeout; + /** + * 超时回调 + */ + private final TimeoutCallback timeoutCallback; /** * 心跳插件 @@ -78,7 +80,7 @@ public abstract class HeartPlugin extends AbstractPlugin { */ public HeartPlugin(int heartRate, int timeout, TimeUnit timeUnit, TimeoutCallback timeoutCallback) { if (timeout > 0 && heartRate >= timeout) { - throw new IllegalArgumentException("heartRate must little then timeout"); + throw new IllegalArgumentException("heartRate must little than timeout"); } this.heartRate = timeUnit.toMillis(heartRate); this.timeout = timeUnit.toMillis(timeout); @@ -87,21 +89,27 @@ public abstract class HeartPlugin extends AbstractPlugin { @Override public final boolean preProcess(AioSession session, T t) { - sessionMap.put(session, System.currentTimeMillis()); - //是否心跳响应消息 - return !isHeartMessage(session, t); + boolean heartMessage = isHeartMessage(session, t); + if (heartMessage) { + sessionMap.computeIfPresent(session, (k, v) -> { + v.set(System.currentTimeMillis()); + return v; + }); + } + // 是否心跳响应消息 + return !heartMessage; } @Override public final void stateEvent(StateMachineEnum stateMachineEnum, AioSession session, Throwable throwable) { switch (stateMachineEnum) { case NEW_SESSION: - sessionMap.put(session, System.currentTimeMillis()); + sessionMap.put(session, new AtomicLong(System.currentTimeMillis())); + // 注册心跳监测 registerHeart(session, heartRate); - //注册心跳监测 break; case SESSION_CLOSED: - //移除心跳监测 + // 移除心跳监测 sessionMap.remove(session); break; default: @@ -112,8 +120,8 @@ public abstract class HeartPlugin extends AbstractPlugin { /** * 自定义心跳消息并发送 * - * @param session - * @throws IOException + * @param session 当前会话 + * @throws IOException 抛出异常 */ public abstract void sendHeartRequest(AioSession session) throws IOException; @@ -121,12 +129,18 @@ public abstract class HeartPlugin extends AbstractPlugin { * 判断当前收到的消息是否为心跳消息。 * 心跳请求消息与响应消息可能相同,也可能不同,因实际场景而异,故接口定义不做区分。 * - * @param session - * @param msg - * @return + * @param session 当前会话 + * @param msg 接收到的消息 + * @return true:是心跳消息,false:非心跳消息 */ public abstract boolean isHeartMessage(AioSession session, T msg); + /** + * 注册心跳任务 + * + * @param session 当前会话 + * @param heartRate 心跳频率 + */ private void registerHeart(final AioSession session, final long heartRate) { if (heartRate <= 0) { // LOGGER.info("session:{} 因心跳间隔为:{},终止启动心跳监测任务", session, heartRate); @@ -141,18 +155,14 @@ public abstract class HeartPlugin extends AbstractPlugin { // LOGGER.info("session:{} 已失效,移除心跳任务", session); return; } - Long lastTime = sessionMap.get(session); - if (lastTime == null) { -// LOGGER.warn("session:{} timeout is null", session); - lastTime = System.currentTimeMillis(); - sessionMap.put(session, lastTime); - } + AtomicLong atomicLong = sessionMap.computeIfAbsent(session, k -> new AtomicLong(System.currentTimeMillis())); + long lastTime = atomicLong.get(); long current = System.currentTimeMillis(); - //超时未收到消息,关闭连接 + // 超时未收到消息,关闭连接 if (timeout > 0 && (current - lastTime) > timeout) { timeoutCallback.callback(session, lastTime); } - //超时未收到消息,尝试发送心跳消息 + // 超时未收到消息,尝试发送心跳消息 else if (current - lastTime > heartRate) { try { sendHeartRequest(session); @@ -168,6 +178,6 @@ public abstract class HeartPlugin extends AbstractPlugin { } public interface TimeoutCallback { - public void callback(AioSession session, long lastTime); + void callback(AioSession session, long lastTime); } } -- Gitee From d2c0100fc43cbd812d3223431ede6a1769047248 Mon Sep 17 00:00:00 2001 From: Tamako <1253594122@qq.com> Date: Thu, 24 Jul 2025 09:43:38 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9preProcess=E6=96=B9?= =?UTF-8?q?=E6=B3=95=E4=B8=AD=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/extension/plugins/HeartPlugin.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java index dfb93865..ae8bcc29 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java @@ -28,9 +28,12 @@ import java.util.concurrent.atomic.AtomicLong; */ public abstract class HeartPlugin extends AbstractPlugin { /** - * 默认超时回调 + * 默认超时回调方法 */ private static final TimeoutCallback DEFAULT_TIMEOUT_CALLBACK = (session, lastTime) -> session.close(true); + /** + * 每个会话接收到最后一条消息的时间 + */ private final Map sessionMap = new ConcurrentHashMap<>(); /** * 心跳频率 @@ -89,15 +92,12 @@ public abstract class HeartPlugin extends AbstractPlugin { @Override public final boolean preProcess(AioSession session, T t) { - boolean heartMessage = isHeartMessage(session, t); - if (heartMessage) { - sessionMap.computeIfPresent(session, (k, v) -> { - v.set(System.currentTimeMillis()); - return v; - }); - } + sessionMap.computeIfPresent(session, (k, v) -> { + v.set(System.currentTimeMillis()); + return v; + }); // 是否心跳响应消息 - return !heartMessage; + return !isHeartMessage(session, t); } @Override -- Gitee From 283ae4731af8d2fa93ab9ba01fab024432d92a55 Mon Sep 17 00:00:00 2001 From: Tamako <1253594122@qq.com> Date: Thu, 24 Jul 2025 11:31:49 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=88=A0=E9=99=A4AtomicLong?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../socket/extension/plugins/HeartPlugin.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java index ae8bcc29..60d16f20 100644 --- a/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java +++ b/aio-pro/src/main/java/org/smartboot/socket/extension/plugins/HeartPlugin.java @@ -18,7 +18,6 @@ import java.util.Map; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; /** * 心跳插件 @@ -34,7 +33,7 @@ public abstract class HeartPlugin extends AbstractPlugin { /** * 每个会话接收到最后一条消息的时间 */ - private final Map sessionMap = new ConcurrentHashMap<>(); + private final Map sessionMap = new ConcurrentHashMap<>(); /** * 心跳频率 */ @@ -92,10 +91,7 @@ public abstract class HeartPlugin extends AbstractPlugin { @Override public final boolean preProcess(AioSession session, T t) { - sessionMap.computeIfPresent(session, (k, v) -> { - v.set(System.currentTimeMillis()); - return v; - }); + sessionMap.computeIfPresent(session, (k, v) -> System.currentTimeMillis()); // 是否心跳响应消息 return !isHeartMessage(session, t); } @@ -104,7 +100,7 @@ public abstract class HeartPlugin extends AbstractPlugin { public final void stateEvent(StateMachineEnum stateMachineEnum, AioSession session, Throwable throwable) { switch (stateMachineEnum) { case NEW_SESSION: - sessionMap.put(session, new AtomicLong(System.currentTimeMillis())); + sessionMap.put(session, System.currentTimeMillis()); // 注册心跳监测 registerHeart(session, heartRate); break; @@ -155,8 +151,7 @@ public abstract class HeartPlugin extends AbstractPlugin { // LOGGER.info("session:{} 已失效,移除心跳任务", session); return; } - AtomicLong atomicLong = sessionMap.computeIfAbsent(session, k -> new AtomicLong(System.currentTimeMillis())); - long lastTime = atomicLong.get(); + long lastTime = sessionMap.computeIfAbsent(session, k -> System.currentTimeMillis()); long current = System.currentTimeMillis(); // 超时未收到消息,关闭连接 if (timeout > 0 && (current - lastTime) > timeout) { -- Gitee