diff --git a/plugins/pom.xml b/plugins/pom.xml index 68fee07d68adb2ab0a414cf5cf5a6fdd28a2c49b..fe5105ccd833dcda85ded2424653890e83d7919d 100644 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -32,18 +32,32 @@ org.smartboot.mqtt - redis-bridge-plugin - ${parent.version} + smart-mqtt-data-persistence + ${project.parent.version} + + - redis.clients - jedis - 4.3.1 + io.lettuce + lettuce-core + 5.1.8.RELEASE + + + + org.apache.kafka + kafka-clients + 3.0.0 + + + + cn.hutool + hutool-all + 5.8.10 - redis-bridge-plugin + smart-mqtt-data-persistence \ No newline at end of file diff --git a/plugins/redis-bridge-plugin/pom.xml b/plugins/redis-bridge-plugin/pom.xml deleted file mode 100644 index 592d095faac2e5d27ae1fb91a5d885e17934b381..0000000000000000000000000000000000000000 --- a/plugins/redis-bridge-plugin/pom.xml +++ /dev/null @@ -1,22 +0,0 @@ - - - plugins - org.smartboot.mqtt - 0.30 - ../pom.xml - - 4.0.0 - - redis-bridge-plugin - - redis-bridge-plugin - - - - redis.clients - jedis - - - - diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java deleted file mode 100644 index ee160ec4102eda41c2b33d6975e42453a20bba88..0000000000000000000000000000000000000000 --- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/Config.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.bridge.redis; - - -class Config { - private String host; - private int port; - private String password; - - private int timeout = 1000; - - private boolean base64 = false; - - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public int getTimeout() { - return timeout; - } - - public void setTimeout(int timeout) { - this.timeout = timeout; - } - - public boolean isBase64() { - return base64; - } - - public void setBase64(boolean base64) { - this.base64 = base64; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } -} diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java deleted file mode 100644 index 53c3af8ff1048a89823bfd6ec554510965153b8d..0000000000000000000000000000000000000000 --- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/RedisPlugin.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.bridge.redis; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.smartboot.mqtt.bridge.redis.handler.BrokerHandler; -import org.smartboot.mqtt.broker.BrokerContext; -import org.smartboot.mqtt.broker.eventbus.ServerEventType; -import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; -import org.smartboot.mqtt.broker.plugin.Plugin; -import org.smartboot.mqtt.broker.plugin.PluginException; -import org.smartboot.mqtt.common.eventbus.EventBus; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; - -import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -public class RedisPlugin extends Plugin { - private static final Logger LOGGER = LoggerFactory.getLogger(RedisPlugin.class); - private static final String CONFIG_JSON_PATH = "$['plugins']['redis-bridge'][0]"; - private static final String CRTEATE_TIME_FIELD_NAME = "createTime"; - private static final String RECENT_TIME_FIELD_NAME = "recentTime"; - private static final String DEFALUT_REDIS_BROKER_KEY_FIELD = "name"; - private static final Lock lock = new ReentrantLock(); - private static JedisPool jedisPool = null; - - private Config config; - - @Override - protected void initPlugin(BrokerContext brokerContext) { - config = brokerContext.parseConfig(CONFIG_JSON_PATH, Config.class); - if (config == null) { - LOGGER.error("config maybe error, parse fail!"); - throw new PluginException("start DataPersistRedisPlugin exception"); - } - - // 完成redis线程池的 - if (jedisPool == null) { - lock.lock(); - try { - if (jedisPool == null) { - JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxTotal(10); // 最大连接数 - poolConfig.setMaxIdle(2); // 最大空闲连接数 - poolConfig.setTestOnReturn(false); - poolConfig.setTestOnBorrow(true); // 检查连接可用性, 确保获取的redis实例可用 - poolConfig.setTestOnCreate(false); - jedisPool = new JedisPool(poolConfig, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword()); - LOGGER.info("redisPoll create success"); - } - } finally { - lock.unlock(); - } - } - EventBus eventBus = brokerContext.getEventBus(); - // 事件监听总线 监听broker创建 - eventBus.subscribe(ServerEventType.BROKER_STARTED, (eventType, subscriber) -> { - Jedis resource = jedisPool.getResource(); - Map handler = BrokerHandler.handler(brokerContext); - String result = resource.hget(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), CRTEATE_TIME_FIELD_NAME); - if (result == null) { - handler.put(CRTEATE_TIME_FIELD_NAME, handler.get(RECENT_TIME_FIELD_NAME)); - } - resource.hmset(handler.get(DEFALUT_REDIS_BROKER_KEY_FIELD), handler); - jedisPool.returnResource(resource); - }); - - // 消息总线监听 - MessageBus messageBus = brokerContext.getMessageBus(); -// messageBus.consumer((brokerContext1, publishMessage) -> { -// Jedis resource = jedisPool.getResource(); -// String message = new MessageNodeInfo(publishMessage).toString(config.isBase64()); -// resource.lpush(brokerContext1.getBrokerConfigure().getName() + ":" + publishMessage.getVariableHeader().getTopicName(), message); -// jedisPool.returnResource(resource); -// }); - } - - - @Override - protected void destroyPlugin() { - lock.lock(); - try { - if (jedisPool != null) { - jedisPool.close(); - // 防止内存泄漏 - jedisPool = null; - } - } finally { - lock.unlock(); - } - } -} diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java deleted file mode 100644 index 20b6f04c796bfea599c5d58fa6ae50d34a5fbba0..0000000000000000000000000000000000000000 --- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/handler/BrokerHandler.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.bridge.redis.handler; - -import com.sun.management.OperatingSystemMXBean; -import org.smartboot.mqtt.bridge.redis.nodeinfo.BrokerNodeInfo; -import org.smartboot.mqtt.broker.BrokerConfigure; -import org.smartboot.mqtt.broker.BrokerContext; - -import java.lang.management.ManagementFactory; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Map; - -public class BrokerHandler { - - private static final String MQTT_PREFIX = "smart-mqtt@"; - - - public static Map handler(BrokerContext brokerContext) { - BrokerNodeInfo brokerNodeInfo = new BrokerNodeInfo(); - // 名字设置 - brokerNodeInfo.setName(MQTT_PREFIX + brokerContext.getBrokerConfigure().getNodeId()); - // 设置Broker版本号 - brokerNodeInfo.setVersion(BrokerConfigure.VERSION); - // 设置ip地址 - brokerNodeInfo.setIpAddress(brokerContext.getBrokerConfigure().getHost()); - // 设置内存 - brokerNodeInfo.setMemory(String.valueOf((int) ((Runtime.getRuntime().totalMemory()) * 100.0 / (Runtime.getRuntime().maxMemory())))); - // 设置cpu资源 - OperatingSystemMXBean systemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); - brokerNodeInfo.setCpu(String.valueOf((int) (systemMXBean.getSystemCpuLoad() * 100))); - // 最后一次启动时间 - SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - String currentDateTimeString = dateFormat.format(new Date()); - brokerNodeInfo.setRecentTime(currentDateTimeString); - - return brokerNodeInfo.toMap(); - } -} diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java b/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java deleted file mode 100644 index 1dc025af6be5f100876ae20a8223dd3510d4e6bc..0000000000000000000000000000000000000000 --- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/BrokerNodeInfo.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.bridge.redis.nodeinfo; - - -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.TypeReference; - -import java.util.Map; - -public class BrokerNodeInfo { - /** - * 节点名称 - */ - private String name; - - - /** - * broker版本 - */ - private String version; - /** - * Broker IP地址 - */ - private String ipAddress; - - - /** - * 服务进程 - */ - private String pid; - - /** - * 内存使用率 - */ - private String memory; - - /** - * CPU使用率 - */ - private String cpu; - - /** - * 最近启动时间 - */ - private String recentTime; - - /** - * broker创建时间 - */ - private String createTime; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; - } - - public String getIpAddress() { - return ipAddress; - } - - public void setIpAddress(String ipAddress) { - this.ipAddress = ipAddress; - } - - public String getPid() { - return pid; - } - - public void setPid(String pid) { - this.pid = pid; - } - - public String getMemory() { - return memory; - } - - public void setMemory(String memory) { - this.memory = memory; - } - - public String getCpu() { - return cpu; - } - - public void setCpu(String cpu) { - this.cpu = cpu; - } - - public String getRecentTime() { - return recentTime; - } - - public void setRecentTime(String recentTime) { - this.recentTime = recentTime; - } - - public String getCreateTime() { - return createTime; - } - - public void setCreateTime(String createTime) { - this.createTime = createTime; - } - - public Map toMap(){ - return JSON.parseObject(JSON.toJSONString(this), new TypeReference>() {}); - } -} diff --git a/plugins/redis-bridge-plugin/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin b/plugins/redis-bridge-plugin/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin deleted file mode 100644 index 993ce6eab42a60714b34ba37e22c168bc9707077..0000000000000000000000000000000000000000 --- a/plugins/redis-bridge-plugin/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin +++ /dev/null @@ -1 +0,0 @@ -org.smartboot.mqtt.bridge.redis.RedisPlugin \ No newline at end of file diff --git a/plugins/smart-mqtt-data-persistence/pom.xml b/plugins/smart-mqtt-data-persistence/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..b9ed23e7ce199b4ca3b5587791030f6cd953c597 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/pom.xml @@ -0,0 +1,79 @@ + + + plugins + org.smartboot.mqtt + 0.30 + ../pom.xml + + 4.0.0 + + smart-mqtt-data-persistence + jar + + smart-mqtt-data-persistence + + + UTF-8 + + + + + org.smartboot.mqtt + smart-mqtt-broker + + + + io.lettuce + lettuce-core + + + + cn.hutool + hutool-all + + + + org.apache.kafka + kafka-clients + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.1 + + 1.8 + 1.8 + false + + + + maven-shade-plugin + 3.2.4 + + + package + + shade + + + false + + + + META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin + + + + + + + + + + diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..7a78d4a75f4f2b4fd6af4b66c9cbd9e70fae75f5 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/DataPersistPlugin.java @@ -0,0 +1,27 @@ +package org.smartboot.mqtt.data.persistence; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.plugin.Plugin; + +/** +* @Description: 数据持久化插件约定 + * @Author: learnhope + * @Date: 2023/9/18 + */ +public abstract class DataPersistPlugin extends Plugin { + private T config; + + public void setConfig(T config) { + this.config = config; + } + + public T getConfig() { + return config; + } + @Override + protected void initPlugin(BrokerContext brokerContext) { + T config = connect(brokerContext); + listenAndPushMessage(brokerContext, config); + } + protected abstract T connect(BrokerContext brokerContext); + protected abstract void listenAndPushMessage(BrokerContext brokerContext, T config); +} diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/PluginConfig.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/PluginConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..90813918af3d33a3e0dfd9026d66709a2d6b6c58 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/PluginConfig.java @@ -0,0 +1,29 @@ +package org.smartboot.mqtt.data.persistence.config; + +/** +* @Description: 通用持久化插件配置信息 + * @Author: learnhope + * @Date: 2023/9/18 + */ +public class PluginConfig { + // host地址 + private String host; + // base64编码 + private boolean base64 = false; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public boolean isBase64() { + return base64; + } + + public void setBase64(boolean base64) { + this.base64 = base64; + } +} diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/imp/KafkaPluginConfig.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/imp/KafkaPluginConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..1f15418852ab029633ef9abe30b7189763de5126 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/imp/KafkaPluginConfig.java @@ -0,0 +1,67 @@ +package org.smartboot.mqtt.data.persistence.config.imp; + +import org.smartboot.mqtt.data.persistence.config.PluginConfig; + +/** +* @Description: kakfa特有配置 + * @Author: learnhope + * @Date: 2023/9/18 + */ +public class KafkaPluginConfig extends PluginConfig { + + // 确认机制 + private String acks = "all"; + + // 重试次数 + private int retries = 0; + + // 批次大小 + private int batchSize = 16384; + + // 延迟时间 + private int lingerMs = 1; + + // 缓冲区大小 + private int buffer = 1024; + + public String getAcks() { + return acks; + } + + public void setAcks(String acks) { + this.acks = acks; + } + + public int getRetries() { + return retries; + } + + public void setRetries(int retries) { + this.retries = retries; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public int getLingerMs() { + return lingerMs; + } + + public void setLingerMs(int lingerMs) { + this.lingerMs = lingerMs; + } + + public int getBuffer() { + return buffer; + } + + public void setBuffer(int buffer) { + this.buffer = buffer; + } + +} diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/imp/RedisPluginConfig.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/imp/RedisPluginConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..e53057f81dba46ac60b5d0aa5dd59075e6eac49d --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/config/imp/RedisPluginConfig.java @@ -0,0 +1,39 @@ +package org.smartboot.mqtt.data.persistence.config.imp; + + +import org.smartboot.mqtt.data.persistence.config.PluginConfig; + +/** +* @Description: Redis特有配置 + * @Author: learnhope + * @Date: 2023/9/19 + */ +public class RedisPluginConfig extends PluginConfig { + private String password; + private int timeout = 1000; + private boolean simple = true; + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public boolean isSimple() { + return simple; + } + + public void setSimple(boolean simple) { + this.simple = simple; + } +} diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/KafkaPlugin.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/KafkaPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..193f82828fcf24311dbb2af423d391365f4d72b4 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/KafkaPlugin.java @@ -0,0 +1,80 @@ +package org.smartboot.mqtt.data.persistence.impl; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; +import org.smartboot.mqtt.broker.plugin.PluginException; +import org.smartboot.mqtt.data.persistence.DataPersistPlugin; +import org.smartboot.mqtt.data.persistence.config.imp.KafkaPluginConfig; +import org.smartboot.mqtt.data.persistence.nodeinfo.MessageNodeInfo; +import org.smartboot.mqtt.data.persistence.utils.StrUtils; + +import java.util.Properties; +import java.util.concurrent.Future; + +/** +* @Description: KafkaPlugin插件 + * @Author: learnhope + * @Date: 2023/9/19 + */ +public class KafkaPlugin extends DataPersistPlugin { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPlugin.class); + private static final String CONFIG_JSON_PATH = "$['plugins']['kafka-bridge'][0]"; + private static StrUtils StrUtil = new StrUtils<>(); + private static final Properties KAFKAPROPS = new Properties(); + private KafkaProducer producer; + @Override + protected KafkaPluginConfig connect(BrokerContext brokerContext) { + KafkaPluginConfig config = brokerContext.parseConfig(CONFIG_JSON_PATH, KafkaPluginConfig.class); + if (config == null) { + LOGGER.error("config maybe error, parse fail!"); + throw new PluginException("start DataPersistKafkaPlugin exception"); + } + this.setConfig(config); + // 相关配置 + KAFKAPROPS.put("bootstrap.servers", config.getHost()); + KAFKAPROPS.put("acks", config.getAcks()); + KAFKAPROPS.put("retries", config.getRetries()); + KAFKAPROPS.put("batch.size", config.getBatchSize()); + KAFKAPROPS.put("linger.ms", config.getLingerMs()); + KAFKAPROPS.put("buffer.memory", config.getBuffer()); + KAFKAPROPS.put("key.serializer", + "org.apache.kafka.common.serialization.IntegerSerializer"); + KAFKAPROPS.put("value.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer(KAFKAPROPS); + return config; + } + @Override + protected void listenAndPushMessage(BrokerContext brokerContext, KafkaPluginConfig config) { + MessageBus messageBus = brokerContext.getMessageBus(); + messageBus.consumer(busMessage -> { + MessageNodeInfo messageNodeInfo = new MessageNodeInfo(busMessage); + String message = messageNodeInfo.toString(); + // 完成playload信息base64编码 + if (config.isBase64()){ + message = StrUtil.base64(messageNodeInfo); + } + // 异步发送消息 + ProducerRecord record = new ProducerRecord<>(messageNodeInfo.getTopic(), message); + Future result = producer.send(record, new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + exception.printStackTrace(); + } + } + }); + }); + } + @Override + protected void destroyPlugin() { + producer.flush(); + producer.close(); + } +} diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java new file mode 100644 index 0000000000000000000000000000000000000000..b3a219a9007d7b08fcf233aaea5267e425923c64 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/impl/RedisPlugin.java @@ -0,0 +1,88 @@ +package org.smartboot.mqtt.data.persistence.impl; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.smartboot.mqtt.broker.BrokerContext; +import org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus; +import org.smartboot.mqtt.broker.plugin.PluginException; +import org.smartboot.mqtt.data.persistence.DataPersistPlugin; +import org.smartboot.mqtt.data.persistence.config.imp.RedisPluginConfig; +import org.smartboot.mqtt.data.persistence.nodeinfo.MessageNodeInfo; +import org.smartboot.mqtt.data.persistence.utils.StrUtils; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.atomic.AtomicInteger; + + +public class RedisPlugin extends DataPersistPlugin { + private static final Logger LOGGER = LoggerFactory.getLogger(RedisPlugin.class); + private static final String CONFIG_JSON_PATH = "$['plugins']['redis-bridge'][0]"; + private static final String MESSAGE_PREFIX = "smart-mqtt-message:"; + private static StrUtils StrUtil = new StrUtils<>(); + + private static AtomicInteger atomicInteger = new AtomicInteger(0); + private static StatefulRedisConnection CONNECTION; + private static RedisClient CLIENT; + private static RedisAsyncCommands ASYNC_COMMAND; + + @Override + protected RedisPluginConfig connect(BrokerContext brokerContext) { + RedisPluginConfig config = brokerContext.parseConfig(CONFIG_JSON_PATH, RedisPluginConfig.class); + // 启动加载redis的配置文件 + if (config == null) { + LOGGER.error("config maybe error, parse fail!"); + throw new PluginException("start DataPersistRedisPlugin exception"); + } + this.setConfig(config); + + LOGGER.info("redisPoll create success"); + RedisURI redisUri = RedisURI.builder() + .withHost(config.getHost().split(":")[0]) + .withPort(Integer.parseInt(config.getHost().split(":")[1])) + .withPassword(config.getPassword()) + .withTimeout(Duration.of(config.getTimeout(), ChronoUnit.SECONDS)).build(); + // 创建客户端,建立链接 + CLIENT = RedisClient.create(redisUri); + // 建立链接 + CONNECTION = CLIENT.connect(); + // 构建命令实例 + ASYNC_COMMAND = CONNECTION.async(); + return config; + } + + @Override + protected void listenAndPushMessage(BrokerContext brokerContext, RedisPluginConfig config) { + // 消息总线监听 + MessageBus messageBus = brokerContext.getMessageBus(); + // 客户端发送消息来了,到消息总线调用consumer方法 + long start = System.currentTimeMillis(); + messageBus.consumer(busMessage -> { + // 获得message信息Vo对象 + MessageNodeInfo messageNodeInfo = new MessageNodeInfo(busMessage); + String key = MESSAGE_PREFIX + ":" + busMessage.getTopic(); + String message = messageNodeInfo.toString(); + // 完成playload信息base64编码 + if (config.isBase64()){ + message = StrUtil.base64(messageNodeInfo); + } + // 是否加上随机Id + if (!config.isSimple()){ + message = StrUtil.addId(message); + } + RedisFuture future = ASYNC_COMMAND.zadd(key, messageNodeInfo.getCreateTime(), message); + future.thenAccept(value -> {}); + }); + } + + @Override + protected void destroyPlugin() { + CONNECTION.close(); + CLIENT.shutdown(); + } +} diff --git a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/MessageNodeInfo.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java similarity index 41% rename from plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/MessageNodeInfo.java rename to plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java index cc41fbaa494dc6266ae5ef07f895a8b62bdafcbe..fb069af62f60a2ce1ec943ee645c0954363dc14f 100644 --- a/plugins/redis-bridge-plugin/src/main/java/org/smartboot/mqtt/bridge/redis/nodeinfo/MessageNodeInfo.java +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/nodeinfo/MessageNodeInfo.java @@ -1,22 +1,16 @@ -/* - * Copyright (C) [2022] smartboot [zhengjunweimail@163.com] - * - * 企业用户未经smartboot组织特别许可,需遵循AGPL-3.0开源协议合理合法使用本项目。 - * - * Enterprise users are required to use this project reasonably - * and legally in accordance with the AGPL-3.0 open source agreement - * without special permission from the smartboot organization. - */ - -package org.smartboot.mqtt.bridge.redis.nodeinfo; +package org.smartboot.mqtt.data.persistence.nodeinfo; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import org.smartboot.mqtt.broker.eventbus.messagebus.Message; import org.smartboot.mqtt.common.message.MqttPublishMessage; import java.io.Serializable; -import java.util.Base64; +/** +* @Description: 持久化Vo对象 + * @Author: learnhope + * @Date: 2023/9/18 + */ public class MessageNodeInfo implements Serializable { /** * 负载数据 @@ -34,10 +28,10 @@ public class MessageNodeInfo implements Serializable { */ private final long createTime = System.currentTimeMillis(); - public MessageNodeInfo(MqttPublishMessage message) { - this.payload = message.getPayload().getPayload(); - this.retained = message.getFixedHeader().isRetain(); - this.topic = message.getVariableHeader().getTopicName(); + public MessageNodeInfo(Message message) { + this.payload = message.getPayload(); + this.retained = message.isRetained(); + this.topic = message.getTopic(); } public byte[] getPayload() { @@ -56,18 +50,6 @@ public class MessageNodeInfo implements Serializable { return createTime; } - public String toString(boolean base64){ - if (!base64) return toString(); - // 将对象转换为JSON字符串 - String jsonString = JSON.toJSONString(this); - JSONObject json = JSONObject.parseObject(jsonString); - // 对payload进行base64编码 - String payload = json.getString("payload"); - String encodedPayload = Base64.getEncoder().encodeToString(payload.getBytes()); - json.put("payload", encodedPayload); - return json.toJSONString(); - } - @Override public String toString() { return JSONObject.toJSONString(this); diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/utils/IdUtils.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/utils/IdUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..4b9f6256cc2044120c5e07e5dcef46435d626786 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/utils/IdUtils.java @@ -0,0 +1,32 @@ +package org.smartboot.mqtt.data.persistence.utils; + +import cn.hutool.core.lang.Snowflake; +import cn.hutool.core.util.IdUtil; + +/** +* @Description: 雪花算法生成ID + * @Author: learnhope + * @Date: 2023/9/18 + */ +public class IdUtils { + + private static Snowflake snowflake = IdUtil.getSnowflake(); + + /** + * 生成long 类型的ID + * + * @return + */ + public static Long getId() { + return snowflake.nextId(); + } + + /** + * 生成String 类型的ID + * + * @return + */ + public static String getIdStr() { + return snowflake.nextIdStr(); + } +} \ No newline at end of file diff --git a/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/utils/StrUtils.java b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/utils/StrUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..6f70dc395c4403638a3d3be2a1106ae9dde1bb97 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/java/org/smartboot/mqtt/data/persistence/utils/StrUtils.java @@ -0,0 +1,29 @@ +package org.smartboot.mqtt.data.persistence.utils; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import java.util.Base64; + +/** +* @Description: 对字符串进行base64编码 + * @Author: learnhope + * @Date: 2023/9/18 + */ +public class StrUtils { + public String base64(T t){ + String jsonString = JSON.toJSONString(t); + JSONObject json = JSONObject.parseObject(jsonString); + // 对payload进行base64编码 + String payload = json.getString("payload"); + String encodedPayload = Base64.getEncoder().encodeToString(payload.getBytes()); + json.put("payload", encodedPayload); + return json.toJSONString(); + } + + public static String addId(String jsonString){ + JSONObject json = JSONObject.parseObject(jsonString); + json.put("randomId", IdUtils.getIdStr()); + return json.toJSONString(); + } +} diff --git a/plugins/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin b/plugins/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin new file mode 100644 index 0000000000000000000000000000000000000000..02f209379a7c536c70ec312e927860135ad8bdc9 --- /dev/null +++ b/plugins/smart-mqtt-data-persistence/src/main/resources/META-INF/services/org.smartboot.mqtt.broker.plugin.Plugin @@ -0,0 +1 @@ +org.smartboot.mqtt.data.persistence.impl.RedisPlugin \ No newline at end of file diff --git a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java index 30d9cfcf7122371e828e53e6c49a3f405acfdd14..aa370f1387b53912d0b6e5e999a19c457811e292 100644 --- a/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java +++ b/smart-mqtt-broker/src/main/java/org/smartboot/mqtt/broker/BrokerContextImpl.java @@ -270,7 +270,6 @@ public class BrokerContextImpl implements BrokerContext { BrokerTopic topic = getOrCreateTopic(eventObject.getObject().getVariableHeader().getTopicName()); try { //触发消息总线 - messageBusSubscriber.consume(eventObject.getSession(), eventObject.getObject()); } finally { eventBus.publish(ServerEventType.MESSAGE_BUS_CONSUMED, topic); diff --git a/smart-mqtt-broker/src/main/resources/smart-mqtt.yaml b/smart-mqtt-broker/src/main/resources/smart-mqtt.yaml index 97fd19fad40406e337578cd3a2038f700ef05499..cd5394724eebda20a26af2c6c817711fc0eda5f9 100644 --- a/smart-mqtt-broker/src/main/resources/smart-mqtt.yaml +++ b/smart-mqtt-broker/src/main/resources/smart-mqtt.yaml @@ -3,3 +3,24 @@ broker: port: 1883 maxInflight: 256 lowMemory: false + openapi: + port: 18083 + host: 0.0.0.0 +plugins: + - websocket: + port: 1884 +# - kafka-bridge: +# host: 121.xxx.xxx.21:9092,121.xxx.xxx.21:9093,121.xxx.xxx.21:9094 +# acks: 0 +# retries: 0 +# batchSize: 16384 +# lingerMs: 100 +# buffer: 33554432 +# base64: false + - redis-bridge: + host: 121.194.93.21:6379 + password: 990194923 + base64: false + simple: false + + diff --git a/smart-mqtt-common/pom.xml b/smart-mqtt-common/pom.xml index 7cead73067a9ed96c4ba0fcb9b76d96bd3b7c2ff..2186f3489d0a668462aebebae72d33ae83e3c56b 100644 --- a/smart-mqtt-common/pom.xml +++ b/smart-mqtt-common/pom.xml @@ -9,7 +9,6 @@ ../pom.xml 4.0.0 - smart-mqtt-common