diff --git a/jmqtt-acceptance/pom.xml b/jmqtt-acceptance/pom.xml index 32075cd15a133cacfc2c83ea0449b6cbc3939406..9ca06ba067beb0d90301bd0259d5c9bafab2f931 100644 --- a/jmqtt-acceptance/pom.xml +++ b/jmqtt-acceptance/pom.xml @@ -74,7 +74,7 @@ commons-io commons-io - 2.6 + 2.7 com.fasterxml.jackson.dataformat diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java index 613ab1938df47fee43360da8ab445856b26598c2..b916f6ffb4bd0ed03fcc87aaff5ea392ddf58381 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/BrokerController.java @@ -102,7 +102,8 @@ public class BrokerController { this.innerMessageDispatcher = new DefaultDispatcherInnerMessage(this); this.eventConsumeHandler = new EventConsumeHandler(this); - this.channelEventListener = new ClientLifeCycleHookService(messageStore, innerMessageDispatcher); + this.channelEventListener = new ClientLifeCycleHookService(sessionStore, messageStore, subscriptionMatcher, + innerMessageDispatcher); this.remotingServer = new NettyRemotingServer(brokerConfig, nettyConfig, channelEventListener); this.reSendMessageService = new ReSendMessageService(this); diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java index 3ab7f2fd033431a11c17219df85dcc0e5e907144..bbd234b4e29d9c2dc9ae00a9b3b03de653e7c5a7 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/client/ClientLifeCycleHookService.java @@ -5,21 +5,33 @@ import org.apache.commons.lang3.StringUtils; import org.jmqtt.broker.common.log.JmqttLogger; import org.jmqtt.broker.common.log.LogUtil; import org.jmqtt.broker.common.model.Message; +import org.jmqtt.broker.common.model.Subscription; import org.jmqtt.broker.processor.dispatcher.InnerMessageDispatcher; import org.jmqtt.broker.remoting.netty.ChannelEventListener; +import org.jmqtt.broker.remoting.session.ClientSession; import org.jmqtt.broker.remoting.session.ConnectManager; import org.jmqtt.broker.remoting.util.NettyUtil; import org.jmqtt.broker.store.MessageStore; +import org.jmqtt.broker.store.SessionState; +import org.jmqtt.broker.store.SessionStore; +import org.jmqtt.broker.subscribe.SubscriptionMatcher; import org.slf4j.Logger; +import java.util.Set; + public class ClientLifeCycleHookService implements ChannelEventListener { - private static final Logger log = JmqttLogger.clientTraceLog; - private MessageStore messageStore; - private InnerMessageDispatcher innerMessageDispatcher; + private static final Logger log = JmqttLogger.clientTraceLog; + private MessageStore messageStore; + private SessionStore sessionStore; + private SubscriptionMatcher subscriptionMatcher; + private InnerMessageDispatcher innerMessageDispatcher; - public ClientLifeCycleHookService(MessageStore messageStore, InnerMessageDispatcher innerMessageDispatcher) { + public ClientLifeCycleHookService(SessionStore sessionStore, MessageStore messageStore, + SubscriptionMatcher subscriptionMatcher, InnerMessageDispatcher innerMessageDispatcher) { + this.sessionStore = sessionStore; this.messageStore = messageStore; + this.subscriptionMatcher = subscriptionMatcher; this.innerMessageDispatcher = innerMessageDispatcher; } @@ -40,6 +52,17 @@ public class ClientLifeCycleHookService implements ChannelEventListener { @Override public void onChannelIdle(String remoteAddr, Channel channel) { + String clientId = NettyUtil.getClientId(channel); + if (StringUtils.isNotEmpty(clientId)) { + // 移除用户 + ConnectManager connectManager = ConnectManager.getInstance(); + ClientSession client = connectManager.getClient(clientId); + if (null != client) { + LogUtil.info(log, "[ClientLifeCycleHook] -> 心跳获取超时 『{}』 将被清理....", clientId); + clearSession(client); + connectManager.removeClient(clientId); + } + } } @Override @@ -48,4 +71,18 @@ public class ClientLifeCycleHookService implements ChannelEventListener { ConnectManager.getInstance().removeClient(clientId); LogUtil.warn(log, "[ClientLifeCycleHook] -> {} channelException,close channel and remove ConnectCache!", clientId); } + + private void clearSession(ClientSession clientSession) { + if (clientSession.isCleanStart()) { + Set subscriptions = sessionStore.getSubscriptions(clientSession.getClientId()); + for (Subscription subscription : subscriptions) { + subscriptionMatcher.unSubscribe(subscription.getTopic(), clientSession.getClientId()); + } + sessionStore.clearSession(clientSession.getClientId(), false); + } else { + SessionState sessionState = new SessionState(SessionState.StateEnum.OFFLINE, System.currentTimeMillis()); + sessionStore.storeSession(clientSession.getClientId(), sessionState); + } + } + } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java index f7347c18021610168e207691cbc07f2503e4233f..8df95bd6a4e96431047f353e32d3ef640579e037 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/BrokerConfig.java @@ -23,11 +23,14 @@ public class BrokerConfig { private String messageStoreClass = "org.jmqtt.broker.store.rdb.RDBMessageStore"; private String authValidClass = "org.jmqtt.broker.acl.impl.DefaultAuthValid"; private String clusterEventHandlerClass = "org.jmqtt.broker.processor.dispatcher.rdb.RDBClusterEventHandler"; + private String redisSupportClass = "org.jmqtt.broker.store.redis.support.RedisSupportImpl"; /* redis相关配置 */ private String redisHost = "127.0.0.1"; private int redisPort = 6379; private String redisPassword = ""; + private int redisDataBase = 0; + private String masterName = "master"; private int maxWaitMills = 60 * 1000; private boolean testOnBorrow = true; private int minIdle = 20; @@ -69,6 +72,22 @@ public class BrokerConfig { this.redisPassword = redisPassword; } + public String getMasterName() { + return masterName; + } + + public void setMasterName(String masterName) { + this.masterName = masterName; + } + + public int getRedisDataBase() { + return redisDataBase; + } + + public void setRedisDataBase(int redisDataBase) { + this.redisDataBase = redisDataBase; + } + public int getMaxWaitMills() { return maxWaitMills; } @@ -217,6 +236,14 @@ public class BrokerConfig { this.clusterEventHandlerClass = clusterEventHandlerClass; } + public String getRedisSupportClass() { + return redisSupportClass; + } + + public void setRedisSupportClass(String redisSupportClass) { + this.redisSupportClass = redisSupportClass; + } + public int getMaxPollEventNum() { return maxPollEventNum; } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java index f2f350f281aa36445feea858bb1980d6aabd4ae4..7ef7646b63777914f9fdc6b69bfbf4f60386643d 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/common/config/NettyConfig.java @@ -53,6 +53,11 @@ public class NettyConfig { */ private int maxMsgSize = 512 * 1024; + /** + * 最大丢失连接时间,即10分钟未操作关闭连接 + */ + private long maxLossConnectTime = 10; + public int getTcpBackLog() { return tcpBackLog; } @@ -246,4 +251,12 @@ public class NettyConfig { public void setHttpPort(int httpPort) { this.httpPort = httpPort; } + + public long getMaxLossConnectTime() { + return maxLossConnectTime; + } + + public void setMaxLossConnectTime(long maxLossConnectTime) { + this.maxLossConnectTime = maxLossConnectTime; + } } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java index 2c4df440453ba7453ffc49d28fbe2bd01e0fa977..c39c76aa086ade47963297343b20eecd7f0c9845 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/DefaultDispatcherInnerMessage.java @@ -13,10 +13,10 @@ import org.jmqtt.broker.processor.HighPerformanceMessageHandler; import org.jmqtt.broker.remoting.session.ClientSession; import org.jmqtt.broker.remoting.session.ConnectManager; import org.jmqtt.broker.remoting.util.MessageUtil; +import org.jmqtt.broker.remoting.util.RemotingHelper; import org.jmqtt.broker.store.SessionStore; import org.jmqtt.broker.subscribe.SubscriptionMatcher; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; @@ -130,6 +130,12 @@ public class DefaultDispatcherInnerMessage extends HighPerformanceMessageHandler } MqttPublishMessage publishMessage = MessageUtil.getPubMessage(message, false); clientSession.getCtx().writeAndFlush(publishMessage); + + // 消息解码后的字符串 + String messageDecodeStr = new String(message.getPayload(), "UTF-8"); + LogUtil.debug(log, "[Broker Dispatcher Message] -> receiveClientIp={}, clientId={}, subTopic={}, qos={}, message={}", + RemotingHelper.getRemoteAddr(clientSession.getCtx().channel()), clientId, subscription.getTopic(), + qos, messageDecodeStr.replace("\n", "")); } else { sessionStore.storeOfflineMsg(clientId, message); } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java index 236037cb86cb4bb1d11487b12ad275b2525851c7..47e3be045dbf4a3d53ff8feaf081627096e5dfbd 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/dispatcher/EventConsumeHandler.java @@ -104,7 +104,7 @@ public class EventConsumeHandler { return; } String clientId = event.getBody(); - if (ConnectManager.getInstance().containClient(clientId)) { + if (!ConnectManager.getInstance().containClient(clientId)) { return; } ClientSession clientSession = ConnectManager.getInstance().getClient(clientId); diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java index 03c9fcd25f8dc8f164948a10efea61c402062664..34671aefcd2c0ba2f21ade28634b8c2be512ccea 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/ConnectProcessor.java @@ -30,6 +30,7 @@ import org.jmqtt.broker.store.SessionStore; import org.jmqtt.broker.subscribe.SubscriptionMatcher; import org.slf4j.Logger; +import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -97,7 +98,7 @@ public class ConnectProcessor implements RequestProcessor { } } if (sessionState.getState() == SessionState.StateEnum.NULL) { - clientSession = new ClientSession(clientId, false, ctx); + clientSession = new ClientSession(clientId, false, new Date(),ctx); sessionPresent = false; notifyClearOtherSession = false; } else { @@ -179,7 +180,7 @@ public class ConnectProcessor implements RequestProcessor { } private ClientSession createNewClientSession(String clientId, ChannelHandlerContext ctx) { - ClientSession clientSession = new ClientSession(clientId, true); + ClientSession clientSession = new ClientSession(clientId, true, new Date()); clientSession.setCtx(ctx); //clear previous sessions this.sessionStore.clearSession(clientId,true); @@ -190,7 +191,7 @@ public class ConnectProcessor implements RequestProcessor { * cleanStart is false, reload client session */ private ClientSession reloadClientSession(ChannelHandlerContext ctx, String clientId) { - ClientSession clientSession = new ClientSession(clientId, false); + ClientSession clientSession = new ClientSession(clientId, false, new Date()); clientSession.setCtx(ctx); Set subscriptions = sessionStore.getSubscriptions(clientId); for (Subscription subscription : subscriptions) { diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java index e8c3545f9eed6cfb838d8e8d282ba477309accbd..5128e3b257df51eee439403eab036547c697c27d 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/processor/protocol/PublishProcessor.java @@ -17,8 +17,8 @@ import org.jmqtt.broker.remoting.session.ClientSession; import org.jmqtt.broker.remoting.session.ConnectManager; import org.jmqtt.broker.remoting.util.MessageUtil; import org.jmqtt.broker.remoting.util.NettyUtil; +import org.jmqtt.broker.remoting.util.RemotingHelper; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -74,6 +74,12 @@ public class PublishProcessor extends AbstractMessageProcessor implements Reques default: LogUtil.warn(log,"[PubMessage] -> Wrong mqtt message,clientId={}", clientId); } + + // 消息解码后的字符串 + String messageDecodeStr = new String(innerMsg.getPayload(), "UTF-8"); + LogUtil.debug(log, "[Client Publish Message] -> sendClientIp={}, clientId={}, topic={}, qos={}, message={}", + RemotingHelper.getRemoteAddr(ctx.channel()), clientId, topic, qos, + messageDecodeStr.replace("\n", "")); } catch (Throwable tr) { LogUtil.warn(log,"[PubMessage] -> Solve mqtt pub message exception:{}", tr.getMessage()); } finally { diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java index a3251bb34ab84209a8905a43a01e0e91ec6be4d8..97c6ca30428f2aa4d3c2f8685180a9386234c3e4 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyConnectHandler.java @@ -4,46 +4,82 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import org.apache.commons.lang3.StringUtils; +import org.jmqtt.broker.common.config.NettyConfig; import org.jmqtt.broker.common.log.JmqttLogger; import org.jmqtt.broker.common.log.LogUtil; +import org.jmqtt.broker.remoting.session.ClientSession; +import org.jmqtt.broker.remoting.session.ConnectManager; +import org.jmqtt.broker.remoting.util.NettyUtil; import org.jmqtt.broker.remoting.util.RemotingHelper; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.util.Date; public class NettyConnectHandler extends ChannelDuplexHandler { private static final Logger log = JmqttLogger.remotingLog; private NettyEventExecutor eventExecutor; + private NettyConfig nettyConfig; - public NettyConnectHandler(NettyEventExecutor nettyEventExecutor){ + public NettyConnectHandler(NettyEventExecutor nettyEventExecutor, NettyConfig nettyConfig){ this.eventExecutor = nettyEventExecutor; + this.nettyConfig = nettyConfig; } @Override public void channelActive(ChannelHandlerContext ctx){ final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); - LogUtil.debug(log,"[ChannelActive] -> addr = {}",remoteAddr); - this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.CONNECT,ctx.channel())); + LogUtil.debug(log, "[ChannelActive] -> 通道连接... addr = {}", remoteAddr); + this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.CONNECT, ctx.channel())); } @Override public void channelInactive(ChannelHandlerContext ctx){ final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); - LogUtil.debug(log,"[ChannelInactive] -> addr = {}",remoteAddr); - this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.CLOSE,ctx.channel())); + LogUtil.debug(log, "[ChannelInactive] -> 通道关闭... addr = {}", remoteAddr); + this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.CLOSE, ctx.channel())); } @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt){ - if(evt instanceof IdleStateEvent){ + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + String clientId = NettyUtil.getClientId(ctx.channel()); + if (StringUtils.isEmpty(clientId)) { + LogUtil.warn(log, "[HEART_BEAT] -> 根据通道未查到客户端id, 无法判断心跳..."); + return; + } + ClientSession client = ConnectManager.getInstance().getClient(clientId); + long lossConnectTime = client.getLossConnectTime(); + + if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; - if(event.state().equals(IdleState.READER_IDLE)){ - final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); - LogUtil.warn(log,"[HEART_BEAT] -> IDLE exception, addr = {}",remoteAddr); - RemotingHelper.closeChannel(ctx.channel()); - this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr,NettyEventType.IDLE,ctx.channel())); + if (event.state().equals(IdleState.READER_IDLE)) { + // 设备离线,更改设备状态,增加离线操作日志 + lossConnectTime++; + client.setLossConnectTime(lossConnectTime); + LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 离线 {} 分钟...", client.getClientId(), lossConnectTime); + if (lossConnectTime >= nettyConfig.getMaxLossConnectTime()) { + // 客户端断连10分钟 + final String remoteAddr = RemotingHelper.getRemoteAddr(ctx.channel()); + LogUtil.warn(log, "[HEART_BEAT] -> 『{}』 心跳异常,服务器将主动关闭 『{}』 链路,原因:{}s 没收到消息了...", + remoteAddr, client.getClientId(), (lossConnectTime * 60)); + RemotingHelper.closeChannel(ctx.channel()); + this.eventExecutor.putNettyEvent(new NettyEvent(remoteAddr, NettyEventType.IDLE, ctx.channel())); + } + } else { + //复位 + LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 恢复连接...", client.getClientId(), lossConnectTime); + client.setLossConnectTime(0); + client.setInitConnectDate(new Date()); + super.userEventTriggered(ctx, evt); } + } else { + //复位 + LogUtil.info(log, "[HEART_BEAT] -> 客户端 『{}』 恢复连接...", client.getClientId(), lossConnectTime); + client.setLossConnectTime(0); + client.setInitConnectDate(new Date()); + super.userEventTriggered(ctx, evt); } } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java index 677a92571b9c5eaf2a41f4129dc99c74ae289b41..48e732368c2e3fa901e0e72008b8b4ac433df8ee 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/netty/NettyRemotingServer.java @@ -28,6 +28,7 @@ import org.jmqtt.broker.processor.RequestProcessor; import org.jmqtt.broker.remoting.RemotingService; import org.jmqtt.broker.remoting.netty.codec.ByteBuf2WebSocketEncoder; import org.jmqtt.broker.remoting.netty.codec.WebSocket2ByteBufDecoder; +import org.jmqtt.broker.remoting.util.RemotingHelper; import org.slf4j.Logger; import java.util.HashMap; @@ -125,8 +126,7 @@ public class NettyRemotingServer implements RemotingService { .addLast("byteBuf2WebSocketEncoder", new ByteBuf2WebSocketEncoder()) .addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize())) .addLast("mqttEncoder", MqttEncoder.INSTANCE) - .addLast("nettyConnectionManager", new NettyConnectHandler( - nettyEventExecutor)) + .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExecutor, nettyConfig)) .addLast("nettyMqttHandler", new NettyMqttHandler()); } }); @@ -168,8 +168,7 @@ public class NettyRemotingServer implements RemotingService { pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0)) .addLast("mqttEncoder", MqttEncoder.INSTANCE) .addLast("mqttDecoder", new MqttDecoder(nettyConfig.getMaxMsgSize())) - .addLast("nettyConnectionManager", new NettyConnectHandler( - nettyEventExecutor)) + .addLast("nettyConnectionManager", new NettyConnectHandler(nettyEventExecutor, nettyConfig)) .addLast("nettyMqttHandler", new NettyMqttHandler()); } }); @@ -205,7 +204,8 @@ public class NettyRemotingServer implements RemotingService { MqttMessage mqttMessage = (MqttMessage) obj; if (mqttMessage != null && mqttMessage.decoderResult().isSuccess()) { MqttMessageType messageType = mqttMessage.fixedHeader().messageType(); - LogUtil.debug(log,"[Remoting] -> receive mqtt code,type:{},name:{}", messageType.value(), messageType.name()); + LogUtil.debug(log,"[Remoting] -> receive 『{}』 mqtt code,type:{},name:{}", + RemotingHelper.getRemoteAddr(ctx.channel()), messageType.value(), messageType.name()); Runnable runnable = () -> processorTable.get(messageType).getObject1().processRequest(ctx, mqttMessage); try { processorTable.get(messageType).getObject2().submit(runnable); diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java index 8d3859cdb7c63ddf5ced4b2b71e8d3d54cc654e1..12f503dc5764f25a5dd8e7664988b509f5a71f9f 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ClientSession.java @@ -2,6 +2,7 @@ package org.jmqtt.broker.remoting.session; import io.netty.channel.ChannelHandlerContext; +import java.util.Date; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; @@ -16,6 +17,8 @@ public class ClientSession { private String clientId; private boolean cleanStart; private transient ChannelHandlerContext ctx; + private long lossConnectTime = 0; + private Date initConnectDate; private transient AtomicInteger messageIdCounter = new AtomicInteger(1); @@ -26,12 +29,25 @@ public class ClientSession { this.cleanStart = cleanStart; } + public ClientSession(String clientId, boolean cleanStart, Date initConnectDate) { + this.clientId = clientId; + this.cleanStart = cleanStart; + this.initConnectDate = initConnectDate; + } + public ClientSession(String clientId, boolean cleanStart, ChannelHandlerContext ctx) { this.clientId = clientId; this.cleanStart = cleanStart; this.ctx = ctx; } + public ClientSession(String clientId, boolean cleanStart, Date initConnectDate, ChannelHandlerContext ctx) { + this.clientId = clientId; + this.cleanStart = cleanStart; + this.initConnectDate = initConnectDate; + this.ctx = ctx; + } + public String getClientId() { return clientId; } @@ -56,6 +72,14 @@ public class ClientSession { this.ctx = ctx; } + public long getLossConnectTime() { + return lossConnectTime; + } + + public void setLossConnectTime(long lossConnectTime) { + this.lossConnectTime = lossConnectTime; + } + public int generateMessageId() { int messageId = messageIdCounter.getAndIncrement(); messageId = Math.abs(messageId % 0xFFFF); @@ -65,6 +89,14 @@ public class ClientSession { return messageId; } + public Date getInitConnectDate() { + return initConnectDate; + } + + public void setInitConnectDate(Date initConnectDate) { + this.initConnectDate = initConnectDate; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java index d6eb0e9e6d2760c60ae599c5528c157f42665a6e..ee3933b96349c37c9a632c59c190793063887ff1 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/remoting/session/ConnectManager.java @@ -1,5 +1,11 @@ package org.jmqtt.broker.remoting.session; +import com.alibaba.fastjson.JSONObject; +import io.netty.channel.ChannelHandlerContext; +import org.jmqtt.broker.remoting.util.RemotingHelper; + +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -36,4 +42,26 @@ public class ConnectManager { } return null; } + + /** + * 返回客户端列表 + * + * @return @return {@link List } + */ + public List listClients() { + List resultList = new ArrayList(); + clientCache.forEach((k, v) -> { + ChannelHandlerContext ctx = v.getCtx(); + resultList.add( + new JSONObject() {{ + put("clientId", k); + put("clientIp", RemotingHelper.getRemoteAddr(ctx.channel())); + put("initConnectDate", v.getInitConnectDate()); + put("lossConnectTime", v.getLossConnectTime()); + }} + ); + }); + return resultList; + } + } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java index c6a339bb47c1d9e0b995b5046c6b8be2ba8780fd..8537cbb6a949eef2ef1e3c77a1f9f29de7748acf 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisKeySupport.java @@ -2,13 +2,14 @@ package org.jmqtt.broker.store.redis.support; public interface RedisKeySupport { - String PREFIX = "JMQTT"; - String SEND_FLOW_MESSAGE = PREFIX + "_SEND_FLOW_"; - String SEND_FLOW_SEC_MESSAGE = PREFIX + "_SEND_FLOW_SEC_"; - String REC_FLOW_MESSAGE = PREFIX + "_REC_FLOW_"; - String OFFLINE = PREFIX + "_OFFLINE_"; - String RETAIN = PREFIX + "_RETAIN"; - String SESSION = PREFIX + "_SESSION_"; - String SUBSCRIPTION = PREFIX + "_SUBSCRIPTION_"; - String WILL = PREFIX + "_WILL_"; + String PREFIX = "JMQTT:"; + String SEND_FLOW_MESSAGE = PREFIX + "SEND_FLOW_"; + String SEND_FLOW_SEC_MESSAGE = PREFIX + "SEND_FLOW_SEC_"; + String REC_FLOW_MESSAGE = PREFIX + "REC_FLOW_"; + String OFFLINE = PREFIX + "OFFLINE_"; + String RETAIN = PREFIX + "RETAIN"; + String SESSION = PREFIX + "SESSION_"; + String SUBSCRIPTION = PREFIX + "SUBSCRIPTION_"; + String WILL = PREFIX + "WILL_"; + } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..f662f9e058eaa52b39de56b07f8ac75a154d23f6 --- /dev/null +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSentinelSupportImpl.java @@ -0,0 +1,100 @@ + +package org.jmqtt.broker.store.redis.support; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.jmqtt.broker.common.config.BrokerConfig; +import org.jmqtt.broker.common.log.JmqttLogger; +import org.jmqtt.broker.common.log.LogUtil; +import org.jmqtt.broker.store.redis.RedisCallBack; +import org.slf4j.Logger; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.JedisSentinelPool; + +import java.util.HashSet; +import java.util.Set; + +/** + * redis哨兵支持实现 + * + * @author nn200433 + * @date 2021-07-21 03:27:04 + */ +public class RedisSentinelSupportImpl implements RedisSupport { + + private static final Logger log = JmqttLogger.storeLog; + + private BrokerConfig brokerConfig; + private JedisSentinelPool jedisPool; + + public RedisSentinelSupportImpl(BrokerConfig brokerConfig) { + this.brokerConfig = brokerConfig; + } + + @Override + public void init() { + try { + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setMinIdle(brokerConfig.getMinIdle()); + jedisPoolConfig.setMaxTotal(brokerConfig.getMaxTotal()); + jedisPoolConfig.setTestOnBorrow(brokerConfig.isTestOnBorrow()); + jedisPoolConfig.setMaxTotal(jedisPoolConfig.getMaxIdle()); + jedisPoolConfig.setMaxWaitMillis(jedisPoolConfig.getMaxWaitMillis()); + + String[] redisHosts = brokerConfig.getRedisHost().split(","); + if (ArrayUtils.isEmpty(redisHosts)) { + throw new Exception("array [redisHost] is empty"); + } + + Set redisNodes = new HashSet(redisHosts.length); + for (String redisHost : redisHosts) { + String[] ipPort = redisHost.split(":"); + if (ipPort.length != 2) { + throw new Exception("redisHost wrong format, example: 127.0.0.1:6379,127.0.0.1:6380"); + } else { + redisNodes.add(redisHost); + } + } + + if (StringUtils.isEmpty(brokerConfig.getRedisPassword())) { + throw new Exception("redisPassword is empty"); + } + + jedisPool = new JedisSentinelPool(brokerConfig.getMasterName(), redisNodes, + jedisPoolConfig, 10000, brokerConfig.getRedisPassword(), brokerConfig.getRedisDataBase()); + + LogUtil.debug(log, "[Redis handle] JedisSentinelPool init success"); + } catch (Exception ex) { + LogUtil.error(log, "[Redis handle error],ex:{}", ex); + } + } + + @Override + public T operate(RedisCallBack redisCallBack) { + LogUtil.debug(log, "[Cluster] redis operate begin"); + long startTime = System.currentTimeMillis(); + Jedis jedis = null; + try { + jedis = jedisPool.getResource(); + return (T) redisCallBack.operate(jedis); + } catch (Exception ex) { + LogUtil.error(log, "[Cluster] redis operate error,ex:{}", ex); + } finally { + LogUtil.debug(log, "[Cluster] redis operate cost:{}", (System.currentTimeMillis() - startTime)); + if (jedis != null) { + jedis.close(); + } + } + return null; + } + + @Override + public void close() { + LogUtil.info(log, "[Cluster] redis close"); + if (jedisPool != null) { + jedisPool.close(); + } + } + +} diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java index 0246523b823caba4e9267797f7ec31b589f7e186..7348297d2b0dff6223d9a7b4465152aa47f0c628 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupport.java @@ -6,11 +6,27 @@ import org.jmqtt.broker.store.redis.RedisCallBack; * redis 支持类,定义主要的redis操作 */ public interface RedisSupport { + /** * 封装基本的redis操作,对{@link RedisCallBack} 暴露了Jedis对象 * @param redisCallBack * @param * @return */ - T operate(RedisCallBack redisCallBack); + public T operate(RedisCallBack redisCallBack); + + /** + * 初始化 + * + * @return + */ + public void init(); + + /** + * 关闭 + * + * @return + */ + public void close(); + } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java index 3a54e177020cc67bb361db73e03cfde2c54dc9d8..423981a8da17fa9b92db8efec64f97b8254ffd62 100755 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisSupportImpl.java @@ -3,28 +3,27 @@ package org.jmqtt.broker.store.redis.support; import org.apache.commons.lang3.StringUtils; import org.jmqtt.broker.common.config.BrokerConfig; -import org.jmqtt.broker.store.redis.RedisCallBack; import org.jmqtt.broker.common.log.JmqttLogger; import org.jmqtt.broker.common.log.LogUtil; +import org.jmqtt.broker.store.redis.RedisCallBack; import org.slf4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; -public class RedisSupportImpl implements RedisSupport{ +public class RedisSupportImpl implements RedisSupport { private static final Logger log = JmqttLogger.storeLog; private BrokerConfig brokerConfig; - private JedisPool jedisPool; + private JedisPool jedisPool; - public static final String PROJECT = "JMQTT"; - - public RedisSupportImpl(BrokerConfig brokerConfig){ + public RedisSupportImpl(BrokerConfig brokerConfig) { this.brokerConfig = brokerConfig; } - void init(){ + @Override + public void init() { try { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMinIdle(brokerConfig.getMinIdle()); @@ -33,26 +32,30 @@ public class RedisSupportImpl implements RedisSupport{ jedisPoolConfig.setMaxTotal(jedisPoolConfig.getMaxIdle()); jedisPoolConfig.setMaxWaitMillis(jedisPoolConfig.getMaxWaitMillis()); if (StringUtils.isEmpty(brokerConfig.getRedisPassword())) { - jedisPool = new JedisPool(jedisPoolConfig,brokerConfig.getRedisHost(),brokerConfig.getRedisPort()); + jedisPool = new JedisPool(jedisPoolConfig, brokerConfig.getRedisHost(), brokerConfig.getRedisPort()); + jedisPool.getResource().select(brokerConfig.getRedisDataBase()); } else { - jedisPool = new JedisPool(jedisPoolConfig,brokerConfig.getRedisHost(),brokerConfig.getRedisPort(),10000,brokerConfig.getRedisPassword()); + jedisPool = new JedisPool(jedisPoolConfig, brokerConfig.getRedisHost(), brokerConfig.getRedisPort(), + 10000, brokerConfig.getRedisPassword(), brokerConfig.getRedisDataBase()); } + LogUtil.debug(log, "[Redis handle] JedisPool init success"); } catch (Exception ex) { - LogUtil.error(log,"[Redis handle error],ex:{}",ex); + LogUtil.error(log, "[Redis handle error],ex:{}", ex); } } + @Override - public T operate(RedisCallBack redisCallBack){ - LogUtil.debug(log,"[Cluster] redis operate begin"); + public T operate(RedisCallBack redisCallBack) { + LogUtil.debug(log, "[Cluster] redis operate begin"); long startTime = System.currentTimeMillis(); Jedis jedis = null; try { jedis = jedisPool.getResource(); - return (T)redisCallBack.operate(jedis); - }catch (Exception ex) { - LogUtil.error(log,"[Cluster] redis operate error,ex:{}",ex); + return (T) redisCallBack.operate(jedis); + } catch (Exception ex) { + LogUtil.error(log, "[Cluster] redis operate error,ex:{}", ex); } finally { - LogUtil.debug(log,"[Cluster] redis operate cost:{}",(System.currentTimeMillis() - startTime)); + LogUtil.debug(log, "[Cluster] redis operate cost:{}", (System.currentTimeMillis() - startTime)); if (jedis != null) { jedis.close(); } @@ -60,8 +63,9 @@ public class RedisSupportImpl implements RedisSupport{ return null; } - void close(){ - LogUtil.info(log,"[Cluster] redis close"); + @Override + public void close() { + LogUtil.info(log, "[Cluster] redis close"); if (jedisPool != null) { jedisPool.close(); } diff --git a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java index d2edfc5cfcd3b71333fbf6b608c42cb28c3898c0..dfd348d3f9ef158069bcdf9bf71712e411e941c1 100644 --- a/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java +++ b/jmqtt-broker/src/main/java/org/jmqtt/broker/store/redis/support/RedisUtils.java @@ -1,16 +1,23 @@ package org.jmqtt.broker.store.redis.support; import org.jmqtt.broker.common.config.BrokerConfig; -import org.jmqtt.broker.store.redis.RedisCallBack; +import org.jmqtt.broker.common.log.JmqttLogger; +import org.jmqtt.broker.common.log.LogUtil; +import org.slf4j.Logger; +import java.lang.reflect.Constructor; import java.util.concurrent.atomic.AtomicBoolean; /** * redis 访问模板方法类 */ public class RedisUtils { + + private static final Logger log = JmqttLogger.otherLog; + private static final RedisUtils redisUtils = new RedisUtils(); - private volatile RedisSupportImpl redisSupport; + + private volatile RedisSupport redisSupport; private AtomicBoolean start = new AtomicBoolean(false); @@ -21,15 +28,23 @@ public class RedisUtils { } public RedisSupport createSupport(BrokerConfig brokerConfig) { - if (start.compareAndSet(false,true)) { - this.redisSupport = new RedisSupportImpl(brokerConfig); + if (start.compareAndSet(false, true)) { + // this.redisSupport = new RedisSupportImpl(brokerConfig); + try { + // 使用配置反射实例化redis + Class redisSupportClass = Class.forName(brokerConfig.getRedisSupportClass()); + Constructor redisSupportConstructor = redisSupportClass.getConstructor(BrokerConfig.class); + this.redisSupport = (RedisSupport) redisSupportConstructor.newInstance(brokerConfig); + } catch (Exception e) { + LogUtil.error(log, "init redis error, ex:{}", e); + } this.redisSupport.init(); } return redisSupport; } public void close() { - RedisSupportImpl redisSupport = this.redisSupport; + RedisSupport redisSupport = this.redisSupport; if(start.compareAndSet(true,false)&&redisSupport!=null){ redisSupport.close(); } diff --git a/jmqtt-manager b/jmqtt-manager new file mode 100644 index 0000000000000000000000000000000000000000..8b137891791fe96927ad78e64b0aad7bded08bdc --- /dev/null +++ b/jmqtt-manager @@ -0,0 +1 @@ + diff --git a/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java b/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java index 5d8dfc3b88a6a34296bb381b5ee3ce3e3be8ba5f..afbb3d89d29e7ff1cd84104aa32da0fc74b0d06a 100644 --- a/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java +++ b/jmqtt-springboot-starter/src/main/java/org/jmqtt/starter/service/BrokerStartupService.java @@ -10,11 +10,7 @@ import org.jmqtt.broker.common.config.BrokerConfig; import org.jmqtt.broker.common.config.NettyConfig; import org.jmqtt.broker.common.helper.MixAll; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; +import java.io.*; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -59,26 +55,40 @@ public class BrokerStartupService { } public BrokerController getBrokerController() { - String jmqttConfigPath = - brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "jmqtt.properties"; - initConfig(jmqttConfigPath, brokerConfig, nettyConfig); - try { - LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); - File file = new File(brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "log4j2.xml"); - context.setConfigLocation(file.toURI()); - Configuration configuration = context.getConfiguration(); - Map loggerConfigMap = configuration.getLoggers(); - Level newLevel = Level.getLevel(brokerConfig.getLogLevel()); - if (newLevel == null) { - newLevel = Level.INFO; - } - for (LoggerConfig value : loggerConfigMap.values()) { - value.setLevel(newLevel); - } - context.updateLoggers(configuration); - } catch (Exception ex) { - System.err.print("Log4j2 load error,ex:" + ex); - } - return new BrokerController(brokerConfig, nettyConfig); + if (null == brokerConfig) { + // brokerConfig不为空时加载配置文件 + String jmqttConfigPath = + brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "jmqtt.properties"; + initConfig(jmqttConfigPath, brokerConfig, nettyConfig); + try { + LoggerContext context = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); + File file = new File(brokerConfig.getJmqttHome() + File.separator + "conf" + File.separator + "log4j2.xml"); + context.setConfigLocation(file.toURI()); + Configuration configuration = context.getConfiguration(); + Map loggerConfigMap = configuration.getLoggers(); + Level newLevel = Level.getLevel(brokerConfig.getLogLevel()); + if (newLevel == null) { + newLevel = Level.INFO; + } + for (LoggerConfig value : loggerConfigMap.values()) { + value.setLevel(newLevel); + } + context.updateLoggers(configuration); + } catch (Exception ex) { + System.err.print("Log4j2 load error,ex:" + ex); + } + } + + // 启动服务,线程等 + BrokerController brokerController = new BrokerController(brokerConfig, nettyConfig); + brokerController.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + brokerController.shutdown(); + } + })); + return brokerController; } } diff --git a/pom.xml b/pom.xml index 93095614ef882d578258e91b81650226858859a1..5f715b57497e4af72e2676336e5e290517fbcc34 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ 2.9.0 4.0.3 2.9.46 - 3.4.6 + 3.5.6 1.2.4 8.0.16 2.13.3 @@ -168,41 +168,6 @@ - - - - org.codehaus.mojo - versions-maven-plugin - 2.7 - - false - - - - org.apache.maven.plugins - maven-shade-plugin - 2.4.1 - - - package - - shade - - - - - org.jmqtt.broker.BrokerStartup - - - - - - - - - - alibaba