diff --git a/jmqtt-acceptance/pom.xml b/jmqtt-acceptance/pom.xml
index 32075cd15a133cacfc2c83ea0449b6cbc3939406..9ca06ba067beb0d90301bd0259d5c9bafab2f931 100644
--- a/jmqtt-acceptance/pom.xml
+++ b/jmqtt-acceptance/pom.xml
@@ -74,7 +74,7 @@
commons-io
commons-io
- 2.6
+ 2.7
com.fasterxml.jackson.dataformat
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java
index 613ab1938df47fee43360da8ab445856b26598c2..b916f6ffb4bd0ed03fcc87aaff5ea392ddf58381 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java
@@ -102,7 +102,8 @@ public class BrokerController {
this.innerMessageDispatcher = new DefaultDispatcherInnerMessage(this);
this.eventConsumeHandler = new EventConsumeHandler(this);
- this.channelEventListener = new ClientLifeCycleHookService(messageStore, innerMessageDispatcher);
+ this.channelEventListener = new ClientLifeCycleHookService(sessionStore, messageStore, subscriptionMatcher,
+ innerMessageDispatcher);
this.remotingServer = new NettyRemotingServer(brokerConfig, nettyConfig, channelEventListener);
this.reSendMessageService = new ReSendMessageService(this);
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java
index 3ab7f2fd033431a11c17219df85dcc0e5e907144..bbd234b4e29d9c2dc9ae00a9b3b03de653e7c5a7 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java
@@ -5,21 +5,33 @@ import org.apache.commons.lang3.StringUtils;
import org.jmqtt.broker.common.log.JmqttLogger;
import org.jmqtt.broker.common.log.LogUtil;
import org.jmqtt.broker.common.model.Message;
+import org.jmqtt.broker.common.model.Subscription;
import org.jmqtt.broker.processor.dispatcher.InnerMessageDispatcher;
import org.jmqtt.broker.remoting.netty.ChannelEventListener;
+import org.jmqtt.broker.remoting.session.ClientSession;
import org.jmqtt.broker.remoting.session.ConnectManager;
import org.jmqtt.broker.remoting.util.NettyUtil;
import org.jmqtt.broker.store.MessageStore;
+import org.jmqtt.broker.store.SessionState;
+import org.jmqtt.broker.store.SessionStore;
+import org.jmqtt.broker.subscribe.SubscriptionMatcher;
import org.slf4j.Logger;
+import java.util.Set;
+
public class ClientLifeCycleHookService implements ChannelEventListener {
- private static final Logger log = JmqttLogger.clientTraceLog;
- private MessageStore messageStore;
- private InnerMessageDispatcher innerMessageDispatcher;
+ private static final Logger log = JmqttLogger.clientTraceLog;
+ private MessageStore messageStore;
+ private SessionStore sessionStore;
+ private SubscriptionMatcher subscriptionMatcher;
+ private InnerMessageDispatcher innerMessageDispatcher;
- public ClientLifeCycleHookService(MessageStore messageStore, InnerMessageDispatcher innerMessageDispatcher) {
+ public ClientLifeCycleHookService(SessionStore sessionStore, MessageStore messageStore,
+ SubscriptionMatcher subscriptionMatcher, InnerMessageDispatcher innerMessageDispatcher) {
+ this.sessionStore = sessionStore;
this.messageStore = messageStore;
+ this.subscriptionMatcher = subscriptionMatcher;
this.innerMessageDispatcher = innerMessageDispatcher;
}
@@ -40,6 +52,17 @@ public class ClientLifeCycleHookService implements ChannelEventListener {
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
+ String clientId = NettyUtil.getClientId(channel);
+ if (StringUtils.isNotEmpty(clientId)) {
+ // 移除用户
+ ConnectManager connectManager = ConnectManager.getInstance();
+ ClientSession client = connectManager.getClient(clientId);
+ if (null != client) {
+ LogUtil.info(log, "[ClientLifeCycleHook] -> 心跳获取超时 『{}』 将被清理....", clientId);
+ clearSession(client);
+ connectManager.removeClient(clientId);
+ }
+ }
}
@Override
@@ -48,4 +71,18 @@ public class ClientLifeCycleHookService implements ChannelEventListener {
ConnectManager.getInstance().removeClient(clientId);
LogUtil.warn(log, "[ClientLifeCycleHook] -> {} channelException,close channel and remove ConnectCache!", clientId);
}
+
+ private void clearSession(ClientSession clientSession) {
+ if (clientSession.isCleanStart()) {
+ Set subscriptions = sessionStore.getSubscriptions(clientSession.getClientId());
+ for (Subscription subscription : subscriptions) {
+ subscriptionMatcher.unSubscribe(subscription.getTopic(), clientSession.getClientId());
+ }
+ sessionStore.clearSession(clientSession.getClientId(), false);
+ } else {
+ SessionState sessionState = new SessionState(SessionState.StateEnum.OFFLINE, System.currentTimeMillis());
+ sessionStore.storeSession(clientSession.getClientId(), sessionState);
+ }
+ }
+
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java
index f7347c18021610168e207691cbc07f2503e4233f..8df95bd6a4e96431047f353e32d3ef640579e037 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java
@@ -23,11 +23,14 @@ public class BrokerConfig {
private String messageStoreClass = "org.jmqtt.broker.store.rdb.RDBMessageStore";
private String authValidClass = "org.jmqtt.broker.acl.impl.DefaultAuthValid";
private String clusterEventHandlerClass = "org.jmqtt.broker.processor.dispatcher.rdb.RDBClusterEventHandler";
+ private String redisSupportClass = "org.jmqtt.broker.store.redis.support.RedisSupportImpl";
/* redis相关配置 */
private String redisHost = "127.0.0.1";
private int redisPort = 6379;
private String redisPassword = "";
+ private int redisDataBase = 0;
+ private String masterName = "master";
private int maxWaitMills = 60 * 1000;
private boolean testOnBorrow = true;
private int minIdle = 20;
@@ -69,6 +72,22 @@ public class BrokerConfig {
this.redisPassword = redisPassword;
}
+ public String getMasterName() {
+ return masterName;
+ }
+
+ public void setMasterName(String masterName) {
+ this.masterName = masterName;
+ }
+
+ public int getRedisDataBase() {
+ return redisDataBase;
+ }
+
+ public void setRedisDataBase(int redisDataBase) {
+ this.redisDataBase = redisDataBase;
+ }
+
public int getMaxWaitMills() {
return maxWaitMills;
}
@@ -217,6 +236,14 @@ public class BrokerConfig {
this.clusterEventHandlerClass = clusterEventHandlerClass;
}
+ public String getRedisSupportClass() {
+ return redisSupportClass;
+ }
+
+ public void setRedisSupportClass(String redisSupportClass) {
+ this.redisSupportClass = redisSupportClass;
+ }
+
public int getMaxPollEventNum() {
return maxPollEventNum;
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java
index f2f350f281aa36445feea858bb1980d6aabd4ae4..7ef7646b63777914f9fdc6b69bfbf4f60386643d 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java
@@ -53,6 +53,11 @@ public class NettyConfig {
*/
private int maxMsgSize = 512 * 1024;
+ /**
+ * 最大丢失连接时间,即10分钟未操作关闭连接
+ */
+ private long maxLossConnectTime = 10;
+
public int getTcpBackLog() {
return tcpBackLog;
}
@@ -246,4 +251,12 @@ public class NettyConfig {
public void setHttpPort(int httpPort) {
this.httpPort = httpPort;
}
+
+ public long getMaxLossConnectTime() {
+ return maxLossConnectTime;
+ }
+
+ public void setMaxLossConnectTime(long maxLossConnectTime) {
+ this.maxLossConnectTime = maxLossConnectTime;
+ }
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java
index 2c4df440453ba7453ffc49d28fbe2bd01e0fa977..c39c76aa086ade47963297343b20eecd7f0c9845 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java
@@ -13,10 +13,10 @@ import org.jmqtt.broker.processor.HighPerformanceMessageHandler;
import org.jmqtt.broker.remoting.session.ClientSession;
import org.jmqtt.broker.remoting.session.ConnectManager;
import org.jmqtt.broker.remoting.util.MessageUtil;
+import org.jmqtt.broker.remoting.util.RemotingHelper;
import org.jmqtt.broker.store.SessionStore;
import org.jmqtt.broker.subscribe.SubscriptionMatcher;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -130,6 +130,12 @@ public class DefaultDispatcherInnerMessage extends HighPerformanceMessageHandler
}
MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false);
clientSession.getCtx().writeAndFlush(publishMessage);
+
+ // 消息解码后的字符串
+ String messageDecodeStr = new String(message.getPayload(), "UTF-8");
+ LogUtil.debug(log, "[Broker Dispatcher Message] -> receiveClientIp={}, clientId={}, subTopic={}, qos={}, message={}",
+ RemotingHelper.getRemoteAddr(clientSession.getCtx().channel()), clientId, subscription.getTopic(),
+ qos, messageDecodeStr.replace("\n", ""));
} else {
sessionStore.storeOfflineMsg(clientId, message);
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java
index 236037cb86cb4bb1d11487b12ad275b2525851c7..47e3be045dbf4a3d53ff8feaf081627096e5dfbd 100644
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java
@@ -104,7 +104,7 @@ public class EventConsumeHandler {
return;
}
String clientId = event.getBody();
- if (ConnectManager.getInstance().containClient(clientId)) {
+ if (!ConnectManager.getInstance().containClient(clientId)) {
return;
}
ClientSession clientSession = ConnectManager.getInstance().getClient(clientId);
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java
index 03c9fcd25f8dc8f164948a10efea61c402062664..34671aefcd2c0ba2f21ade28634b8c2be512ccea 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java
@@ -30,6 +30,7 @@ import org.jmqtt.broker.store.SessionStore;
import org.jmqtt.broker.subscribe.SubscriptionMatcher;
import org.slf4j.Logger;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -97,7 +98,7 @@ public class ConnectProcessor implements RequestProcessor {
}
}
if (sessionState.getState() == SessionState.StateEnum.NULL) {
- clientSession = new ClientSession(clientId, false, ctx);
+ clientSession = new ClientSession(clientId, false, new Date(),ctx);
sessionPresent = false;
notifyClearOtherSession = false;
} else {
@@ -179,7 +180,7 @@ public class ConnectProcessor implements RequestProcessor {
}
private ClientSession createNewClientSession(String clientId, ChannelHandlerContext ctx) {
- ClientSession clientSession = new ClientSession(clientId, true);
+ ClientSession clientSession = new ClientSession(clientId, true, new Date());
clientSession.setCtx(ctx);
//clear previous sessions
this.sessionStore.clearSession(clientId,true);
@@ -190,7 +191,7 @@ public class ConnectProcessor implements RequestProcessor {
* cleanStart is false, reload client session
*/
private ClientSession reloadClientSession(ChannelHandlerContext ctx, String clientId) {
- ClientSession clientSession = new ClientSession(clientId, false);
+ ClientSession clientSession = new ClientSession(clientId, false, new Date());
clientSession.setCtx(ctx);
Set subscriptions = sessionStore.getSubscriptions(clientId);
for (Subscription subscription : subscriptions) {
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java
index e8c3545f9eed6cfb838d8e8d282ba477309accbd..5128e3b257df51eee439403eab036547c697c27d 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java
@@ -17,8 +17,8 @@ import org.jmqtt.broker.remoting.session.ClientSession;
import org.jmqtt.broker.remoting.session.ConnectManager;
import org.jmqtt.broker.remoting.util.MessageUtil;
import org.jmqtt.broker.remoting.util.NettyUtil;
+import org.jmqtt.broker.remoting.util.RemotingHelper;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@@ -74,6 +74,12 @@ public class PublishProcessor extends AbstractMessageProcessor implements Reques
default:
LogUtil.warn(log,"[PubMessage] -> Wrong mqtt message,clientId={}", clientId);
}
+
+ // 消息解码后的字符串
+ String messageDecodeStr = new String(innerMsg.getPayload(), "UTF-8");
+ LogUtil.debug(log, "[Client Publish Message] -> sendClientIp={}, clientId={}, topic={}, qos={}, message={}",
+ RemotingHelper.getRemoteAddr(ctx.channel()), clientId, topic, qos,
+ messageDecodeStr.replace("\n", ""));
} catch (Throwable tr) {
LogUtil.warn(log,"[PubMessage] -> Solve mqtt pub message exception:{}", tr.getMessage());
} finally {
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java
index a3251bb34ab84209a8905a43a01e0e91ec6be4d8..97c6ca30428f2aa4d3c2f8685180a9386234c3e4 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java
@@ -4,46 +4,82 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
+import org.apache.commons.lang3.StringUtils;
+import org.jmqtt.broker.common.config.NettyConfig;
import org.jmqtt.broker.common.log.JmqttLogger;
import org.jmqtt.broker.common.log.LogUtil;
+import org.jmqtt.broker.remoting.session.ClientSession;
+import org.jmqtt.broker.remoting.session.ConnectManager;
+import org.jmqtt.broker.remoting.util.NettyUtil;
import org.jmqtt.broker.remoting.util.RemotingHelper;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.util.Date;
public class NettyConnectHandler extends ChannelDuplexHandler {
private static final Logger log = JmqttLogger.remotingLog;
private NettyEventExecutor eventExecutor;
+ private NettyConfig nettyConfig;
- public NettyConnectHandler(NettyEventExecutor nettyEventExecutor){
+ public NettyConnectHandler(NettyEventExecutor nettyEventExecutor, NettyConfig nettyConfig){
this.eventExecutor = nettyEventExecutor;
+ this.nettyConfig = nettyConfig;
}
@Override
public void channelActive(ChannelHandlerContext ctx){
final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel());
- LogUtil.debug(log,"[ChannelActive] -> addr = {}",remoteAddr);
- this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.CONNECT,ctx.channel()));
+ LogUtil.debug(log, "[ChannelActive] -> 通道连接... addr = {}", remoteAddr);
+ this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.CONNECT, ctx.channel()));
}
@Override
public void channelInactive(ChannelHandlerContext ctx){
final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel());
- LogUtil.debug(log,"[ChannelInactive] -> addr = {}",remoteAddr);
- this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.CLOSE,ctx.channel()));
+ LogUtil.debug(log, "[ChannelInactive] -> 通道关闭... addr = {}", remoteAddr);
+ this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.CLOSE, ctx.channel()));
}
@Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt){
- if(evt instanceof IdleStateEvent){
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ String clientId = NettyUtil.getClientId(ctx.channel());
+ if (StringUtils.isEmpty(clientId)) {
+ LogUtil.warn(log, "[HEART_BEAT] -> 根据通道未查到客户端id, 无法判断心跳...");
+ return;
+ }
+ ClientSession client = ConnectManager.getInstance().getClient(clientId);
+ long lossConnectTime = client.getLossConnectTime();
+
+ if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
- if(event.state().equals(IdleState.READER_IDLE)){
- final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel());
- LogUtil.warn(log,"[HEART_BEAT] -> IDLE exception, addr = {}",remoteAddr);
- RemotingHelper.closeChannel(ctx.channel());
- this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.IDLE,ctx.channel()));
+ if (event.state().equals(IdleState.READER_IDLE)) {
+ // 设备离线,更改设备状态,增加离线操作日志
+ lossConnectTime++;
+ client.setLossConnectTime(lossConnectTime);
+ LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 离线 {} 分钟...", client.getClientId(), lossConnectTime);
+ if (lossConnectTime >= nettyConfig.getMaxLossConnectTime()) {
+ // 客户端断连10分钟
+ final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel());
+ LogUtil.warn(log, "[HEART_BEAT] -> 『{}』 心跳异常,服务器将主动关闭 『{}』 链路,原因:{}s 没收到消息了...",
+ remoteAddr, client.getClientId(), (lossConnectTime * 60));
+ RemotingHelper.closeChannel(ctx.channel());
+ this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.IDLE, ctx.channel()));
+ }
+ } else {
+ //复位
+ LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 恢复连接...", client.getClientId(), lossConnectTime);
+ client.setLossConnectTime(0);
+ client.setInitConnectDate(new Date());
+ super.userEventTriggered(ctx, evt);
}
+ } else {
+ //复位
+ LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 恢复连接...", client.getClientId(), lossConnectTime);
+ client.setLossConnectTime(0);
+ client.setInitConnectDate(new Date());
+ super.userEventTriggered(ctx, evt);
}
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java
index 677a92571b9c5eaf2a41f4129dc99c74ae289b41..48e732368c2e3fa901e0e72008b8b4ac433df8ee 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java
@@ -28,6 +28,7 @@ import org.jmqtt.broker.processor.RequestProcessor;
import org.jmqtt.broker.remoting.RemotingService;
import org.jmqtt.broker.remoting.netty.codec.ByteBuf2WebSocketEncoder;
import org.jmqtt.broker.remoting.netty.codec.WebSocket2ByteBufDecoder;
+import org.jmqtt.broker.remoting.util.RemotingHelper;
import org.slf4j.Logger;
import java.util.HashMap;
@@ -125,8 +126,7 @@ public class NettyRemotingServer implements RemotingService {
.addLast("byteBuf2WebSocketEncoder", new ByteBuf2WebSocketEncoder())
.addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize()))
.addLast("mqttEncoder", MqttEncoder.INSTANCE)
- .addLast("nettyConnectionManager", new NettyConnectHandler(
- nettyEventExecutor))
+ .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExecutor, nettyConfig))
.addLast("nettyMqttHandler", new NettyMqttHandler());
}
});
@@ -168,8 +168,7 @@ public class NettyRemotingServer implements RemotingService {
pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0))
.addLast("mqttEncoder", MqttEncoder.INSTANCE)
.addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize()))
- .addLast("nettyConnectionManager", new NettyConnectHandler(
- nettyEventExecutor))
+ .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExecutor, nettyConfig))
.addLast("nettyMqttHandler", new NettyMqttHandler());
}
});
@@ -205,7 +204,8 @@ public class NettyRemotingServer implements RemotingService {
MqttMessage mqttMessage = (MqttMessage) obj;
if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) {
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
- LogUtil.debug(log,"[Remoting] -> receive mqtt code,type:{},name:{}", messageType.value(), messageType.name());
+ LogUtil.debug(log,"[Remoting] -> receive 『{}』 mqtt code,type:{},name:{}",
+ RemotingHelper.getRemoteAddr(ctx.channel()), messageType.value(), messageType.name());
Runnable runnable = () -> processorTable.get(messageType).getObject1().processRequest(ctx, mqttMessage);
try {
processorTable.get(messageType).getObject2().submit(runnable);
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java
index 8d3859cdb7c63ddf5ced4b2b71e8d3d54cc654e1..12f503dc5764f25a5dd8e7664988b509f5a71f9f 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java
@@ -2,6 +2,7 @@ package org.jmqtt.broker.remoting.session;
import io.netty.channel.ChannelHandlerContext;
+import java.util.Date;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
@@ -16,6 +17,8 @@ public class ClientSession {
private String clientId;
private boolean cleanStart;
private transient ChannelHandlerContext ctx;
+ private long lossConnectTime = 0;
+ private Date initConnectDate;
private transient AtomicInteger messageIdCounter = new AtomicInteger(1);
@@ -26,12 +29,25 @@ public class ClientSession {
this.cleanStart = cleanStart;
}
+ public ClientSession(String clientId, boolean cleanStart, Date initConnectDate) {
+ this.clientId = clientId;
+ this.cleanStart = cleanStart;
+ this.initConnectDate = initConnectDate;
+ }
+
public ClientSession(String clientId, boolean cleanStart, ChannelHandlerContext ctx) {
this.clientId = clientId;
this.cleanStart = cleanStart;
this.ctx = ctx;
}
+ public ClientSession(String clientId, boolean cleanStart, Date initConnectDate, ChannelHandlerContext ctx) {
+ this.clientId = clientId;
+ this.cleanStart = cleanStart;
+ this.initConnectDate = initConnectDate;
+ this.ctx = ctx;
+ }
+
public String getClientId() {
return clientId;
}
@@ -56,6 +72,14 @@ public class ClientSession {
this.ctx = ctx;
}
+ public long getLossConnectTime() {
+ return lossConnectTime;
+ }
+
+ public void setLossConnectTime(long lossConnectTime) {
+ this.lossConnectTime = lossConnectTime;
+ }
+
public int generateMessageId() {
int messageId = messageIdCounter.getAndIncrement();
messageId = Math.abs(messageId % 0xFFFF);
@@ -65,6 +89,14 @@ public class ClientSession {
return messageId;
}
+ public Date getInitConnectDate() {
+ return initConnectDate;
+ }
+
+ public void setInitConnectDate(Date initConnectDate) {
+ this.initConnectDate = initConnectDate;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) { return true; }
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java
index d6eb0e9e6d2760c60ae599c5528c157f42665a6e..ee3933b96349c37c9a632c59c190793063887ff1 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java
@@ -1,5 +1,11 @@
package org.jmqtt.broker.remoting.session;
+import com.alibaba.fastjson.JSONObject;
+import io.netty.channel.ChannelHandlerContext;
+import org.jmqtt.broker.remoting.util.RemotingHelper;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,4 +42,26 @@ public class ConnectManager {
}
return null;
}
+
+ /**
+ * 返回客户端列表
+ *
+ * @return @return {@link List }
+ */
+ public List listClients() {
+ List resultList = new ArrayList();
+ clientCache.forEach((k, v) -> {
+ ChannelHandlerContext ctx = v.getCtx();
+ resultList.add(
+ new JSONObject() {{
+ put("clientId", k);
+ put("clientIp", RemotingHelper.getRemoteAddr(ctx.channel()));
+ put("initConnectDate", v.getInitConnectDate());
+ put("lossConnectTime", v.getLossConnectTime());
+ }}
+ );
+ });
+ return resultList;
+ }
+
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java
index c6a339bb47c1d9e0b995b5046c6b8be2ba8780fd..8537cbb6a949eef2ef1e3c77a1f9f29de7748acf 100644
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java
@@ -2,13 +2,14 @@ package org.jmqtt.broker.store.redis.support;
public interface RedisKeySupport {
- String PREFIX = "JMQTT";
- String SEND_FLOW_MESSAGE = PREFIX + "_SEND_FLOW_";
- String SEND_FLOW_SEC_MESSAGE = PREFIX + "_SEND_FLOW_SEC_";
- String REC_FLOW_MESSAGE = PREFIX + "_REC_FLOW_";
- String OFFLINE = PREFIX + "_OFFLINE_";
- String RETAIN = PREFIX + "_RETAIN";
- String SESSION = PREFIX + "_SESSION_";
- String SUBSCRIPTION = PREFIX + "_SUBSCRIPTION_";
- String WILL = PREFIX + "_WILL_";
+ String PREFIX = "JMQTT:";
+ String SEND_FLOW_MESSAGE = PREFIX + "SEND_FLOW_";
+ String SEND_FLOW_SEC_MESSAGE = PREFIX + "SEND_FLOW_SEC_";
+ String REC_FLOW_MESSAGE = PREFIX + "REC_FLOW_";
+ String OFFLINE = PREFIX + "OFFLINE_";
+ String RETAIN = PREFIX + "RETAIN";
+ String SESSION = PREFIX + "SESSION_";
+ String SUBSCRIPTION = PREFIX + "SUBSCRIPTION_";
+ String WILL = PREFIX + "WILL_";
+
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..f662f9e058eaa52b39de56b07f8ac75a154d23f6
--- /dev/null
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java
@@ -0,0 +1,100 @@
+
+package org.jmqtt.broker.store.redis.support;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.jmqtt.broker.common.config.BrokerConfig;
+import org.jmqtt.broker.common.log.JmqttLogger;
+import org.jmqtt.broker.common.log.LogUtil;
+import org.jmqtt.broker.store.redis.RedisCallBack;
+import org.slf4j.Logger;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * redis哨兵支持实现
+ *
+ * @author nn200433
+ * @date 2021-07-21 03:27:04
+ */
+public class RedisSentinelSupportImpl implements RedisSupport {
+
+ private static final Logger log = JmqttLogger.storeLog;
+
+ private BrokerConfig brokerConfig;
+ private JedisSentinelPool jedisPool;
+
+ public RedisSentinelSupportImpl(BrokerConfig brokerConfig) {
+ this.brokerConfig = brokerConfig;
+ }
+
+ @Override
+ public void init() {
+ try {
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ jedisPoolConfig.setMinIdle(brokerConfig.getMinIdle());
+ jedisPoolConfig.setMaxTotal(brokerConfig.getMaxTotal());
+ jedisPoolConfig.setTestOnBorrow(brokerConfig.isTestOnBorrow());
+ jedisPoolConfig.setMaxTotal(jedisPoolConfig.getMaxIdle());
+ jedisPoolConfig.setMaxWaitMillis(jedisPoolConfig.getMaxWaitMillis());
+
+ String[] redisHosts = brokerConfig.getRedisHost().split(",");
+ if (ArrayUtils.isEmpty(redisHosts)) {
+ throw new Exception("array [redisHost] is empty");
+ }
+
+ Set redisNodes = new HashSet(redisHosts.length);
+ for (String redisHost : redisHosts) {
+ String[] ipPort = redisHost.split(":");
+ if (ipPort.length != 2) {
+ throw new Exception("redisHost wrong format, example: 127.0.0.1:6379,127.0.0.1:6380");
+ } else {
+ redisNodes.add(redisHost);
+ }
+ }
+
+ if (StringUtils.isEmpty(brokerConfig.getRedisPassword())) {
+ throw new Exception("redisPassword is empty");
+ }
+
+ jedisPool = new JedisSentinelPool(brokerConfig.getMasterName(), redisNodes,
+ jedisPoolConfig, 10000, brokerConfig.getRedisPassword(), brokerConfig.getRedisDataBase());
+
+ LogUtil.debug(log, "[Redis handle] JedisSentinelPool init success");
+ } catch (Exception ex) {
+ LogUtil.error(log, "[Redis handle error],ex:{}", ex);
+ }
+ }
+
+ @Override
+ public T operate(RedisCallBack redisCallBack) {
+ LogUtil.debug(log, "[Cluster] redis operate begin");
+ long startTime = System.currentTimeMillis();
+ Jedis jedis = null;
+ try {
+ jedis = jedisPool.getResource();
+ return (T) redisCallBack.operate(jedis);
+ } catch (Exception ex) {
+ LogUtil.error(log, "[Cluster] redis operate error,ex:{}", ex);
+ } finally {
+ LogUtil.debug(log, "[Cluster] redis operate cost:{}", (System.currentTimeMillis() - startTime));
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ LogUtil.info(log, "[Cluster] redis close");
+ if (jedisPool != null) {
+ jedisPool.close();
+ }
+ }
+
+}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java
index 0246523b823caba4e9267797f7ec31b589f7e186..7348297d2b0dff6223d9a7b4465152aa47f0c628 100644
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java
@@ -6,11 +6,27 @@ import org.jmqtt.broker.store.redis.RedisCallBack;
* redis 支持类,定义主要的redis操作
*/
public interface RedisSupport {
+
/**
* 封装基本的redis操作,对{@link RedisCallBack} 暴露了Jedis对象
* @param redisCallBack
* @param
* @return
*/
- T operate(RedisCallBack redisCallBack);
+ public T operate(RedisCallBack redisCallBack);
+
+ /**
+ * 初始化
+ *
+ * @return
+ */
+ public void init();
+
+ /**
+ * 关闭
+ *
+ * @return
+ */
+ public void close();
+
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java
index 3a54e177020cc67bb361db73e03cfde2c54dc9d8..423981a8da17fa9b92db8efec64f97b8254ffd62 100755
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java
@@ -3,28 +3,27 @@ package org.jmqtt.broker.store.redis.support;
import org.apache.commons.lang3.StringUtils;
import org.jmqtt.broker.common.config.BrokerConfig;
-import org.jmqtt.broker.store.redis.RedisCallBack;
import org.jmqtt.broker.common.log.JmqttLogger;
import org.jmqtt.broker.common.log.LogUtil;
+import org.jmqtt.broker.store.redis.RedisCallBack;
import org.slf4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
-public class RedisSupportImpl implements RedisSupport{
+public class RedisSupportImpl implements RedisSupport {
private static final Logger log = JmqttLogger.storeLog;
private BrokerConfig brokerConfig;
- private JedisPool jedisPool;
+ private JedisPool jedisPool;
- public static final String PROJECT = "JMQTT";
-
- public RedisSupportImpl(BrokerConfig brokerConfig){
+ public RedisSupportImpl(BrokerConfig brokerConfig) {
this.brokerConfig = brokerConfig;
}
- void init(){
+ @Override
+ public void init() {
try {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMinIdle(brokerConfig.getMinIdle());
@@ -33,26 +32,30 @@ public class RedisSupportImpl implements RedisSupport{
jedisPoolConfig.setMaxTotal(jedisPoolConfig.getMaxIdle());
jedisPoolConfig.setMaxWaitMillis(jedisPoolConfig.getMaxWaitMillis());
if (StringUtils.isEmpty(brokerConfig.getRedisPassword())) {
- jedisPool = new JedisPool(jedisPoolConfig,brokerConfig.getRedisHost(),brokerConfig.getRedisPort());
+ jedisPool = new JedisPool(jedisPoolConfig, brokerConfig.getRedisHost(), brokerConfig.getRedisPort());
+ jedisPool.getResource().select(brokerConfig.getRedisDataBase());
} else {
- jedisPool = new JedisPool(jedisPoolConfig,brokerConfig.getRedisHost(),brokerConfig.getRedisPort(),10000,brokerConfig.getRedisPassword());
+ jedisPool = new JedisPool(jedisPoolConfig, brokerConfig.getRedisHost(), brokerConfig.getRedisPort(),
+ 10000, brokerConfig.getRedisPassword(), brokerConfig.getRedisDataBase());
}
+ LogUtil.debug(log, "[Redis handle] JedisPool init success");
} catch (Exception ex) {
- LogUtil.error(log,"[Redis handle error],ex:{}",ex);
+ LogUtil.error(log, "[Redis handle error],ex:{}", ex);
}
}
+
@Override
- public T operate(RedisCallBack redisCallBack){
- LogUtil.debug(log,"[Cluster] redis operate begin");
+ public T operate(RedisCallBack redisCallBack) {
+ LogUtil.debug(log, "[Cluster] redis operate begin");
long startTime = System.currentTimeMillis();
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
- return (T)redisCallBack.operate(jedis);
- }catch (Exception ex) {
- LogUtil.error(log,"[Cluster] redis operate error,ex:{}",ex);
+ return (T) redisCallBack.operate(jedis);
+ } catch (Exception ex) {
+ LogUtil.error(log, "[Cluster] redis operate error,ex:{}", ex);
} finally {
- LogUtil.debug(log,"[Cluster] redis operate cost:{}",(System.currentTimeMillis() - startTime));
+ LogUtil.debug(log, "[Cluster] redis operate cost:{}", (System.currentTimeMillis() - startTime));
if (jedis != null) {
jedis.close();
}
@@ -60,8 +63,9 @@ public class RedisSupportImpl implements RedisSupport{
return null;
}
- void close(){
- LogUtil.info(log,"[Cluster] redis close");
+ @Override
+ public void close() {
+ LogUtil.info(log, "[Cluster] redis close");
if (jedisPool != null) {
jedisPool.close();
}
diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java
index d2edfc5cfcd3b71333fbf6b608c42cb28c3898c0..dfd348d3f9ef158069bcdf9bf71712e411e941c1 100644
--- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java
+++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java
@@ -1,16 +1,23 @@
package org.jmqtt.broker.store.redis.support;
import org.jmqtt.broker.common.config.BrokerConfig;
-import org.jmqtt.broker.store.redis.RedisCallBack;
+import org.jmqtt.broker.common.log.JmqttLogger;
+import org.jmqtt.broker.common.log.LogUtil;
+import org.slf4j.Logger;
+import java.lang.reflect.Constructor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* redis 访问模板方法类
*/
public class RedisUtils {
+
+ private static final Logger log = JmqttLogger.otherLog;
+
private static final RedisUtils redisUtils = new RedisUtils();
- private volatile RedisSupportImpl redisSupport;
+
+ private volatile RedisSupport redisSupport;
private AtomicBoolean start = new AtomicBoolean(false);
@@ -21,15 +28,23 @@ public class RedisUtils {
}
public RedisSupport createSupport(BrokerConfig brokerConfig) {
- if (start.compareAndSet(false,true)) {
- this.redisSupport = new RedisSupportImpl(brokerConfig);
+ if (start.compareAndSet(false, true)) {
+ // this.redisSupport = new RedisSupportImpl(brokerConfig);
+ try {
+ // 使用配置反射实例化redis
+ Class> redisSupportClass = Class.forName(brokerConfig.getRedisSupportClass());
+ Constructor> redisSupportConstructor = redisSupportClass.getConstructor(BrokerConfig.class);
+ this.redisSupport = (RedisSupport) redisSupportConstructor.newInstance(brokerConfig);
+ } catch (Exception e) {
+ LogUtil.error(log, "init redis error, ex:{}", e);
+ }
this.redisSupport.init();
}
return redisSupport;
}
public void close() {
- RedisSupportImpl redisSupport = this.redisSupport;
+ RedisSupport redisSupport = this.redisSupport;
if(start.compareAndSet(true,false)&&redisSupport!=null){
redisSupport.close();
}
diff --git a/jmqtt-manager b/jmqtt-manager
new file mode 100644
index 0000000000000000000000000000000000000000..8b137891791fe96927ad78e64b0aad7bded08bdc
--- /dev/null
+++ b/jmqtt-manager
@@ -0,0 +1 @@
+
diff --git a/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java b/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java
index 5d8dfc3b88a6a34296bb381b5ee3ce3e3be8ba5f..afbb3d89d29e7ff1cd84104aa32da0fc74b0d06a 100644
--- a/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java
+++ b/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java
@@ -10,11 +10,7 @@ import org.jmqtt.broker.common.config.BrokerConfig;
import org.jmqtt.broker.common.config.NettyConfig;
import org.jmqtt.broker.common.helper.MixAll;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
+import java.io.*;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -59,26 +55,40 @@ public class BrokerStartupService {
}
public BrokerController getBrokerController() {
- String jmqttConfigPath =
- brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "jmqtt.properties";
- initConfig(jmqttConfigPath, brokerConfig, nettyConfig);
- try {
- LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
- File file = new File(brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "log4j2.xml");
- context.setConfigLocation(file.toURI());
- Configuration configuration = context.getConfiguration();
- Map loggerConfigMap = configuration.getLoggers();
- Level newLevel = Level.getLevel(brokerConfig.getLogLevel());
- if (newLevel == null) {
- newLevel = Level.INFO;
- }
- for (LoggerConfig value : loggerConfigMap.values()) {
- value.setLevel(newLevel);
- }
- context.updateLoggers(configuration);
- } catch (Exception ex) {
- System.err.print("Log4j2 load error,ex:" + ex);
- }
- return new BrokerController(brokerConfig, nettyConfig);
+ if (null == brokerConfig) {
+ // brokerConfig不为空时加载配置文件
+ String jmqttConfigPath =
+ brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "jmqtt.properties";
+ initConfig(jmqttConfigPath, brokerConfig, nettyConfig);
+ try {
+ LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
+ File file = new File(brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "log4j2.xml");
+ context.setConfigLocation(file.toURI());
+ Configuration configuration = context.getConfiguration();
+ Map loggerConfigMap = configuration.getLoggers();
+ Level newLevel = Level.getLevel(brokerConfig.getLogLevel());
+ if (newLevel == null) {
+ newLevel = Level.INFO;
+ }
+ for (LoggerConfig value : loggerConfigMap.values()) {
+ value.setLevel(newLevel);
+ }
+ context.updateLoggers(configuration);
+ } catch (Exception ex) {
+ System.err.print("Log4j2 load error,ex:" + ex);
+ }
+ }
+
+ // 启动服务,线程等
+ BrokerController brokerController = new BrokerController(brokerConfig, nettyConfig);
+ brokerController.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ brokerController.shutdown();
+ }
+ }));
+ return brokerController;
}
}
diff --git a/pom.xml b/pom.xml
index 93095614ef882d578258e91b81650226858859a1..5f715b57497e4af72e2676336e5e290517fbcc34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@
2.9.0
4.0.3
2.9.46
- 3.4.6
+ 3.5.6
1.2.4
8.0.16
2.13.3
@@ -168,41 +168,6 @@
-
-
-
- org.codehaus.mojo
- versions-maven-plugin
- 2.7
-
- false
-
-
-
- org.apache.maven.plugins
- maven-shade-plugin
- 2.4.1
-
-
- package
-
- shade
-
-
-
-
- org.jmqtt.broker.BrokerStartup
-
-
-
-
-
-
-
-
-
-
alibaba