diff --git a/smart-mqtt-data-persistence/pom.xml b/smart-mqtt-data-persistence/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..1e7bd7ba1f055c724a1d32bc26deadba5e389556
--- /dev/null
+++ b/smart-mqtt-data-persistence/pom.xml
@@ -0,0 +1,70 @@
+
+
+ smart-mqtt
+ org.smartboot.mqtt
+ 0.19
+ ../pom.xml
+
+ 4.0.0
+
+ smart-mqtt-data-persistence
+ jar
+
+ smart-mqtt-data-persistence
+
+
+ UTF-8
+
+
+
+
+ org.smartboot.mqtt
+ smart-mqtt-broker
+
+
+
+ redis.clients
+ jedis
+ 4.3.1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.10.1
+
+ 1.8
+ 1.8
+ false
+
+
+
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+ false
+
+
+
+ META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin
+
+
+
+
+
+
+
+
+
+
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java
new file mode 100644
index 0000000000000000000000000000000000000000..7a198db24d97b8f55ee58555c12dde0e9615c983
--- /dev/null
+++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java
@@ -0,0 +1,18 @@
+package org.smartboot.mqtt.data.persistence;
+
+
+import org.smartboot.mqtt.broker.plugin.Plugin;
+import org.smartboot.mqtt.data.persistence.config.DataSourcePluginConfig;
+
+
+public class DataPersistPlugin extends Plugin {
+ private DataSourcePluginConfig config;
+
+ public void setConfig(DataSourcePluginConfig config) {
+ this.config = config;
+ }
+
+ public DataSourcePluginConfig getConfig() {
+ return config;
+ }
+}
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..0514107a9bcc5c025b3fbe0cad41227a8275709c
--- /dev/null
+++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/DataSourcePluginConfig.java
@@ -0,0 +1,37 @@
+package org.smartboot.mqtt.data.persistence.config;
+
+
+import org.smartboot.mqtt.broker.config.PluginConfig;
+
+public class DataSourcePluginConfig extends PluginConfig {
+ private String password;
+
+ private int timeout = 1000;
+
+ private boolean base64 = false;
+
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public boolean isBase64() {
+ return base64;
+ }
+
+ public void setBase64(boolean base64) {
+ this.base64 = base64;
+ }
+}
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..08823065eac9ebc29c7614282ee40966f1d2ca43
--- /dev/null
+++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/handler/BrokerHandler.java
@@ -0,0 +1,42 @@
+package org.smartboot.mqtt.data.persistence.handler;
+
+import com.sun.management.OperatingSystemMXBean;
+import org.smartboot.mqtt.broker.BrokerConfigure;
+import org.smartboot.mqtt.broker.BrokerContext;
+import org.smartboot.mqtt.broker.BrokerRuntime;
+import org.smartboot.mqtt.data.persistence.nodeinfo.BrokerNodeInfo;
+
+import java.lang.management.ManagementFactory;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+public class BrokerHandler {
+
+ private static final String MQTT_PREFIX = "smart-mqtt@";
+
+
+ public static Map handler(BrokerContext brokerContext){
+ BrokerRuntime brokerRuntime = brokerContext.getRuntime();
+ BrokerNodeInfo brokerNodeInfo = new BrokerNodeInfo();
+ // 名字设置
+ brokerNodeInfo.setName(MQTT_PREFIX + brokerContext.getBrokerConfigure().getName());
+ // 设置Broker版本号
+ brokerNodeInfo.setVersion(BrokerConfigure.VERSION);
+ // 设置ip地址
+ brokerNodeInfo.setIpAddress(brokerContext.getBrokerConfigure().getHost());
+ // 设置Pid
+ brokerNodeInfo.setPid(brokerRuntime.getPid());
+ // 设置内存
+ brokerNodeInfo.setMemory(String.valueOf((int) ((Runtime.getRuntime().totalMemory()) * 100.0 / (Runtime.getRuntime().maxMemory()))));
+ // 设置cpu资源
+ OperatingSystemMXBean systemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+ brokerNodeInfo.setCpu(String.valueOf((int) (systemMXBean.getSystemCpuLoad() * 100)));
+ // 最后一次启动时间
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String currentDateTimeString = dateFormat.format(new Date());
+ brokerNodeInfo.setRecentTime(currentDateTimeString);
+
+ return brokerNodeInfo.toMap();
+ }
+}
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java
new file mode 100644
index 0000000000000000000000000000000000000000..e4d35420d94553d00100df3504694bf9516e571d
--- /dev/null
+++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java
@@ -0,0 +1,97 @@
+package org.smartboot.mqtt.data.persistence.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.smartboot.mqtt.broker.BrokerContext;
+import org.smartboot.mqtt.broker.eventbus.ServerEventType;
+import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
+import org.smartboot.mqtt.broker.plugin.PluginException;
+import org.smartboot.mqtt.common.eventbus.EventBus;
+import org.smartboot.mqtt.data.persistence.DataPersistPlugin;
+import org.smartboot.mqtt.data.persistence.config.DataSourcePluginConfig;
+import org.smartboot.mqtt.data.persistence.handler.BrokerHandler;
+import org.smartboot.mqtt.data.persistence.nodeinfo.MessageNodeInfo;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class RedisPlugin extends DataPersistPlugin {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RedisPlugin.class);
+ private static final String CONFIG_JSON_PATH = "$['plugins']['redis-bridge'][0]";
+ private static final String CRTEATE_TIME_FIELD_NAME = "createTime";
+ private static final String RECENT_TIME_FIELD_NAME = "recentTime";
+ private static final String DEFALUT_REDIS_BROKER_KEY_FIELD = "name";
+ private static final Lock lock = new ReentrantLock();
+ private static JedisPool jedisPool = null;
+
+ @Override
+ protected void initPlugin(BrokerContext brokerContext) {
+ DataSourcePluginConfig config = brokerContext.parseConfig(CONFIG_JSON_PATH, DataSourcePluginConfig.class);
+ if (config == null) {
+ LOGGER.error("config maybe error, parse fail!");
+ throw new PluginException("start DataPersistRedisPlugin exception");
+ }
+ this.setConfig(config);
+
+ // 完成redis线程池的
+ if (jedisPool == null) {
+ lock.lock();
+ try {
+ if (jedisPool == null){
+ JedisPoolConfig poolConfig = new JedisPoolConfig();
+ poolConfig.setMaxTotal(10); // 最大连接数
+ poolConfig.setMaxIdle(2); // 最大空闲连接数
+ poolConfig.setTestOnReturn(false);
+ poolConfig.setTestOnBorrow(true); // 检查连接可用性, 确保获取的redis实例可用
+ poolConfig.setTestOnCreate(false);
+ jedisPool = new JedisPool(poolConfig, config.getHost(), config.getPort(),config.getTimeout(), config.getPassword());
+ LOGGER.info("redisPoll create success");
+ }
+ }finally {
+ lock.unlock();
+ }
+ }
+ EventBus eventBus = brokerContext.getEventBus();
+ // 事件监听总线 监听broker创建
+ eventBus.subscribe(ServerEventType.BROKER_STARTED, (eventType, subscriber) -> {
+ Jedis resource = jedisPool.getResource();
+ Map handler = BrokerHandler.handler(brokerContext);
+ String result = resource.hget(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), CRTEATE_TIME_FIELD_NAME);
+ if (result == null){
+ handler.put(CRTEATE_TIME_FIELD_NAME,handler.get(RECENT_TIME_FIELD_NAME));
+ }
+ resource.hmset(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD),handler);
+ jedisPool.returnResource(resource);
+ });
+
+ // 消息总线监听
+ MessageBus messageBus = brokerContext.getMessageBus();
+ messageBus.consumer((brokerContext1, publishMessage) -> {
+ Jedis resource = jedisPool.getResource();
+ String message = new MessageNodeInfo(publishMessage).toString(this.getConfig().isBase64());
+ resource.lpush(brokerContext1.getBrokerConfigure().getName() + ":" + publishMessage.getVariableHeader().getTopicName(),message);
+ jedisPool.returnResource(resource);
+ });
+ }
+
+
+
+ @Override
+ protected void destroyPlugin() {
+ lock.lock();
+ try {
+ if (jedisPool != null) {
+ jedisPool.close();
+ // 防止内存泄漏
+ jedisPool = null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git a/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..907d5fe0bb8d0e054f9bd2809be72b9033d7854c
--- /dev/null
+++ b/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/BrokerNodeInfo.java
@@ -0,0 +1,118 @@
+package org.smartboot.mqtt.data.persistence.nodeinfo;
+
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.TypeReference;
+
+import java.util.Map;
+
+public class BrokerNodeInfo {
+ /**
+ * 节点名称
+ */
+ private String name;
+
+
+ /**
+ * broker版本
+ */
+ private String version;
+ /**
+ * Broker IP地址
+ */
+ private String ipAddress;
+
+
+ /**
+ * 服务进程
+ */
+ private String pid;
+
+ /**
+ * 内存使用率
+ */
+ private String memory;
+
+ /**
+ * CPU使用率
+ */
+ private String cpu;
+
+ /**
+ * 最近启动时间
+ */
+ private String recentTime;
+
+ /**
+ * broker创建时间
+ */
+ private String createTime;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getIpAddress() {
+ return ipAddress;
+ }
+
+ public void setIpAddress(String ipAddress) {
+ this.ipAddress = ipAddress;
+ }
+
+ public String getPid() {
+ return pid;
+ }
+
+ public void setPid(String pid) {
+ this.pid = pid;
+ }
+
+ public String getMemory() {
+ return memory;
+ }
+
+ public void setMemory(String memory) {
+ this.memory = memory;
+ }
+
+ public String getCpu() {
+ return cpu;
+ }
+
+ public void setCpu(String cpu) {
+ this.cpu = cpu;
+ }
+
+ public String getRecentTime() {
+ return recentTime;
+ }
+
+ public void setRecentTime(String recentTime) {
+ this.recentTime = recentTime;
+ }
+
+ public String getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(String createTime) {
+ this.createTime = createTime;
+ }
+
+ public Map toMap(){
+ return JSON.parseObject(JSON.toJSONString(this), new TypeReference