# fast-mq **Repository Path**: randomterror/fast-mq ## Basic Information - **Project Name**: fast-mq - **Description**: 支持 - 🚀 开箱即用 - 🔆 ACK机制 - 📦 异步通信 - 🎨 消息故障修复 - 🌕 死信队列处理 - 🌪️ 消息、消费组、消费者监控管理 - 💫 灵活接口幂等控制 - 🪐 支持redis单机、主从、集群 等强大特性的基于redis\redisson的轻量级MQ中间件 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 4 - **Created**: 2024-07-30 - **Last Updated**: 2024-07-30 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README

## 🔥特性(Features) - 🚀 开箱即用 - 🍄 延时队列 - 🔆 ACK机制 - 📦 异步通信 - 🎨 消息故障修复 - 🌕 死信队列处理 - 🌪️ 消息、消费组、消费者监控管理 - 💫 灵活接口幂等控制 - 🪐 支持redis单机、主从、集群 - ..........(待续) ## 🖥 环境要求 (Environment Required) - redis v6.0.0+ - springboot v2.6.5 - jdk 1.8+ - ...... ## 🌎 整体架构 (Architecture) ....(待续) ## ☀️ 快速开始(Quick Start) ### 依赖 (Dependency) ```java ##此版本还未有监控页面 io.github.disaster1-tesk fast-mq-core 1.3.0 ``` ### 队列 (Queue) #### 生产者 (Producer) 注入FastMQTemplate即可使用 ```java public class FastMQTemplateTest extends BaseTest { @Autowired private FastMQTemplate fastMQTemplate; @Test public void sendMsgTest() { HashMap msg = Maps.newHashMap(); msg.put("name", "disaster"); msg.put("age", 20); fastMQTemplate.sendMsgAsync("disaster_topic", msg); fastMQTemplate.sendMsgAsync("disaster_topic", msg); fastMQTemplate.sendMsgAsync(FastMQConstant.DEFAULT_TOPIC, msg); while (true){ } } } ``` #### 消费者(Consumer) ```java /** * 不使用注解,则使用框架默认的topic和consumername * */ @Service @Slf4j public class FastMQConsumerTest implements FastMQListener { @Override public void onMessage(Object o) { log.info("result = {}", o); } } /** * 使用注解可指定topic和consumername,同时还支持接口幂等处理 * */ @Service @FastMQMessageListener(idempotent = true,groupName = "disaster",consumeName = "disaster1",topic = "disaster_topic", readSize = 0) @Slf4j public class FastMQConsumerAnnotationTest implements FastMQListener{ @Override public void onMessage(Object t) { log.info("result = {}", t); } } ``` ### 延时队列 (Delay Queue) #### 生产者 (Producer) 注入FastMQTemplate即可使用 ```java public class FastMQDelayTemplateTest extends BaseTest { @Autowired private FastMQDelayTemplate fastMQDelayTemplate; @Test public void sendMsgTest() throws InterruptedException { Thread.sleep(2000l); fastMQDelayTemplate.msgEnQueue("hello", 20, null, TimeUnit.SECONDS); while (true) { } } } ``` #### 消费者(Consumer) ```java /** * 不使用注解则使用框架默认队列名和线程池 */ @Service @Slf4j public class FastMQDelayConsumerTest implements FastMQDelayListener { @Override public void onMessage(Object t) throws Throwable { log.info("result = {}", t); } } /** * 使用注解可自定义队列名称与线程池 */ @FastMQDelayMessageListener(queueName = "test",executorName = "test_executor") @Service @Slf4j public class FastMQDelayConsumerAnnotationTest implements FastMQDelayListener { @Override public void onMessage(Object t) throws Throwable { log.info("result = {}", t); } } ``` ## 💐 配置 (Configuration) ### 🦫Redission配置项 #### 1.fast-mq内置配置 fast-mq支持通过YAML配置Redission单机、主从、集群 ``` ## 单机版本 redisson: server: host: 127.0.0.1 port: 6379 database: 0 deployment: stand_alone ## 主从版本 redisson: server: host: 127.0.0.1 port: 6379 database: 0 nodes: 127.0.0.1:xxx,127.0.0.1:xxx,127.0.0.1:xxx master: mymaster deployment: master_slave ## 集群 server: host: 127.0.0.1 port: 6379 database: 0 nodes: 127.0.0.1:xxx,127.0.0.1:xxx,127.0.0.1:xxx deployment: cluster ``` #### 2.用户自定义 如果不想使用fast-mq提供的Redission-YAML配置,则只需要在springboot项目中实例化一个RedissonClient对象并被spring管理即可 ```java @Configuration public class RedissionConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer(); singleServerConfig.setAddress("redis://" + "127.0.0.1:6379"); singleServerConfig.setDatabase(1); singleServerConfig.setPassword("123456"); return Redisson.create(config); } } ``` ### 🦦FastMQ配置项 ``` fastmq: config: #是否开启fastmq enable: false # 每次拉取数据的量 fetchMessageSize: 5 #每次拉取PendingList的大小 pullPendingListSize: 1000 #死信门槛(秒) deadLetterThreshold: 32 #是否从头开始订阅消息 isStartFromHead: true #超过了该长度stream前面部分会被持久化(非严格模式——MAXLEN~) trimThreshold: 10000 #是否是异步 isAsync: false executor: #拉取默认主题信息的周期 pullDefaultTopicMessagesPeriod: 10 #检查PendingList周期 pullTopicMessagesPeriod: 1 time-unit: seconds #第一次延迟执行的时间 initial-delay: 1 #线程池的核心线程数,同步时调此参数能有效提高效率,如果采用的是异步消费的方式,使用默认配置即可 executor-core-size: 20 claim: #认领门槛(单位毫秒) claimThreshold: 20 time-unit: milliseconds idle: #检查consumer不活跃的门槛(单位秒) pendingListIdleThreshold: 10 time-unit: seconds ```