# lg-rabbitmq **Repository Path**: sunli1103_admin/lg-rabbitmq ## Basic Information - **Project Name**: lg-rabbitmq - **Description**: 【java训练营作业19-RabbitMQ死信队列】第六阶段 分布式消息服务中间件进阶 模块二 高吞吐消息中间件Kafka 本模块对Kafka集群原理和消息流处理流程、组件机制、流处理基础等进行深入讲解,对从架构选型角度对三种MQ进行比较。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-08-24 - **Last Updated**: 2022-03-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 作业说明 #### 课程介绍 > **第六阶段 分布式消息服务中间件进阶** > > 模块一 开源消息中间件RabbitMQ > > 本模块对市场上常用的开源消息中间件RabbitMQ进行深度源码剖析、并对ACK、限流、TTL、死信、延迟、集群等高级应用和原理进行讲解。 #### 作业内容 > 基于RabbitMQ的TTL以及死信队列,使用SpringBoot实现延迟付款,手动补偿操作。 > > 1. 用户下单后展示等待付款页面 > > 2. 在页面上点击付款的按钮,如果不超时,则跳转到付款成功页面 > > 3. 如果超时,则跳转到用户历史账单中查看因付款超时而取消的订单 #### 实现思路: ![输入图片说明](https://images.gitee.com/uploads/images/2020/0826/155738_e1290dac_1712191.png "屏幕截图.png") ##### 1. 在数据库中创建业务相关表。 以购买拉勾课程为例,一个订单只有一个课程,且电子虚拟商品忽略库存处理和物流信息。 * 课程表(course):课程id、课程名称、课程简介、课程价格、课程状态、备注、删除标识、创建时间、创建者、更新时间、更新者 * 用户表(user):用户id、用户名称、电话、邮箱、用户状态、备注、删除标识、创建时间、更新时间 * 订单表(order_info):订单id、用户id、课程id、课程价格、订单价格(课程价格 - 优惠价格)、订单状态、备注、删除标识、创建时间/下单时间、更新时间、过期时间 * 支付记录表(pay_info):支付id、用户id、订单id、流水id、支付方式、支付金额、支付状态、备注、创建时间/支付时间 > 本次作业只实现订单表和支付记录表,课程表和用户表省略。 ##### 2. 使用Spring Boot提供restful接口,供前端页面调用 * 获取课程信息:getCourseById(Integer courseId) * 下单:order( Long userId, Integer courseId) * 支付:pay(Long userId, Long orderId) `忽略通过打开第三方支付页面进行支付流程,直接完成支付状态` * 获取订单信息:getOrderInfoById(Long orderId) > 本次作业只实现下单、付款、获取订单信息,获取课程信息省略。 ##### 3. 设计前端页面 * 课程信息页面:用于显示所售课程详细信息的页面,并提供购买按钮,完成下单操作。 * 支付页面:用户在课程购买页面中点击购买按钮后直接跳到待付款页面,此时后台已生成待付款订单。 * 支付成功页面:完成支付后,提示成功。 * 历史订单信息页面:支付订单超时的场合,用来查看超时失效的订单信息。 > 因时间紧张,本次作业实现页面比较简陋。 #### 安装软件 ``` RabbitMQ 3.8.5 Erlang 23.0.2 MySQL 5.7 Spring Boot 2.2.8 spring-boot-starter-web mybatis-spring-boot-starter 2.1.3 ``` #### 实现步骤 `省略其他软件环境的安装步骤` ##### 1. 安装RabbitMQ (参看讲义1.2 安装和配置RabbitMQ) 1. 安装依赖 ```shell yum install socat -y ``` 2. 安装Erlang erlang-23.0.2-1.el7.x86_64.rpm下载地址: https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm 首先将erlang-23.0.2-1.el7.x86_64.rpm上传至服务器,然后执行下述命令: ```shell rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm ``` 3. 安装RabbitMQ rabbitmq-server-3.8.5-1.el7.noarch.rpm下载地址: https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm 首先将rabbitmq-server-3.8.5-1.el7.noarch.rpm上传至服务器,然后执行下述命令: ```shell rpm -ivh rabbitmq-server-3.8.5-1.el7.noarch.rpm ``` 4. 启用RabbitMQ的管理插件 ```shell rabbitmq-plugins enable rabbitmq_management ``` 5. 开启RabbitMQ 三种方式任选一种 ```shell systemctl start rabbitmq-server # 前台启动 rabbitmq-server # 后台启动 rabbitmq-server -detached ``` 6. 添加用户 ```shell rabbitmqctl add_user root 123456 ``` 7. 给用户添加权限 给root用户在虚拟主机"/"上的配置、写、读的权限 ```shell rabbitmqctl set_permissions root -p / ".*" ".*" ".*" ``` 8. 给用户设置标签和权限 ```shell rabbitmqctl set_user_tags root administrator ``` ##### 2. 数据库表结构 ```sql SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for order_info -- ---------------------------- DROP TABLE IF EXISTS `order_info`; CREATE TABLE `order_info` ( `order_id` bigint(20) NOT NULL AUTO_INCREMENT, `user_id` bigint(20) NOT NULL, `course_id` int(11) NOT NULL, `course_price` bigint(20) NOT NULL, `order_price` bigint(20) NOT NULL, `status` tinyint(4) NOT NULL, `remark` varchar(255) DEFAULT NULL, `delete_flag` bit(1) NOT NULL, `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` timestamp NULL DEFAULT NULL, `expires_time` timestamp NULL DEFAULT NULL, PRIMARY KEY (`order_id`) ) ENGINE=InnoDB AUTO_INCREMENT=16 DEFAULT CHARSET=utf8mb4; SET FOREIGN_KEY_CHECKS = 1; -- ---------------------------- -- Table structure for pay_info -- ---------------------------- DROP TABLE IF EXISTS `pay_info`; CREATE TABLE `pay_info` ( `pay_id` bigint(20) NOT NULL AUTO_INCREMENT, `user_id` bigint(20) NOT NULL, `order_id` bigint(20) NOT NULL, `serial_number` varchar(100) NOT NULL, `pay_type` tinyint(4) NOT NULL, `pay_price` bigint(20) NOT NULL, `status` tinyint(4) NOT NULL, `remark` varchar(255) DEFAULT NULL, `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`pay_id`) ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4; SET FOREIGN_KEY_CHECKS = 1; ``` ##### 3. 搭建Spring Boot项目 首先按照课程视频和讲义搭建一个最简单的RabbitMQ工程,启动并实现消息的发送和接收。(参看课程任务二:18. SpringBoot整合RabbitMQ、讲义1.7 SpringBoot整合RabbitMQ) 然后再根据作业要求修改和添加相应代码。 1. 配置信息 application.yml ```yml spring: datasource: url: jdbc:mysql://localhost:3306/lagou_rabbitmq?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver rabbitmq: host: 192.168.0.111 virtual-host: / username: root password: 123456 port: 5672 mybatis: type-aliases-package: com.lagou.rabbit.entity mapper-locations: classpath:mapper/*.xml configuration: map-underscore-to-camel-case: true ``` 2. 主要代码 RabbitConfig.java ```java package com.lagou.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { @Bean public Queue queue() { Map props = new HashMap<>(); // 消息的生存时间 10s props.put("x-message-ttl", 10000); // 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列) props.put("x-dead-letter-exchange", "exchange.order.dlx"); // 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey props.put("x-dead-letter-routing-key", "order.dlx"); Queue queue = new Queue("queue.order", true, false, false, props); return queue; } @Bean public Queue queueDlx() { Queue queue = new Queue("queue.order.dlx", true, false, false); return queue; } @Bean public Exchange exchange() { DirectExchange exchange = new DirectExchange("exchange.order", true, false, null); return exchange; } @Bean public Exchange exchangeDlx() { DirectExchange exchange = new DirectExchange("exchange.order.dlx", true, false, null); return exchange; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with("order").noargs(); } @Bean public Binding bindingDlx() { return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("order.dlx").noargs(); } } ``` OrderController.java ```java package com.lagou.rabbitmq.controller; import com.lagou.rabbitmq.entity.OrderInfo; import com.lagou.rabbitmq.service.OrderService; import com.lagou.rabbitmq.util.SimpleResponseEntity; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import javax.validation.constraints.NotNull; @RestController public class OrderController { @Autowired private OrderService orderService; @PostMapping("/order") public ResponseEntity order(@RequestParam @NotNull(message = "用户ID不能为空") Long userId, @RequestParam @NotNull(message = "课程ID不能为空") Integer courseId) throws Exception { orderService.order(userId, courseId); return SimpleResponseEntity.post(); } @GetMapping("/order/{orderId}") public ResponseEntity getOrderInfoById(@RequestParam @NotNull(message = "用户ID不能为空") Long userId, @PathVariable @NotNull(message = "订单ID不能为空") Long orderId) throws Exception { OrderInfo orderInfo = orderService.getOrderInfoById(userId, orderId); return SimpleResponseEntity.get(orderInfo); } } ``` PayController.java ```java package com.lagou.rabbitmq.controller; import com.lagou.rabbitmq.service.PayService; import com.lagou.rabbitmq.util.SimpleResponseEntity; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.validation.constraints.NotNull; @RestController public class PayController { @Autowired private PayService payService; @PostMapping("/payment") public ResponseEntity pay(@RequestParam @NotNull(message = "用户ID不能为空") Long userId, @RequestParam @NotNull(message = "订单ID不能为空") Long orderId) throws Exception { Integer payStatus = payService.pay(userId, orderId); return SimpleResponseEntity.post(payStatus); } } ``` OrderInfoMapper.java ```java package com.lagou.rabbitmq.mapper; import com.lagou.rabbitmq.entity.OrderInfo; import org.springframework.stereotype.Component; @Component public interface OrderInfoMapper { void add(OrderInfo orderInfo); OrderInfo getOrderInfoById(Long userId, Long orderId); Integer getOrderStatusById(Long userId, Long orderId); void invalid(Long orderId); void paid(Long userId, Long orderId); } ``` PayInfoMapper.java ```java package com.lagou.rabbitmq.mapper; import com.lagou.rabbitmq.entity.PayInfo; import org.springframework.stereotype.Component; @Component public interface PayInfoMapper { void add(PayInfo payInfo); } ``` OrderService.java ```java package com.lagou.rabbitmq.service; import com.lagou.rabbitmq.entity.OrderInfo; import com.lagou.rabbitmq.mapper.OrderInfoMapper; import com.lagou.rabbitmq.mapper.PayInfoMapper; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class OrderService { @Autowired private AmqpTemplate rabbitTemplate; @Autowired private OrderInfoMapper orderInfoMapper; @Autowired private PayInfoMapper payInfoMapper; public void order(Long userId, Integer courseId) throws Exception { OrderInfo orderInfo = new OrderInfo(); orderInfo.setUserId(userId); orderInfo.setCourseId(courseId); orderInfo.setCoursePrice(9800L); orderInfo.setOrderPrice(9800L - 5000L); orderInfo.setStatus(1); orderInfoMapper.add(orderInfo); System.out.println("生成订单:" + orderInfo.getOrderId()); // 进入待支付订单的TTL队列 rabbitTemplate.convertAndSend("exchange.order", "order", orderInfo.getOrderId()); } public OrderInfo getOrderInfoById(Long userId, Long orderId) throws Exception { OrderInfo orderInfo = orderInfoMapper.getOrderInfoById(userId, orderId); if (orderInfo == null) { throw new Exception("订单取得失败"); } return orderInfo; } public Integer getOrderStatusById(Long userId, Long orderId) throws Exception { Integer orderStatus = orderInfoMapper.getOrderStatusById(userId, orderId); if (orderStatus == null) { throw new Exception("订单状态取得失败"); } return orderStatus; } @RabbitListener(queues = "queue.order.dlx") public void invalidOrder(Long orderId) { orderInfoMapper.invalid(orderId); System.out.println("超时失效订单:" + orderId); } public void paidOrder(Long userId, Long orderId) { orderInfoMapper.paid(userId, orderId); System.out.println("支付订单:" + orderId); } } ``` PayService.java ```java package com.lagou.rabbitmq.service; import com.lagou.rabbitmq.entity.PayInfo; import com.lagou.rabbitmq.enums.OrderStatusEnum; import com.lagou.rabbitmq.enums.PayStatusEnum; import com.lagou.rabbitmq.enums.PayTypeEnum; import com.lagou.rabbitmq.mapper.PayInfoMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; @Service public class PayService { @Autowired private OrderService orderService; @Autowired private PayInfoMapper payInfoMapper; public Integer pay(Long userId, Long orderId) throws Exception { Integer orderStatus = orderService.getOrderStatusById(userId, orderId); if (OrderStatusEnum.UNPAID.getCode().equals(orderStatus)) { // 插入支付记录 PayInfo payInfo = new PayInfo(); payInfo.setUserId(userId); payInfo.setOrderId(orderId); payInfo.setSerialNumber(UUID.randomUUID().toString()); payInfo.setPayType(PayTypeEnum.ALIPAY.getCode()); payInfo.setPayPrice(4800L); payInfo.setStatus(PayStatusEnum.SUCCESS.getCode()); payInfoMapper.add(payInfo); // 修改订单状态为已支付 orderService.paidOrder(userId, orderId); return PayStatusEnum.SUCCESS.getCode(); } else if (OrderStatusEnum.PAID.getCode().equals(orderStatus)) { return PayStatusEnum.REPEAT.getCode(); } else { return PayStatusEnum.FAILED.getCode(); } } } ``` OrderInfoMapper.xml ```xml INSERT INTO order_info (user_id, course_id, course_price, order_price, status, remark, delete_flag, create_time) VALUES (#{userId}, #{courseId}, #{coursePrice}, #{orderPrice}, #{status}, #{remark}, false, now()) UPDATE order_info SET status=3, update_time=now(), expires_time=now() WHERE order_id=#{orderId} AND delete_flag=false AND status=1; ``` PayInfoMapper.xml ```xml INSERT INTO pay_info (user_id, order_id, serial_number, pay_type, pay_price, status, remark, create_time) VALUES (#{userId}, #{orderId}, #{serialNumber}, #{payType}, #{payPrice}, #{status}, #{remark}, #{createTime}) ``` #### 测试流程 1. 用户下单后展示等待付款页面。确认订单是否生成,TTL队列中是否有消息。 2. 在页面上点击付款的按钮,如果不超时(10秒内),则跳转到付款成功页面。确认订单状态。 3. 重新下单,超时支付,则跳转到用户历史账单中查看因付款超时而取消的订单。 #### 视频讲解 ![视频讲解](reference/md-videos/rabbitmq.mp4)