# springboot_rocketmq **Repository Path**: hlai/springboot_rocketmq ## Basic Information - **Project Name**: springboot_rocketmq - **Description**: springboot整合消息队列RocketMQ - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2024-01-24 - **Last Updated**: 2024-01-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 黄金屋 ## 避坑指南 ### 1.广播消费 ![image-20240109144908873](assets/image-20240109144908873.png) 下面的异常显示找不到路径之类的,但是消息又可以正常接收,主要是消费者的id台常量创建文件的的时候创建不成功 处理办法: 实现`RocketMQPushConsumerLifecycleListener`重写`prepareStart`设置consumer名称 ```java package com.cai.listener.rocketmq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; /** * @Title: RocketConsumerListener * @Author RuoLi * @Package com.cai.listener * @Date 2024/1/8 14:50 * @description: rocket消费者监听 * @RocketMQMessageListener(topic = "topic1", consumerGroup = "group1") 一个类只能监听一个主题 * consumeMode = ConsumeMode.ORDERLY 顺序消费 ConsumeMode.CONCURRENTLY 并行消费 * MessageModel.BROADCASTING 广播消费 MessageModel.CLUSTERING 集群消费 * 循序消费不能和广播消费一起使用,否则会抛出异常 * */ @Slf4j @Component @RocketMQMessageListener(topic = "topic1", consumerGroup = "group1",messageModel = MessageModel.BROADCASTING) public class ConsumerListenerTopic1 implements RocketMQListener, RocketMQPushConsumerLifecycleListener { private String message; public String getMessage() { return message; } @Override public void onMessage(String message) { log.debug("消费者接收到消息:{}", message); this.message = message; } @Override public void prepareStart(DefaultMQPushConsumer consumer) { consumer.setInstanceName("consumer2");//初始化设置消费者的名称 } } ``` ## 2.广播消费的消费模式 设置消息监听的时候 consumerModel 在广播模式模式下是并行的,如果设置顺序会出现 广播模式下不支持顺序消费 ```java @RocketMQMessageListener(topic = "topic1", consumerGroup = "group1",messageModel = MessageModel.BROADCASTING) ``` ## 3.日志 项目启动的的时候,RocketMQ会可能会在控制台打印两行红色的字,显示日志之类的。可以在启动类中加上 ```java @SpringBootApplication public class ApplicationContext { public static void main(String[] args) { System.setProperty("rocketmq.client.logUseSlf4j", "true"); SpringApplication.run(ApplicationContext.class, args); } } ``` 在配置文件中可以设置日志的级别 ```yaml logging: level: com.cai: debug RocketmqClient: error //只打印错误日志 ``` ## RocketMQ 官网:https://rocketmq.apache.org/ ### 1.安装 ![image-20240109150620267](assets/image-20240109150620267.png) 下载的时候选择`Binary` 不要下载源码,如果下载源码的话自己打包可能会出现问题 以下就是下载之后的目录结构了 ![image-20240109150805886](assets/image-20240109150805886.png) ### 2.启动 打开`bin`目录下的 `mqnamesrv.cmd` ![image-20240109150929230](assets/image-20240109150929230.png) 然后再启动`mqbroker.cmd` !!!但是先要设置一下 先创建一个`.bat`文件 **指定端口和自动创建主题** ```shell start mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true ``` ![image-20240109151208600](assets/image-20240109151208600.png) 就是这样子了,然后再右键创建一个快捷方式 这样就不用每次点到文件里面去打开了 ![image-20240109151324407](assets/image-20240109151324407.png) 启动成功状态 nameserver ![image-20240109151410514](assets/image-20240109151410514.png) broker ![image-20240109151425866](assets/image-20240109151425866.png) ## RocketMQ Dashboard 可视化界面 ![image-20240109151625817](assets/image-20240109151625817.png) 这个可以观察消息的消费状态以及集群、消息内容之类的 ![image-20240109152302356](assets/image-20240109152302356.png) 包含以下内容和快捷启动方式的目录 ![image-20240109152410555](assets/image-20240109152410555.png) ## 业务处理 在生产端 例如用户请求修改或者编辑数据 只需要校验数据是否符合要求 做数据修改放在消费端 > 当一个处理时间可能很长的情况下 需要及时反馈给用户状态 用户长时间等待业务完成也是不合理的 - 生产端 通过一个post请求发送异步消息给消费端,只要发送的成功消息就不会丢失,如果发送失败要获取发送失败的状态或者抛出异常 准备重新发送 ```java @PostMapping("/update") public String update(@RequestBody UpdateAdminParams params) { //先检查参数是否合法 if (userService.UerIsExist(Long.valueOf(params.getId()))) { String jsonString = JSONObject.toJSONString(params); //发送异步消息 boolean message = messagesService.sendAsyncMessage("admin:update", params.getId(), jsonString); return message ? "更新管理员成功" : "更新管理员失败"; } return "当前不用存在"; } ``` - 消费端 ```java @Service @Slf4j public class UserServiceImpl implements UserService, MessageHandle { private MessageDetails messageDetails; @Autowired private UserRepository userRepository; @Override public void handleMessage(MessageDetails messageDetails) { if (Objects.isNull(messageDetails)) throw new NullPointerException(); this.messageDetails = messageDetails; System.err.println(messageDetails); //处理业务 updateAdmin(); deleteAdmin(); } @Override public synchronized void updateAdmin() { if (messageDetails.getTopic().equals("admin") && messageDetails.getTags().equals("update")) { String message = messageDetails.getMessage(); UpdateAdminParams updateAdminParams = JSONObject.parseObject(message, UpdateAdminParams.class); //更新管理员信息 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.eq("id", updateAdminParams.getId()); updateWrapper.set("name", updateAdminParams.getName()); userRepository.update(updateWrapper); log.info("更新管理员信息成功"); } } @Override public synchronized void deleteAdmin() { if (messageDetails.getTopic().equals("admin") && messageDetails.getTags().equals("delete")) { String message = messageDetails.getMessage(); DeleteAdminParams updateAdminParams = JSONObject.parseObject(message, DeleteAdminParams.class); //更新管理员信息 UpdateWrapper updateWrapper = new UpdateWrapper<>(); updateWrapper.eq("id", updateAdminParams.getId()); userRepository.remove(updateWrapper); log.info("删除管理员信息成功"); } } } ``` - 运行结果 生产端 ![image-20240116164635967](assets/image-20240116164635967.png) 消费端 ![image-20240116164725717](assets/image-20240116164725717.png)