diff --git a/smart-mqtt-data-persistence/pom.xml b/smart-mqtt-data-persistence/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..1e7bd7ba1f055c724a1d32bc26deadba5e389556 --- /dev/null +++ b/smart-mqtt-data-persistence/pom.xml @@ -0,0 +1,70 @@ + + + smart-mqtt + org.smartboot.mqtt + 0.19 + ../pom.xml + + 4.0.0 + + smart-mqtt-data-persistence + jar + + smart-mqtt-data-persistence + + + UTF-8 + + + + + org.smartboot.mqtt + smart-mqtt-broker + + + + redis.clients + jedis + 4.3.1 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.1 + + 1.8 + 1.8 + false + + + + maven-shade-plugin + 3.2.4 + + + package + + shade + + + false + + + + META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin + + + + + + + + + + diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..7a198db24d97b8f55ee58555c12dde0e9615c983 --- /dev/null +++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java @@ -0,0 +1,18 @@ +package org.smartboot.mqtt.data.persistence; + + +import org.smartboot.mqtt.broker.plugin.Plugin; +import org.smartboot.mqtt.data.persistence.config.DataSourcePluginConfig; + + +public class DataPersistPlugin extends Plugin { + private DataSourcePluginConfig config; + + public void setConfig(DataSourcePluginConfig config) { + this.config = config; + } + + public DataSourcePluginConfig getConfig() { + return config; + } +} diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..0514107a9bcc5c025b3fbe0cad41227a8275709c --- /dev/null +++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java @@ -0,0 +1,37 @@ +package org.smartboot.mqtt.data.persistence.config; + + +import org.smartboot.mqtt.broker.config.PluginConfig; + +public class DataSourcePluginConfig extends PluginConfig { + private String password; + + private int timeout = 1000; + + private boolean base64 = false; + + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public boolean isBase64() { + return base64; + } + + public void setBase64(boolean base64) { + this.base64 = base64; + } +} diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..08823065eac9ebc29c7614282ee40966f1d2ca43 --- /dev/null +++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java @@ -0,0 +1,42 @@ +package org.smartboot.mqtt.data.persistence.handler; + +import com.sun.management.OperatingSystemMXBean; +import org.smartboot.mqtt.broker.BrokerConfigure; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.BrokerRuntime; +import org.smartboot.mqtt.data.persistence.nodeinfo.BrokerNodeInfo; + +import java.lang.management.ManagementFactory; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +public class BrokerHandler { + + private static final String MQTT_PREFIX = "smart-mqtt@"; + + + public static Map handler(BrokerContext brokerContext){ + BrokerRuntime brokerRuntime = brokerContext.getRuntime(); + BrokerNodeInfo brokerNodeInfo = new BrokerNodeInfo(); + // 名字设置 + brokerNodeInfo.setName(MQTT_PREFIX + brokerContext.getBrokerConfigure().getName()); + // 设置Broker版本号 + brokerNodeInfo.setVersion(BrokerConfigure.VERSION); + // 设置ip地址 + brokerNodeInfo.setIpAddress(brokerContext.getBrokerConfigure().getHost()); + // 设置Pid + brokerNodeInfo.setPid(brokerRuntime.getPid()); + // 设置内存 + brokerNodeInfo.setMemory(String.valueOf((int) ((Runtime.getRuntime().totalMemory()) * 100.0 / (Runtime.getRuntime().maxMemory())))); + // 设置cpu资源 + OperatingSystemMXBean systemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + brokerNodeInfo.setCpu(String.valueOf((int) (systemMXBean.getSystemCpuLoad() * 100))); + // 最后一次启动时间 + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String currentDateTimeString = dateFormat.format(new Date()); + brokerNodeInfo.setRecentTime(currentDateTimeString); + + return brokerNodeInfo.toMap(); + } +} diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..e4d35420d94553d00100df3504694bf9516e571d --- /dev/null +++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java @@ -0,0 +1,97 @@ +package org.smartboot.mqtt.data.persistence.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.eventbus.ServerEventType; +import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; +import org.smartboot.mqtt.broker.plugin.PluginException; +import org.smartboot.mqtt.common.eventbus.EventBus; +import org.smartboot.mqtt.data.persistence.DataPersistPlugin; +import org.smartboot.mqtt.data.persistence.config.DataSourcePluginConfig; +import org.smartboot.mqtt.data.persistence.handler.BrokerHandler; +import org.smartboot.mqtt.data.persistence.nodeinfo.MessageNodeInfo; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; + +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public class RedisPlugin extends DataPersistPlugin { + private static final Logger LOGGER = LoggerFactory.getLogger(RedisPlugin.class); + private static final String CONFIG_JSON_PATH = "$['plugins']['redis-bridge'][0]"; + private static final String CRTEATE_TIME_FIELD_NAME = "createTime"; + private static final String RECENT_TIME_FIELD_NAME = "recentTime"; + private static final String DEFALUT_REDIS_BROKER_KEY_FIELD = "name"; + private static final Lock lock = new ReentrantLock(); + private static JedisPool jedisPool = null; + + @Override + protected void initPlugin(BrokerContext brokerContext) { + DataSourcePluginConfig config = brokerContext.parseConfig(CONFIG_JSON_PATH, DataSourcePluginConfig.class); + if (config == null) { + LOGGER.error("config maybe error, parse fail!"); + throw new PluginException("start DataPersistRedisPlugin exception"); + } + this.setConfig(config); + + // 完成redis线程池的 + if (jedisPool == null) { + lock.lock(); + try { + if (jedisPool == null){ + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(10); // 最大连接数 + poolConfig.setMaxIdle(2); // 最大空闲连接数 + poolConfig.setTestOnReturn(false); + poolConfig.setTestOnBorrow(true); // 检查连接可用性, 确保获取的redis实例可用 + poolConfig.setTestOnCreate(false); + jedisPool = new JedisPool(poolConfig, config.getHost(), config.getPort(),config.getTimeout(), config.getPassword()); + LOGGER.info("redisPoll create success"); + } + }finally { + lock.unlock(); + } + } + EventBus eventBus = brokerContext.getEventBus(); + // 事件监听总线 监听broker创建 + eventBus.subscribe(ServerEventType.BROKER_STARTED, (eventType, subscriber) -> { + Jedis resource = jedisPool.getResource(); + Map handler = BrokerHandler.handler(brokerContext); + String result = resource.hget(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), CRTEATE_TIME_FIELD_NAME); + if (result == null){ + handler.put(CRTEATE_TIME_FIELD_NAME,handler.get(RECENT_TIME_FIELD_NAME)); + } + resource.hmset(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD),handler); + jedisPool.returnResource(resource); + }); + + // 消息总线监听 + MessageBus messageBus = brokerContext.getMessageBus(); + messageBus.consumer((brokerContext1, publishMessage) -> { + Jedis resource = jedisPool.getResource(); + String message = new MessageNodeInfo(publishMessage).toString(this.getConfig().isBase64()); + resource.lpush(brokerContext1.getBrokerConfigure().getName() + ":" + publishMessage.getVariableHeader().getTopicName(),message); + jedisPool.returnResource(resource); + }); + } + + + + @Override + protected void destroyPlugin() { + lock.lock(); + try { + if (jedisPool != null) { + jedisPool.close(); + // 防止内存泄漏 + jedisPool = null; + } + } finally { + lock.unlock(); + } + } +} diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..907d5fe0bb8d0e054f9bd2809be72b9033d7854c --- /dev/null +++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java @@ -0,0 +1,118 @@ +package org.smartboot.mqtt.data.persistence.nodeinfo; + + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.TypeReference; + +import java.util.Map; + +public class BrokerNodeInfo { + /** + * 节点名称 + */ + private String name; + + + /** + * broker版本 + */ + private String version; + /** + * Broker IP地址 + */ + private String ipAddress; + + + /** + * 服务进程 + */ + private String pid; + + /** + * 内存使用率 + */ + private String memory; + + /** + * CPU使用率 + */ + private String cpu; + + /** + * 最近启动时间 + */ + private String recentTime; + + /** + * broker创建时间 + */ + private String createTime; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getIpAddress() { + return ipAddress; + } + + public void setIpAddress(String ipAddress) { + this.ipAddress = ipAddress; + } + + public String getPid() { + return pid; + } + + public void setPid(String pid) { + this.pid = pid; + } + + public String getMemory() { + return memory; + } + + public void setMemory(String memory) { + this.memory = memory; + } + + public String getCpu() { + return cpu; + } + + public void setCpu(String cpu) { + this.cpu = cpu; + } + + public String getRecentTime() { + return recentTime; + } + + public void setRecentTime(String recentTime) { + this.recentTime = recentTime; + } + + public String getCreateTime() { + return createTime; + } + + public void setCreateTime(String createTime) { + this.createTime = createTime; + } + + public Map toMap(){ + return JSON.parseObject(JSON.toJSONString(this), new TypeReference>() {}); + } +} diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..010a36bac4c6fdc898892a308a2238929017c471 --- /dev/null +++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java @@ -0,0 +1,66 @@ +package org.smartboot.mqtt.data.persistence.nodeinfo; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import org.smartboot.mqtt.common.message.MqttPublishMessage; + +import java.io.Serializable; +import java.util.Base64; + +public class MessageNodeInfo implements Serializable { + /** + * 负载数据 + */ + private final byte[] payload; + /** + * 主题 + */ + private final String topic; + + private final boolean retained; + + /** + * 消息存储时间 + */ + private final long createTime = System.currentTimeMillis(); + + public MessageNodeInfo(MqttPublishMessage message) { + this.payload = message.getPayload().getPayload(); + this.retained = message.getFixedHeader().isRetain(); + this.topic = message.getVariableHeader().getTopicName(); + } + + public byte[] getPayload() { + return payload; + } + + public String getTopic() { + return topic; + } + + public boolean isRetained() { + return retained; + } + + public long getCreateTime() { + return createTime; + } + + public String toString(boolean base64){ + if (!base64) return toString(); + // 将对象转换为JSON字符串 + String jsonString = JSON.toJSONString(this); + JSONObject json = JSONObject.parseObject(jsonString); + // 对payload进行base64编码 + String payload = json.getString("payload"); + String encodedPayload = Base64.getEncoder().encodeToString(payload.getBytes()); + json.put("payload", encodedPayload); + return json.toJSONString(); + } + + @Override + public String toString() { + return JSONObject.toJSONString(this); + } + +} diff --git a/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin b/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin new file mode 100644 index 0000000000000000000000000000000000000000..02f209379a7c536c70ec312e927860135ad8bdc9 --- /dev/null +++ b/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin @@ -0,0 +1 @@ +org.smartboot.mqtt.data.persistence.impl.RedisPlugin \ No newline at end of file