diff --git a/plugins/pom.xml b/plugins/pom.xml
index 68fee07d68adb2ab0a414cf5cf5a6fdd28a2c49b..fe5105ccd833dcda85ded2424653890e83d7919d 100644
--- a/plugins/pom.xml
+++ b/plugins/pom.xml
@@ -32,18 +32,32 @@
org.smartboot.mqtt
- redis-bridge-plugin
- ${parent.version}
+ smart-mqtt-data-persistence
+ ${project.parent.version}
+
+
- redis.clients
- jedis
- 4.3.1
+ io.lettuce
+ lettuce-core
+ 5.1.8.RELEASE
+
+
+
+ org.apache.kafka
+ kafka-clients
+ 3.0.0
+
+
+
+ cn.hutool
+ hutool-all
+ 5.8.10
- redis-bridge-plugin
+ smart-mqtt-data-persistence
\ No newline at end of file
diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml
deleted file mode 100644
index 592d095faac2e5d27ae1fb91a5d885e17934b381..0000000000000000000000000000000000000000
--- a/plugins/redis-bridge-plugin/pom.xml
+++ /dev/null
@@ -1,22 +0,0 @@
-
-
- plugins
- org.smartboot.mqtt
- 0.30
- ../pom.xml
-
- 4.0.0
-
- redis-bridge-plugin
-
- redis-bridge-plugin
-
-
-
- redis.clients
- jedis
-
-
-
-
diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java
deleted file mode 100644
index ee160ec4102eda41c2b33d6975e42453a20bba88..0000000000000000000000000000000000000000
--- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.bridge.redis;
-
-
-class Config {
- private String host;
- private int port;
- private String password;
-
- private int timeout = 1000;
-
- private boolean base64 = false;
-
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public int getTimeout() {
- return timeout;
- }
-
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
-
- public boolean isBase64() {
- return base64;
- }
-
- public void setBase64(boolean base64) {
- this.base64 = base64;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-}
diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java
deleted file mode 100644
index 53c3af8ff1048a89823bfd6ec554510965153b8d..0000000000000000000000000000000000000000
--- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.bridge.redis;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.smartboot.mqtt.bridge.redis.handler.BrokerHandler;
-import org.smartboot.mqtt.broker.BrokerContext;
-import org.smartboot.mqtt.broker.eventbus.ServerEventType;
-import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus;
-import org.smartboot.mqtt.broker.plugin.Plugin;
-import org.smartboot.mqtt.broker.plugin.PluginException;
-import org.smartboot.mqtt.common.eventbus.EventBus;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPoolConfig;
-
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-public class RedisPlugin extends Plugin {
- private static final Logger LOGGER = LoggerFactory.getLogger(RedisPlugin.class);
- private static final String CONFIG_JSON_PATH = "$['plugins']['redis-bridge'][0]";
- private static final String CRTEATE_TIME_FIELD_NAME = "createTime";
- private static final String RECENT_TIME_FIELD_NAME = "recentTime";
- private static final String DEFALUT_REDIS_BROKER_KEY_FIELD = "name";
- private static final Lock lock = new ReentrantLock();
- private static JedisPool jedisPool = null;
-
- private Config config;
-
- @Override
- protected void initPlugin(BrokerContext brokerContext) {
- config = brokerContext.parseConfig(CONFIG_JSON_PATH, Config.class);
- if (config == null) {
- LOGGER.error("config maybe error, parse fail!");
- throw new PluginException("start DataPersistRedisPlugin exception");
- }
-
- // 完成redis线程池的
- if (jedisPool == null) {
- lock.lock();
- try {
- if (jedisPool == null) {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- poolConfig.setMaxTotal(10); // 最大连接数
- poolConfig.setMaxIdle(2); // 最大空闲连接数
- poolConfig.setTestOnReturn(false);
- poolConfig.setTestOnBorrow(true); // 检查连接可用性, 确保获取的redis实例可用
- poolConfig.setTestOnCreate(false);
- jedisPool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword());
- LOGGER.info("redisPoll create success");
- }
- } finally {
- lock.unlock();
- }
- }
- EventBus eventBus = brokerContext.getEventBus();
- // 事件监听总线 监听broker创建
- eventBus.subscribe(ServerEventType.BROKER_STARTED, (eventType, subscriber) -> {
- Jedis resource = jedisPool.getResource();
- Map handler = BrokerHandler.handler(brokerContext);
- String result = resource.hget(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), CRTEATE_TIME_FIELD_NAME);
- if (result == null) {
- handler.put(CRTEATE_TIME_FIELD_NAME, handler.get(RECENT_TIME_FIELD_NAME));
- }
- resource.hmset(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), handler);
- jedisPool.returnResource(resource);
- });
-
- // 消息总线监听
- MessageBus messageBus = brokerContext.getMessageBus();
-// messageBus.consumer((brokerContext1, publishMessage) -> {
-// Jedis resource = jedisPool.getResource();
-// String message = new MessageNodeInfo(publishMessage).toString(config.isBase64());
-// resource.lpush(brokerContext1.getBrokerConfigure().getName() + ":" + publishMessage.getVariableHeader().getTopicName(), message);
-// jedisPool.returnResource(resource);
-// });
- }
-
-
- @Override
- protected void destroyPlugin() {
- lock.lock();
- try {
- if (jedisPool != null) {
- jedisPool.close();
- // 防止内存泄漏
- jedisPool = null;
- }
- } finally {
- lock.unlock();
- }
- }
-}
diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java
deleted file mode 100644
index 20b6f04c796bfea599c5d58fa6ae50d34a5fbba0..0000000000000000000000000000000000000000
--- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.bridge.redis.handler;
-
-import com.sun.management.OperatingSystemMXBean;
-import org.smartboot.mqtt.bridge.redis.nodeinfo.BrokerNodeInfo;
-import org.smartboot.mqtt.broker.BrokerConfigure;
-import org.smartboot.mqtt.broker.BrokerContext;
-
-import java.lang.management.ManagementFactory;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-
-public class BrokerHandler {
-
- private static final String MQTT_PREFIX = "smart-mqtt@";
-
-
- public static Map handler(BrokerContext brokerContext) {
- BrokerNodeInfo brokerNodeInfo = new BrokerNodeInfo();
- // 名字设置
- brokerNodeInfo.setName(MQTT_PREFIX + brokerContext.getBrokerConfigure().getNodeId());
- // 设置Broker版本号
- brokerNodeInfo.setVersion(BrokerConfigure.VERSION);
- // 设置ip地址
- brokerNodeInfo.setIpAddress(brokerContext.getBrokerConfigure().getHost());
- // 设置内存
- brokerNodeInfo.setMemory(String.valueOf((int) ((Runtime.getRuntime().totalMemory()) * 100.0 / (Runtime.getRuntime().maxMemory()))));
- // 设置cpu资源
- OperatingSystemMXBean systemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
- brokerNodeInfo.setCpu(String.valueOf((int) (systemMXBean.getSystemCpuLoad() * 100)));
- // 最后一次启动时间
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String currentDateTimeString = dateFormat.format(new Date());
- brokerNodeInfo.setRecentTime(currentDateTimeString);
-
- return brokerNodeInfo.toMap();
- }
-}
diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java
deleted file mode 100644
index 1dc025af6be5f100876ae20a8223dd3510d4e6bc..0000000000000000000000000000000000000000
--- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (C) [2022] smartboot [zhengjunweimail@163.com]
- *
- * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。
- *
- * Enterprise users are required to use this project reasonably
- * and legally in accordance with the AGPL-3.0 open source agreement
- * without special permission from the smartboot organization.
- */
-
-package org.smartboot.mqtt.bridge.redis.nodeinfo;
-
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.TypeReference;
-
-import java.util.Map;
-
-public class BrokerNodeInfo {
- /**
- * 节点名称
- */
- private String name;
-
-
- /**
- * broker版本
- */
- private String version;
- /**
- * Broker IP地址
- */
- private String ipAddress;
-
-
- /**
- * 服务进程
- */
- private String pid;
-
- /**
- * 内存使用率
- */
- private String memory;
-
- /**
- * CPU使用率
- */
- private String cpu;
-
- /**
- * 最近启动时间
- */
- private String recentTime;
-
- /**
- * broker创建时间
- */
- private String createTime;
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getVersion() {
- return version;
- }
-
- public void setVersion(String version) {
- this.version = version;
- }
-
- public String getIpAddress() {
- return ipAddress;
- }
-
- public void setIpAddress(String ipAddress) {
- this.ipAddress = ipAddress;
- }
-
- public String getPid() {
- return pid;
- }
-
- public void setPid(String pid) {
- this.pid = pid;
- }
-
- public String getMemory() {
- return memory;
- }
-
- public void setMemory(String memory) {
- this.memory = memory;
- }
-
- public String getCpu() {
- return cpu;
- }
-
- public void setCpu(String cpu) {
- this.cpu = cpu;
- }
-
- public String getRecentTime() {
- return recentTime;
- }
-
- public void setRecentTime(String recentTime) {
- this.recentTime = recentTime;
- }
-
- public String getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(String createTime) {
- this.createTime = createTime;
- }
-
- public Map toMap(){
- return JSON.parseObject(JSON.toJSONString(this), new TypeReference