# local_delay_queue_boot_starter **Repository Path**: jixiangjun/local_delay_queue_boot_starter ## Basic Information - **Project Name**: local_delay_queue_boot_starter - **Description**: 基于Java delay_queue的持久化延迟消息队列,并且能够对消息进行持久化 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: v_starter_shareding - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-10-22 - **Last Updated**: 2024-01-26 ## Categories & Tags **Categories**: Uncategorized **Tags**: 延迟消息, 持久化, spring-boot-starter ## README # 描述 基于Java delay_queue的延迟消息 组件,主要功能在于提供持久化恢复 # 前置工作 1 创建对应的表 ```sql -- delay_queue.delay_message definition CREATE TABLE `delay_message` ( `id` bigint NOT NULL AUTO_INCREMENT, `msg_clazz` varchar(100) NOT NULL DEFAULT '' COMMENT '延迟消息的完整类名', `msg_json` json NOT NULL COMMENT '延迟消息内容', `init_status` varchar(1) NOT NULL DEFAULT '0' COMMENT '延迟消息被初始化的状态;0:还未被加载到内存队列中;1:已被加载到内存队列', `create_time` datetime DEFAULT CURRENT_TIMESTAMP, `is_del` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否删除', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=70 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='延迟消息表'; ``` 2 maven 添加starter 依赖: ```xml cn.lucky delay-queue-boot-starter 1.1.25 ``` 3 yaml 配置:设置数据库连接参数 ```yaml spring: datasource: url: jdbc:mysql://localhost:3306/delay_queue?useUnicode=true&characterEncoding=utf8&useSSL=true username: xxx password: xxx driver-class-name: com.mysql.cj.jdbc.Driver lucky: delayqueue: # 是否开启逻辑删除,不配置 默认 false enable-logic-del: true ``` # 示例代码 4 首先定义业务 延迟消息对象,继承 BaseDelayMsg 基础延迟消息类: ```java @Data @EqualsAndHashCode(callSuper = false) @NoArgsConstructor public class OrderDelayMsg extends BaseDelayMsg { private String orderNo; public OrderDelayMsg(String orderNo, Long time, TimeUnit unit) { super(time, unit); this.orderNo = orderNo; } @Override public String toString() { return "OrderBaseDelayMsg{" + "orderNo='" + orderNo + '\'' + '}'; } } ``` 然后在业务组件中注入 PersistentDelayQueue,然后添加 @DelayQue 注解, 注解中设置实际的 自定义的要获取的延迟消息: ```java @Component @Slf4j public class OrderController { @DelayQue(OrderDelayMsg.class) private PersistentDelayQueue delayQueue; /** * 测试添加消息 * @param msg 订单延迟消息 * @return boolean */ public boolean addOrder(OrderDelayMsg msg) { return delayQueue.add(msg); } /** * 测试消息消息 order */ public void consumeOrder() { while (true) { OrderDelayMsg msg = (OrderDelayMsg) delayQueue.take(); log.info("消费了消息:{}", msg); delayQueue.ack(msg); } } } ``` 示例中,可以看到 PersistentDelayQueue 的三个主要方法: add(msg):往持久化延迟队列添加消息 take():从队列中获取延迟消息,根据业务转换为 上面注解中标注的 class 类型,注意, 这个强转类型不是任意的,是 @DelayQue(xxx.class) 中标识的类 ack(msg):确认消息已经被消费,调用此方法会删除持久化的消息,注意,如果消息成功 消费,一定要调用此方法,不然机器重启会重复消费,反之,如果获取消息之后后续处理失败 则不要调用,否则会丢失消息。 releaseLoadedStatus(msg): 如果业务处理失败,需要释放消息,避免下次初始化不被加载