From 442e9568a898be62065ab3ba2e784b5c668eed76 Mon Sep 17 00:00:00 2001 From: Arogant_95 <842709152@qq.com> Date: Mon, 17 Dec 2018 14:17:17 +0800 Subject: [PATCH 01/12] =?UTF-8?q?=E5=88=A0=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/jmqtt/common/config/RedisConfig.java | 64 ------------------- 1 file changed, 64 deletions(-) delete mode 100644 jmqtt-common/src/main/java/org/jmqtt/common/config/RedisConfig.java diff --git a/jmqtt-common/src/main/java/org/jmqtt/common/config/RedisConfig.java b/jmqtt-common/src/main/java/org/jmqtt/common/config/RedisConfig.java deleted file mode 100644 index 8e41253..0000000 --- a/jmqtt-common/src/main/java/org/jmqtt/common/config/RedisConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.jmqtt.common.config; - -public class RedisConfig { - private String host = "127.0.0.1"; - private Integer port = 6379; - private String password = "123456"; - private Integer maxIdle = 100; - private Integer maxActive = 300; - private Integer maxWait = 1000; - private Integer timeout = 100000; - - public void setPassword(String password) { - this.password = password; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public Integer getPort() { - return port; - } - - public void setPort(Integer port) { - this.port = port; - } - - public Integer getMaxIdle() { - return maxIdle; - } - - public void setMaxIdle(Integer maxIdle) { - this.maxIdle = maxIdle; - } - - public Integer getMaxActive() { - return maxActive; - } - - public void setMaxActive(Integer maxActive) { - this.maxActive = maxActive; - } - - public Integer getMaxWait() { - return maxWait; - } - - public void setMaxWait(Integer maxWait) { - this.maxWait = maxWait; - } - - public Integer getTimeout() { - return timeout; - } - - public void setTimeout(Integer timeout) { - this.timeout = timeout; - } - -} -- Gitee From 33cdf4573162de3b037e5d6bc37671aa9f4e4dc6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 8 Feb 2021 10:39:47 +0000 Subject: [PATCH 02/12] Bump mysql-connector-java from 5.1.47 to 8.0.16 Bumps [mysql-connector-java](https://github.com/mysql/mysql-connector-j) from 5.1.47 to 8.0.16. - [Release notes](https://github.com/mysql/mysql-connector-j/releases) - [Changelog](https://github.com/mysql/mysql-connector-j/blob/release/8.0/CHANGES) - [Commits](https://github.com/mysql/mysql-connector-j/compare/5.1.47...8.0.16) Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0456fc5..4ba369d 100644 --- a/pom.xml +++ b/pom.xml @@ -46,7 +46,7 @@ 2.9.46 3.4.6 1.2.4 - 5.1.47 + 8.0.16 2.13.3 -- Gitee From 55c833eed336423da4373d0a703b59e4d6a533ad Mon Sep 17 00:00:00 2001 From: sfclibby Date: Fri, 12 Mar 2021 22:21:34 +0800 Subject: [PATCH 03/12] Create jmqtt-manager --- jmqtt-manager | 1 + 1 file changed, 1 insertion(+) create mode 100644 jmqtt-manager diff --git a/jmqtt-manager b/jmqtt-manager new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/jmqtt-manager @@ -0,0 +1 @@ + -- Gitee From 49145c4b5e375beeeb4a9aeb22c5c331493f5ff1 Mon Sep 17 00:00:00 2001 From: dyc Date: Fri, 19 Mar 2021 15:46:06 +0800 Subject: [PATCH 04/12] =?UTF-8?q?fix=20bug,=E5=88=A0=E9=99=A4=E5=86=B2?= =?UTF-8?q?=E7=AA=81=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dispatcher/EventConsumeHandler.java | 2 +- .../org/jmqtt/broker/store/rdb/DBUtils.java | 3 +- pom.xml | 35 ------------------- 3 files changed, 2 insertions(+), 38 deletions(-) diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java index 4da33e7..e6ff917 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java @@ -109,7 +109,7 @@ public class EventConsumeHandler { return; } String clientId = event.getBody(); - if (ConnectManager.getInstance().containClient(clientId)){ + if (!ConnectManager.getInstance().containClient(clientId)){ return; } ClientSession clientSession = ConnectManager.getInstance().getClient(clientId); diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/rdb/DBUtils.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/rdb/DBUtils.java index 40a2efd..40bb4d1 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/rdb/DBUtils.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/rdb/DBUtils.java @@ -10,7 +10,6 @@ import org.apache.ibatis.session.SqlSessionFactoryBuilder; import org.apache.ibatis.transaction.TransactionFactory; import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; import org.jmqtt.broker.common.config.BrokerConfig; -import org.jmqtt.broker.store.highperformance.OutflowMessageHandler; import org.jmqtt.broker.store.rdb.mapper.*; import javax.sql.DataSource; @@ -29,7 +28,7 @@ public class DBUtils { private SqlSessionFactory sqlSessionFactory; - private AtomicBoolean start = new AtomicBoolean(false); + private AtomicBoolean start = new AtomicBoolean(false); public static DBUtils getInstance(){ return dbUtils; diff --git a/pom.xml b/pom.xml index e3499f6..2621e73 100644 --- a/pom.xml +++ b/pom.xml @@ -161,41 +161,6 @@ - - - - org.codehaus.mojo - versions-maven-plugin - 2.7 - - false - - - - org.apache.maven.plugins - maven-shade-plugin - 2.4.1 - - - package - - shade - - - - - org.jmqtt.broker.BrokerStartup - - - - - - - - - - alibaba -- Gitee From 88d47e3f5e3164b3ba75d2dc5221e2a47e1a3a8d Mon Sep 17 00:00:00 2001 From: dyc Date: Mon, 22 Mar 2021 11:05:00 +0800 Subject: [PATCH 05/12] fix bug --- .../jmqtt/broker/processor/dispatcher/EventConsumeHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java index 236037c..47e3be0 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java @@ -104,7 +104,7 @@ public class EventConsumeHandler { return; } String clientId = event.getBody(); - if (ConnectManager.getInstance().containClient(clientId)) { + if (!ConnectManager.getInstance().containClient(clientId)) { return; } ClientSession clientSession = ConnectManager.getInstance().getClient(clientId); -- Gitee From b378790ee6315a4447f23c6974361bc15e035da2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 22 Apr 2021 19:26:04 +0000 Subject: [PATCH 06/12] chore(deps): bump mybatis from 3.4.6 to 3.5.6 Bumps [mybatis](https://github.com/mybatis/mybatis-3) from 3.4.6 to 3.5.6. - [Release notes](https://github.com/mybatis/mybatis-3/releases) - [Commits](https://github.com/mybatis/mybatis-3/compare/mybatis-3.4.6...mybatis-3.5.6) Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 51c962a..5f715b5 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 2.9.0 4.0.3 2.9.46 - 3.4.6 + 3.5.6 1.2.4 8.0.16 2.13.3 -- Gitee From 0234aac6fcac8a666bc1a5f97fb9c6b7de2e6070 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 26 Apr 2021 20:45:24 +0000 Subject: [PATCH 07/12] chore(deps): bump commons-io from 2.6 to 2.7 in /jmqtt-acceptance Bumps commons-io from 2.6 to 2.7. Signed-off-by: dependabot[bot] --- jmqtt-acceptance/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jmqtt-acceptance/pom.xml b/jmqtt-acceptance/pom.xml index 32075cd..9ca06ba 100644 --- a/jmqtt-acceptance/pom.xml +++ b/jmqtt-acceptance/pom.xml @@ -74,7 +74,7 @@ commons-io commons-io - 2.6 + 2.7 com.fasterxml.jackson.dataformat -- Gitee From e8c1d74e65b7d8ee02bae7018b7e1f43fb289356 Mon Sep 17 00:00:00 2001 From: song_jx <1649991905@qq.com> Date: Wed, 21 Jul 2021 18:43:30 +0800 Subject: [PATCH 08/12] =?UTF-8?q?[Bug=E4=BF=AE=E5=A4=8D](master):=20spring?= =?UTF-8?q?boot-starter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复引入jmqtt-springboot-starter并配置yaml后依旧读取properties文件 --- .../starter/service/BrokerStartupService.java | 62 +++++++++++-------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java b/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java index 5d8dfc3..afbb3d8 100644 --- a/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java +++ b/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java @@ -10,11 +10,7 @@ import org.jmqtt.broker.common.config.BrokerConfig; import org.jmqtt.broker.common.config.NettyConfig; import org.jmqtt.broker.common.helper.MixAll; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; +import java.io.*; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -59,26 +55,40 @@ public class BrokerStartupService { } public BrokerController getBrokerController() { - String jmqttConfigPath = - brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "jmqtt.properties"; - initConfig(jmqttConfigPath, brokerConfig, nettyConfig); - try { - LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); - File file = new File(brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "log4j2.xml"); - context.setConfigLocation(file.toURI()); - Configuration configuration = context.getConfiguration(); - Map loggerConfigMap = configuration.getLoggers(); - Level newLevel = Level.getLevel(brokerConfig.getLogLevel()); - if (newLevel == null) { - newLevel = Level.INFO; - } - for (LoggerConfig value : loggerConfigMap.values()) { - value.setLevel(newLevel); - } - context.updateLoggers(configuration); - } catch (Exception ex) { - System.err.print("Log4j2 load error,ex:" + ex); - } - return new BrokerController(brokerConfig, nettyConfig); + if (null == brokerConfig) { + // brokerConfig不为空时加载配置文件 + String jmqttConfigPath = + brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "jmqtt.properties"; + initConfig(jmqttConfigPath, brokerConfig, nettyConfig); + try { + LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); + File file = new File(brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "log4j2.xml"); + context.setConfigLocation(file.toURI()); + Configuration configuration = context.getConfiguration(); + Map loggerConfigMap = configuration.getLoggers(); + Level newLevel = Level.getLevel(brokerConfig.getLogLevel()); + if (newLevel == null) { + newLevel = Level.INFO; + } + for (LoggerConfig value : loggerConfigMap.values()) { + value.setLevel(newLevel); + } + context.updateLoggers(configuration); + } catch (Exception ex) { + System.err.print("Log4j2 load error,ex:" + ex); + } + } + + // 启动服务,线程等 + BrokerController brokerController = new BrokerController(brokerConfig, nettyConfig); + brokerController.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + brokerController.shutdown(); + } + })); + return brokerController; } } -- Gitee From 89cf2434e4ea6a19629ebf646b3ef7959d925c17 Mon Sep 17 00:00:00 2001 From: song_jx <1649991905@qq.com> Date: Wed, 21 Jul 2021 18:50:16 +0800 Subject: [PATCH 09/12] =?UTF-8?q?[=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD](ma?= =?UTF-8?q?ster):=20redis=E5=93=A8=E5=85=B5=E9=9B=86=E7=BE=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 通过Class.forName反射实例化redis连接方式增加redis哨兵集群 配置属性: # 默认 # redisSupportClass=org.jmqtt.broker.store.redis.support.RedisSupportImpl # 哨兵集群 redisSupportClass=org.jmqtt.broker.store.redis.support.RedisSentinelSupportImpl redisDataBase=0 masterName=master --- .../broker/common/config/BrokerConfig.java | 27 +++++ .../store/redis/support/RedisKeySupport.java | 19 ++-- .../support/RedisSentinelSupportImpl.java | 100 ++++++++++++++++++ .../store/redis/support/RedisSupport.java | 18 +++- .../store/redis/support/RedisSupportImpl.java | 40 +++---- .../store/redis/support/RedisUtils.java | 25 ++++- 6 files changed, 196 insertions(+), 33 deletions(-) create mode 100644 jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java index f7347c1..8df95bd 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java @@ -23,11 +23,14 @@ public class BrokerConfig { private String messageStoreClass = "org.jmqtt.broker.store.rdb.RDBMessageStore"; private String authValidClass = "org.jmqtt.broker.acl.impl.DefaultAuthValid"; private String clusterEventHandlerClass = "org.jmqtt.broker.processor.dispatcher.rdb.RDBClusterEventHandler"; + private String redisSupportClass = "org.jmqtt.broker.store.redis.support.RedisSupportImpl"; /* redis相关配置 */ private String redisHost = "127.0.0.1"; private int redisPort = 6379; private String redisPassword = ""; + private int redisDataBase = 0; + private String masterName = "master"; private int maxWaitMills = 60 * 1000; private boolean testOnBorrow = true; private int minIdle = 20; @@ -69,6 +72,22 @@ public class BrokerConfig { this.redisPassword = redisPassword; } + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; + } + + public int getRedisDataBase() { + return redisDataBase; + } + + public void setRedisDataBase(int redisDataBase) { + this.redisDataBase = redisDataBase; + } + public int getMaxWaitMills() { return maxWaitMills; } @@ -217,6 +236,14 @@ public class BrokerConfig { this.clusterEventHandlerClass = clusterEventHandlerClass; } + public String getRedisSupportClass() { + return redisSupportClass; + } + + public void setRedisSupportClass(String redisSupportClass) { + this.redisSupportClass = redisSupportClass; + } + public int getMaxPollEventNum() { return maxPollEventNum; } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java index c6a339b..8537cbb 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java @@ -2,13 +2,14 @@ package org.jmqtt.broker.store.redis.support; public interface RedisKeySupport { - String PREFIX = "JMQTT"; - String SEND_FLOW_MESSAGE = PREFIX + "_SEND_FLOW_"; - String SEND_FLOW_SEC_MESSAGE = PREFIX + "_SEND_FLOW_SEC_"; - String REC_FLOW_MESSAGE = PREFIX + "_REC_FLOW_"; - String OFFLINE = PREFIX + "_OFFLINE_"; - String RETAIN = PREFIX + "_RETAIN"; - String SESSION = PREFIX + "_SESSION_"; - String SUBSCRIPTION = PREFIX + "_SUBSCRIPTION_"; - String WILL = PREFIX + "_WILL_"; + String PREFIX = "JMQTT:"; + String SEND_FLOW_MESSAGE = PREFIX + "SEND_FLOW_"; + String SEND_FLOW_SEC_MESSAGE = PREFIX + "SEND_FLOW_SEC_"; + String REC_FLOW_MESSAGE = PREFIX + "REC_FLOW_"; + String OFFLINE = PREFIX + "OFFLINE_"; + String RETAIN = PREFIX + "RETAIN"; + String SESSION = PREFIX + "SESSION_"; + String SUBSCRIPTION = PREFIX + "SUBSCRIPTION_"; + String WILL = PREFIX + "WILL_"; + } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java new file mode 100644 index 0000000..f662f9e --- /dev/null +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java @@ -0,0 +1,100 @@ + +package org.jmqtt.broker.store.redis.support; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.jmqtt.broker.common.config.BrokerConfig; +import org.jmqtt.broker.common.log.JmqttLogger; +import org.jmqtt.broker.common.log.LogUtil; +import org.jmqtt.broker.store.redis.RedisCallBack; +import org.slf4j.Logger; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.JedisSentinelPool; + +import java.util.HashSet; +import java.util.Set; + +/** + * redis哨兵支持实现 + * + * @author nn200433 + * @date 2021-07-21 03:27:04 + */ +public class RedisSentinelSupportImpl implements RedisSupport { + + private static final Logger log = JmqttLogger.storeLog; + + private BrokerConfig brokerConfig; + private JedisSentinelPool jedisPool; + + public RedisSentinelSupportImpl(BrokerConfig brokerConfig) { + this.brokerConfig = brokerConfig; + } + + @Override + public void init() { + try { + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMinIdle(brokerConfig.getMinIdle()); + jedisPoolConfig.setMaxTotal(brokerConfig.getMaxTotal()); + jedisPoolConfig.setTestOnBorrow(brokerConfig.isTestOnBorrow()); + jedisPoolConfig.setMaxTotal(jedisPoolConfig.getMaxIdle()); + jedisPoolConfig.setMaxWaitMillis(jedisPoolConfig.getMaxWaitMillis()); + + String[] redisHosts = brokerConfig.getRedisHost().split(","); + if (ArrayUtils.isEmpty(redisHosts)) { + throw new Exception("array [redisHost] is empty"); + } + + Set redisNodes = new HashSet(redisHosts.length); + for (String redisHost : redisHosts) { + String[] ipPort = redisHost.split(":"); + if (ipPort.length != 2) { + throw new Exception("redisHost wrong format, example: 127.0.0.1:6379,127.0.0.1:6380"); + } else { + redisNodes.add(redisHost); + } + } + + if (StringUtils.isEmpty(brokerConfig.getRedisPassword())) { + throw new Exception("redisPassword is empty"); + } + + jedisPool = new JedisSentinelPool(brokerConfig.getMasterName(), redisNodes, + jedisPoolConfig, 10000, brokerConfig.getRedisPassword(), brokerConfig.getRedisDataBase()); + + LogUtil.debug(log, "[Redis handle] JedisSentinelPool init success"); + } catch (Exception ex) { + LogUtil.error(log, "[Redis handle error],ex:{}", ex); + } + } + + @Override + public T operate(RedisCallBack redisCallBack) { + LogUtil.debug(log, "[Cluster] redis operate begin"); + long startTime = System.currentTimeMillis(); + Jedis jedis = null; + try { + jedis = jedisPool.getResource(); + return (T) redisCallBack.operate(jedis); + } catch (Exception ex) { + LogUtil.error(log, "[Cluster] redis operate error,ex:{}", ex); + } finally { + LogUtil.debug(log, "[Cluster] redis operate cost:{}", (System.currentTimeMillis() - startTime)); + if (jedis != null) { + jedis.close(); + } + } + return null; + } + + @Override + public void close() { + LogUtil.info(log, "[Cluster] redis close"); + if (jedisPool != null) { + jedisPool.close(); + } + } + +} diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java index 0246523..7348297 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java @@ -6,11 +6,27 @@ import org.jmqtt.broker.store.redis.RedisCallBack; * redis 支持类,定义主要的redis操作 */ public interface RedisSupport { + /** * 封装基本的redis操作,对{@link RedisCallBack} 暴露了Jedis对象 * @param redisCallBack * @param * @return */ - T operate(RedisCallBack redisCallBack); + public T operate(RedisCallBack redisCallBack); + + /** + * 初始化 + * + * @return + */ + public void init(); + + /** + * 关闭 + * + * @return + */ + public void close(); + } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java index 3a54e17..423981a 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java @@ -3,28 +3,27 @@ package org.jmqtt.broker.store.redis.support; import org.apache.commons.lang3.StringUtils; import org.jmqtt.broker.common.config.BrokerConfig; -import org.jmqtt.broker.store.redis.RedisCallBack; import org.jmqtt.broker.common.log.JmqttLogger; import org.jmqtt.broker.common.log.LogUtil; +import org.jmqtt.broker.store.redis.RedisCallBack; import org.slf4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; -public class RedisSupportImpl implements RedisSupport{ +public class RedisSupportImpl implements RedisSupport { private static final Logger log = JmqttLogger.storeLog; private BrokerConfig brokerConfig; - private JedisPool jedisPool; + private JedisPool jedisPool; - public static final String PROJECT = "JMQTT"; - - public RedisSupportImpl(BrokerConfig brokerConfig){ + public RedisSupportImpl(BrokerConfig brokerConfig) { this.brokerConfig = brokerConfig; } - void init(){ + @Override + public void init() { try { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMinIdle(brokerConfig.getMinIdle()); @@ -33,26 +32,30 @@ public class RedisSupportImpl implements RedisSupport{ jedisPoolConfig.setMaxTotal(jedisPoolConfig.getMaxIdle()); jedisPoolConfig.setMaxWaitMillis(jedisPoolConfig.getMaxWaitMillis()); if (StringUtils.isEmpty(brokerConfig.getRedisPassword())) { - jedisPool = new JedisPool(jedisPoolConfig,brokerConfig.getRedisHost(),brokerConfig.getRedisPort()); + jedisPool = new JedisPool(jedisPoolConfig, brokerConfig.getRedisHost(), brokerConfig.getRedisPort()); + jedisPool.getResource().select(brokerConfig.getRedisDataBase()); } else { - jedisPool = new JedisPool(jedisPoolConfig,brokerConfig.getRedisHost(),brokerConfig.getRedisPort(),10000,brokerConfig.getRedisPassword()); + jedisPool = new JedisPool(jedisPoolConfig, brokerConfig.getRedisHost(), brokerConfig.getRedisPort(), + 10000, brokerConfig.getRedisPassword(), brokerConfig.getRedisDataBase()); } + LogUtil.debug(log, "[Redis handle] JedisPool init success"); } catch (Exception ex) { - LogUtil.error(log,"[Redis handle error],ex:{}",ex); + LogUtil.error(log, "[Redis handle error],ex:{}", ex); } } + @Override - public T operate(RedisCallBack redisCallBack){ - LogUtil.debug(log,"[Cluster] redis operate begin"); + public T operate(RedisCallBack redisCallBack) { + LogUtil.debug(log, "[Cluster] redis operate begin"); long startTime = System.currentTimeMillis(); Jedis jedis = null; try { jedis = jedisPool.getResource(); - return (T)redisCallBack.operate(jedis); - }catch (Exception ex) { - LogUtil.error(log,"[Cluster] redis operate error,ex:{}",ex); + return (T) redisCallBack.operate(jedis); + } catch (Exception ex) { + LogUtil.error(log, "[Cluster] redis operate error,ex:{}", ex); } finally { - LogUtil.debug(log,"[Cluster] redis operate cost:{}",(System.currentTimeMillis() - startTime)); + LogUtil.debug(log, "[Cluster] redis operate cost:{}", (System.currentTimeMillis() - startTime)); if (jedis != null) { jedis.close(); } @@ -60,8 +63,9 @@ public class RedisSupportImpl implements RedisSupport{ return null; } - void close(){ - LogUtil.info(log,"[Cluster] redis close"); + @Override + public void close() { + LogUtil.info(log, "[Cluster] redis close"); if (jedisPool != null) { jedisPool.close(); } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java index d2edfc5..dfd348d 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java @@ -1,16 +1,23 @@ package org.jmqtt.broker.store.redis.support; import org.jmqtt.broker.common.config.BrokerConfig; -import org.jmqtt.broker.store.redis.RedisCallBack; +import org.jmqtt.broker.common.log.JmqttLogger; +import org.jmqtt.broker.common.log.LogUtil; +import org.slf4j.Logger; +import java.lang.reflect.Constructor; import java.util.concurrent.atomic.AtomicBoolean; /** * redis 访问模板方法类 */ public class RedisUtils { + + private static final Logger log = JmqttLogger.otherLog; + private static final RedisUtils redisUtils = new RedisUtils(); - private volatile RedisSupportImpl redisSupport; + + private volatile RedisSupport redisSupport; private AtomicBoolean start = new AtomicBoolean(false); @@ -21,15 +28,23 @@ public class RedisUtils { } public RedisSupport createSupport(BrokerConfig brokerConfig) { - if (start.compareAndSet(false,true)) { - this.redisSupport = new RedisSupportImpl(brokerConfig); + if (start.compareAndSet(false, true)) { + // this.redisSupport = new RedisSupportImpl(brokerConfig); + try { + // 使用配置反射实例化redis + Class redisSupportClass = Class.forName(brokerConfig.getRedisSupportClass()); + Constructor redisSupportConstructor = redisSupportClass.getConstructor(BrokerConfig.class); + this.redisSupport = (RedisSupport) redisSupportConstructor.newInstance(brokerConfig); + } catch (Exception e) { + LogUtil.error(log, "init redis error, ex:{}", e); + } this.redisSupport.init(); } return redisSupport; } public void close() { - RedisSupportImpl redisSupport = this.redisSupport; + RedisSupport redisSupport = this.redisSupport; if(start.compareAndSet(true,false)&&redisSupport!=null){ redisSupport.close(); } -- Gitee From 32285abc60dc2b3d272545eb19a8dd1b16cf589e Mon Sep 17 00:00:00 2001 From: song_jx <1649991905@qq.com> Date: Wed, 21 Jul 2021 18:52:19 +0800 Subject: [PATCH 10/12] =?UTF-8?q?[=E5=85=B6=E4=BB=96=E6=8F=90=E4=BA=A4](ma?= =?UTF-8?q?ster):=20=E8=A1=A5=E5=85=85debug=E6=97=A5=E5=BF=97=E8=BE=93?= =?UTF-8?q?=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit messageTraceLog增加可读性json message展示 --- .../dispatcher/DefaultDispatcherInnerMessage.java | 8 +++++++- .../jmqtt/broker/processor/protocol/PublishProcessor.java | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java index 2c4df44..c39c76a 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java @@ -13,10 +13,10 @@ import org.jmqtt.broker.processor.HighPerformanceMessageHandler; import org.jmqtt.broker.remoting.session.ClientSession; import org.jmqtt.broker.remoting.session.ConnectManager; import org.jmqtt.broker.remoting.util.MessageUtil; +import org.jmqtt.broker.remoting.util.RemotingHelper; import org.jmqtt.broker.store.SessionStore; import org.jmqtt.broker.subscribe.SubscriptionMatcher; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -130,6 +130,12 @@ public class DefaultDispatcherInnerMessage extends HighPerformanceMessageHandler } MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false); clientSession.getCtx().writeAndFlush(publishMessage); + + // 消息解码后的字符串 + String messageDecodeStr = new String(message.getPayload(), "UTF-8"); + LogUtil.debug(log, "[Broker Dispatcher Message] -> receiveClientIp={}, clientId={}, subTopic={}, qos={}, message={}", + RemotingHelper.getRemoteAddr(clientSession.getCtx().channel()), clientId, subscription.getTopic(), + qos, messageDecodeStr.replace("\n", "")); } else { sessionStore.storeOfflineMsg(clientId, message); } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java index e8c3545..5128e3b 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java @@ -17,8 +17,8 @@ import org.jmqtt.broker.remoting.session.ClientSession; import org.jmqtt.broker.remoting.session.ConnectManager; import org.jmqtt.broker.remoting.util.MessageUtil; import org.jmqtt.broker.remoting.util.NettyUtil; +import org.jmqtt.broker.remoting.util.RemotingHelper; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -74,6 +74,12 @@ public class PublishProcessor extends AbstractMessageProcessor implements Reques default: LogUtil.warn(log,"[PubMessage] -> Wrong mqtt message,clientId={}", clientId); } + + // 消息解码后的字符串 + String messageDecodeStr = new String(innerMsg.getPayload(), "UTF-8"); + LogUtil.debug(log, "[Client Publish Message] -> sendClientIp={}, clientId={}, topic={}, qos={}, message={}", + RemotingHelper.getRemoteAddr(ctx.channel()), clientId, topic, qos, + messageDecodeStr.replace("\n", "")); } catch (Throwable tr) { LogUtil.warn(log,"[PubMessage] -> Solve mqtt pub message exception:{}", tr.getMessage()); } finally { -- Gitee From 08b66d40dd1ffd1140eb654772007f9c70ec8399 Mon Sep 17 00:00:00 2001 From: song_jx <1649991905@qq.com> Date: Mon, 26 Jul 2021 17:21:12 +0800 Subject: [PATCH 11/12] =?UTF-8?q?[=E5=85=B6=E4=BB=96=E6=8F=90=E4=BA=A4](ma?= =?UTF-8?q?ster):=20=E8=A1=A5=E5=85=85debug=E6=97=A5=E5=BF=97=E8=BE=93?= =?UTF-8?q?=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/jmqtt/broker/remoting/netty/NettyRemotingServer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java index 677a925..25d7015 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java @@ -28,6 +28,7 @@ import org.jmqtt.broker.processor.RequestProcessor; import org.jmqtt.broker.remoting.RemotingService; import org.jmqtt.broker.remoting.netty.codec.ByteBuf2WebSocketEncoder; import org.jmqtt.broker.remoting.netty.codec.WebSocket2ByteBufDecoder; +import org.jmqtt.broker.remoting.util.RemotingHelper; import org.slf4j.Logger; import java.util.HashMap; @@ -205,7 +206,8 @@ public class NettyRemotingServer implements RemotingService { MqttMessage mqttMessage = (MqttMessage) obj; if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) { MqttMessageType messageType = mqttMessage.fixedHeader().messageType(); - LogUtil.debug(log,"[Remoting] -> receive mqtt code,type:{},name:{}", messageType.value(), messageType.name()); + LogUtil.debug(log,"[Remoting] -> receive 『{}』 mqtt code,type:{},name:{}", + RemotingHelper.getRemoteAddr(ctx.channel()), messageType.value(), messageType.name()); Runnable runnable = () -> processorTable.get(messageType).getObject1().processRequest(ctx, mqttMessage); try { processorTable.get(messageType).getObject2().submit(runnable); -- Gitee From a848df606e641ef9731cd39334826be84f8f1f1d Mon Sep 17 00:00:00 2001 From: song_jx <1649991905@qq.com> Date: Thu, 29 Jul 2021 19:08:49 +0800 Subject: [PATCH 12/12] =?UTF-8?q?[=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD](ma?= =?UTF-8?q?ster):=20=E8=A1=A5=E5=85=85=E4=B8=80=E4=BA=9B=E5=B0=8F=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 心跳检测,超时一段时间清理会话 2. 补充clientsession属性 --- .../org/jmqtt/broker/BrokerController.java | 3 +- .../client/ClientLifeCycleHookService.java | 45 ++++++++++++-- .../broker/common/config/NettyConfig.java | 13 ++++ .../processor/protocol/ConnectProcessor.java | 7 ++- .../remoting/netty/NettyConnectHandler.java | 62 +++++++++++++++---- .../remoting/netty/NettyRemotingServer.java | 6 +- .../remoting/session/ClientSession.java | 32 ++++++++++ .../remoting/session/ConnectManager.java | 28 +++++++++ 8 files changed, 171 insertions(+), 25 deletions(-) diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java index 613ab19..b916f6f 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java @@ -102,7 +102,8 @@ public class BrokerController { this.innerMessageDispatcher = new DefaultDispatcherInnerMessage(this); this.eventConsumeHandler = new EventConsumeHandler(this); - this.channelEventListener = new ClientLifeCycleHookService(messageStore, innerMessageDispatcher); + this.channelEventListener = new ClientLifeCycleHookService(sessionStore, messageStore, subscriptionMatcher, + innerMessageDispatcher); this.remotingServer = new NettyRemotingServer(brokerConfig, nettyConfig, channelEventListener); this.reSendMessageService = new ReSendMessageService(this); diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java index 3ab7f2f..bbd234b 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java @@ -5,21 +5,33 @@ import org.apache.commons.lang3.StringUtils; import org.jmqtt.broker.common.log.JmqttLogger; import org.jmqtt.broker.common.log.LogUtil; import org.jmqtt.broker.common.model.Message; +import org.jmqtt.broker.common.model.Subscription; import org.jmqtt.broker.processor.dispatcher.InnerMessageDispatcher; import org.jmqtt.broker.remoting.netty.ChannelEventListener; +import org.jmqtt.broker.remoting.session.ClientSession; import org.jmqtt.broker.remoting.session.ConnectManager; import org.jmqtt.broker.remoting.util.NettyUtil; import org.jmqtt.broker.store.MessageStore; +import org.jmqtt.broker.store.SessionState; +import org.jmqtt.broker.store.SessionStore; +import org.jmqtt.broker.subscribe.SubscriptionMatcher; import org.slf4j.Logger; +import java.util.Set; + public class ClientLifeCycleHookService implements ChannelEventListener { - private static final Logger log = JmqttLogger.clientTraceLog; - private MessageStore messageStore; - private InnerMessageDispatcher innerMessageDispatcher; + private static final Logger log = JmqttLogger.clientTraceLog; + private MessageStore messageStore; + private SessionStore sessionStore; + private SubscriptionMatcher subscriptionMatcher; + private InnerMessageDispatcher innerMessageDispatcher; - public ClientLifeCycleHookService(MessageStore messageStore, InnerMessageDispatcher innerMessageDispatcher) { + public ClientLifeCycleHookService(SessionStore sessionStore, MessageStore messageStore, + SubscriptionMatcher subscriptionMatcher, InnerMessageDispatcher innerMessageDispatcher) { + this.sessionStore = sessionStore; this.messageStore = messageStore; + this.subscriptionMatcher = subscriptionMatcher; this.innerMessageDispatcher = innerMessageDispatcher; } @@ -40,6 +52,17 @@ public class ClientLifeCycleHookService implements ChannelEventListener { @Override public void onChannelIdle(String remoteAddr, Channel channel) { + String clientId = NettyUtil.getClientId(channel); + if (StringUtils.isNotEmpty(clientId)) { + // 移除用户 + ConnectManager connectManager = ConnectManager.getInstance(); + ClientSession client = connectManager.getClient(clientId); + if (null != client) { + LogUtil.info(log, "[ClientLifeCycleHook] -> 心跳获取超时 『{}』 将被清理....", clientId); + clearSession(client); + connectManager.removeClient(clientId); + } + } } @Override @@ -48,4 +71,18 @@ public class ClientLifeCycleHookService implements ChannelEventListener { ConnectManager.getInstance().removeClient(clientId); LogUtil.warn(log, "[ClientLifeCycleHook] -> {} channelException,close channel and remove ConnectCache!", clientId); } + + private void clearSession(ClientSession clientSession) { + if (clientSession.isCleanStart()) { + Set subscriptions = sessionStore.getSubscriptions(clientSession.getClientId()); + for (Subscription subscription : subscriptions) { + subscriptionMatcher.unSubscribe(subscription.getTopic(), clientSession.getClientId()); + } + sessionStore.clearSession(clientSession.getClientId(), false); + } else { + SessionState sessionState = new SessionState(SessionState.StateEnum.OFFLINE, System.currentTimeMillis()); + sessionStore.storeSession(clientSession.getClientId(), sessionState); + } + } + } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java index f2f350f..7ef7646 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java @@ -53,6 +53,11 @@ public class NettyConfig { */ private int maxMsgSize = 512 * 1024; + /** + * 最大丢失连接时间,即10分钟未操作关闭连接 + */ + private long maxLossConnectTime = 10; + public int getTcpBackLog() { return tcpBackLog; } @@ -246,4 +251,12 @@ public class NettyConfig { public void setHttpPort(int httpPort) { this.httpPort = httpPort; } + + public long getMaxLossConnectTime() { + return maxLossConnectTime; + } + + public void setMaxLossConnectTime(long maxLossConnectTime) { + this.maxLossConnectTime = maxLossConnectTime; + } } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java index 03c9fcd..34671ae 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java @@ -30,6 +30,7 @@ import org.jmqtt.broker.store.SessionStore; import org.jmqtt.broker.subscribe.SubscriptionMatcher; import org.slf4j.Logger; +import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -97,7 +98,7 @@ public class ConnectProcessor implements RequestProcessor { } } if (sessionState.getState() == SessionState.StateEnum.NULL) { - clientSession = new ClientSession(clientId, false, ctx); + clientSession = new ClientSession(clientId, false, new Date(),ctx); sessionPresent = false; notifyClearOtherSession = false; } else { @@ -179,7 +180,7 @@ public class ConnectProcessor implements RequestProcessor { } private ClientSession createNewClientSession(String clientId, ChannelHandlerContext ctx) { - ClientSession clientSession = new ClientSession(clientId, true); + ClientSession clientSession = new ClientSession(clientId, true, new Date()); clientSession.setCtx(ctx); //clear previous sessions this.sessionStore.clearSession(clientId,true); @@ -190,7 +191,7 @@ public class ConnectProcessor implements RequestProcessor { * cleanStart is false, reload client session */ private ClientSession reloadClientSession(ChannelHandlerContext ctx, String clientId) { - ClientSession clientSession = new ClientSession(clientId, false); + ClientSession clientSession = new ClientSession(clientId, false, new Date()); clientSession.setCtx(ctx); Set subscriptions = sessionStore.getSubscriptions(clientId); for (Subscription subscription : subscriptions) { diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java index a3251bb..97c6ca3 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java @@ -4,46 +4,82 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import org.apache.commons.lang3.StringUtils; +import org.jmqtt.broker.common.config.NettyConfig; import org.jmqtt.broker.common.log.JmqttLogger; import org.jmqtt.broker.common.log.LogUtil; +import org.jmqtt.broker.remoting.session.ClientSession; +import org.jmqtt.broker.remoting.session.ConnectManager; +import org.jmqtt.broker.remoting.util.NettyUtil; import org.jmqtt.broker.remoting.util.RemotingHelper; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.util.Date; public class NettyConnectHandler extends ChannelDuplexHandler { private static final Logger log = JmqttLogger.remotingLog; private NettyEventExecutor eventExecutor; + private NettyConfig nettyConfig; - public NettyConnectHandler(NettyEventExecutor nettyEventExecutor){ + public NettyConnectHandler(NettyEventExecutor nettyEventExecutor, NettyConfig nettyConfig){ this.eventExecutor = nettyEventExecutor; + this.nettyConfig = nettyConfig; } @Override public void channelActive(ChannelHandlerContext ctx){ final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); - LogUtil.debug(log,"[ChannelActive] -> addr = {}",remoteAddr); - this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.CONNECT,ctx.channel())); + LogUtil.debug(log, "[ChannelActive] -> 通道连接... addr = {}", remoteAddr); + this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.CONNECT, ctx.channel())); } @Override public void channelInactive(ChannelHandlerContext ctx){ final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); - LogUtil.debug(log,"[ChannelInactive] -> addr = {}",remoteAddr); - this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.CLOSE,ctx.channel())); + LogUtil.debug(log, "[ChannelInactive] -> 通道关闭... addr = {}", remoteAddr); + this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.CLOSE, ctx.channel())); } @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt){ - if(evt instanceof IdleStateEvent){ + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + String clientId = NettyUtil.getClientId(ctx.channel()); + if (StringUtils.isEmpty(clientId)) { + LogUtil.warn(log, "[HEART_BEAT] -> 根据通道未查到客户端id, 无法判断心跳..."); + return; + } + ClientSession client = ConnectManager.getInstance().getClient(clientId); + long lossConnectTime = client.getLossConnectTime(); + + if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; - if(event.state().equals(IdleState.READER_IDLE)){ - final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); - LogUtil.warn(log,"[HEART_BEAT] -> IDLE exception, addr = {}",remoteAddr); - RemotingHelper.closeChannel(ctx.channel()); - this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.IDLE,ctx.channel())); + if (event.state().equals(IdleState.READER_IDLE)) { + // 设备离线,更改设备状态,增加离线操作日志 + lossConnectTime++; + client.setLossConnectTime(lossConnectTime); + LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 离线 {} 分钟...", client.getClientId(), lossConnectTime); + if (lossConnectTime >= nettyConfig.getMaxLossConnectTime()) { + // 客户端断连10分钟 + final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); + LogUtil.warn(log, "[HEART_BEAT] -> 『{}』 心跳异常,服务器将主动关闭 『{}』 链路,原因:{}s 没收到消息了...", + remoteAddr, client.getClientId(), (lossConnectTime * 60)); + RemotingHelper.closeChannel(ctx.channel()); + this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.IDLE, ctx.channel())); + } + } else { + //复位 + LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 恢复连接...", client.getClientId(), lossConnectTime); + client.setLossConnectTime(0); + client.setInitConnectDate(new Date()); + super.userEventTriggered(ctx, evt); } + } else { + //复位 + LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 恢复连接...", client.getClientId(), lossConnectTime); + client.setLossConnectTime(0); + client.setInitConnectDate(new Date()); + super.userEventTriggered(ctx, evt); } } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java index 25d7015..48e7323 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java @@ -126,8 +126,7 @@ public class NettyRemotingServer implements RemotingService { .addLast("byteBuf2WebSocketEncoder", new ByteBuf2WebSocketEncoder()) .addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize())) .addLast("mqttEncoder", MqttEncoder.INSTANCE) - .addLast("nettyConnectionManager", new NettyConnectHandler( - nettyEventExecutor)) + .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExecutor, nettyConfig)) .addLast("nettyMqttHandler", new NettyMqttHandler()); } }); @@ -169,8 +168,7 @@ public class NettyRemotingServer implements RemotingService { pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0)) .addLast("mqttEncoder", MqttEncoder.INSTANCE) .addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize())) - .addLast("nettyConnectionManager", new NettyConnectHandler( - nettyEventExecutor)) + .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExecutor, nettyConfig)) .addLast("nettyMqttHandler", new NettyMqttHandler()); } }); diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java index 8d3859c..12f503d 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java @@ -2,6 +2,7 @@ package org.jmqtt.broker.remoting.session; import io.netty.channel.ChannelHandlerContext; +import java.util.Date; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; @@ -16,6 +17,8 @@ public class ClientSession { private String clientId; private boolean cleanStart; private transient ChannelHandlerContext ctx; + private long lossConnectTime = 0; + private Date initConnectDate; private transient AtomicInteger messageIdCounter = new AtomicInteger(1); @@ -26,12 +29,25 @@ public class ClientSession { this.cleanStart = cleanStart; } + public ClientSession(String clientId, boolean cleanStart, Date initConnectDate) { + this.clientId = clientId; + this.cleanStart = cleanStart; + this.initConnectDate = initConnectDate; + } + public ClientSession(String clientId, boolean cleanStart, ChannelHandlerContext ctx) { this.clientId = clientId; this.cleanStart = cleanStart; this.ctx = ctx; } + public ClientSession(String clientId, boolean cleanStart, Date initConnectDate, ChannelHandlerContext ctx) { + this.clientId = clientId; + this.cleanStart = cleanStart; + this.initConnectDate = initConnectDate; + this.ctx = ctx; + } + public String getClientId() { return clientId; } @@ -56,6 +72,14 @@ public class ClientSession { this.ctx = ctx; } + public long getLossConnectTime() { + return lossConnectTime; + } + + public void setLossConnectTime(long lossConnectTime) { + this.lossConnectTime = lossConnectTime; + } + public int generateMessageId() { int messageId = messageIdCounter.getAndIncrement(); messageId = Math.abs(messageId % 0xFFFF); @@ -65,6 +89,14 @@ public class ClientSession { return messageId; } + public Date getInitConnectDate() { + return initConnectDate; + } + + public void setInitConnectDate(Date initConnectDate) { + this.initConnectDate = initConnectDate; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java index d6eb0e9..ee3933b 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java @@ -1,5 +1,11 @@ package org.jmqtt.broker.remoting.session; +import com.alibaba.fastjson.JSONObject; +import io.netty.channel.ChannelHandlerContext; +import org.jmqtt.broker.remoting.util.RemotingHelper; + +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -36,4 +42,26 @@ public class ConnectManager { } return null; } + + /** + * 返回客户端列表 + * + * @return @return {@link List } + */ + public List listClients() { + List resultList = new ArrayList(); + clientCache.forEach((k, v) -> { + ChannelHandlerContext ctx = v.getCtx(); + resultList.add( + new JSONObject() {{ + put("clientId", k); + put("clientIp", RemotingHelper.getRemoteAddr(ctx.channel())); + put("initConnectDate", v.getInitConnectDate()); + put("lossConnectTime", v.getLossConnectTime()); + }} + ); + }); + return resultList; + } + } -- Gitee