# RabbitMQ
**Repository Path**: lambertee/RabbitMQ
## Basic Information
- **Project Name**: RabbitMQ
- **Description**: RabbitMQ是流行的消息队列服务软件,是开源的AMQP(高级消息队列协议)实现,用于在分布式系统中存储转发消息,是程序员的必备技能
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-01-03
- **Last Updated**: 2022-06-14
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# RabbitMQ
# 一、消息队列协议
- AMQP协议
支持者:RabbitMQ、ActiveMQ
- MQTT协议
支持者:RabbitMQ、ActiveMQ
- OpenMessage协议
支持者:Apache RocketMQ
- Kafka协议
支持者:kafka
# 二、消息队列持久化
## 1、持久化
简单来说就是将数据存入磁盘,而不是存在内存中随着服务器重启而消失,使数据能够永久保存

## 2、常见的持久化方式
| | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
| -------- | -------- | -------- | ----- | -------- |
| 文件存储 | 支持 | 支持 | 支持 | 支持 |
| 数据库 | 支持 | / | / | / |
# 三、消息的分发策略
## 1、场景模型
## 2、消息分发策略的机制和对比
| | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
| -------- | -------- | -------- | ----- | -------- |
| 发布订阅 | 支持 | 支持 | 支持 | 支持 |
| 轮询分发 | 支持 | 支持 | 支持 | / |
| 公平分发 | / | 支持 | 支持 | / |
| 重发 | 支持 | 支持 | / | 支持 |
| 消息拉取 | / | 支持 | 支持 | 支持 |
# 四、消息队列高可用和高可靠
## 1、高可用
所谓高可用是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
当业务量增加时,请求也过大,一台消息中间件服务器会触及硬件的极限,一台消息服务器已经无法满足业务需求,所以消息中间件必须支持集群部署来达到高可用的目的
## 2、集群模式1-Master-salve主从共享数据的部署模式

生产者将消息发送到Master节点,所有的都连接这个消息队列共享这块数据区域,Master节点负责写入,一旦Master挂掉,slave节点继续服务。从而形成高可用
## 3、集群模式2-Master-slave主从同步部署方式

这种模式写入消息同样在Master主节点上,但是主节点会同步数据到slave节点形成副本,和zookeeper或者redis主从机制很类同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同节点就行消费,消息的拷贝和同步会暂用很大的带宽和网络资源。在后续rabbitmq中会有使用
## 4、集群模式3-多主集群同步部署模式

和集群模式2差不多,但是它可以在任意节点中写入
## 5、集群模式4-多主集群转发部署模式

如果你插入的数据到broker1中,元数据信息会存储数据的相关描述和记录存放的位置。
它会对描述信息也就是元数据信息进行同步,如果消费者在broker2中进行消费,发现自己没有对应的信息,可以从对应的元数据信息去查询,然后返回对应的消息场景。
## 6、集群模式5-Master-slave与Broker-cluster组合的方案
> 总结:消息共享/消息同步/元数据共享
# 五、RabbitMQ的入门及安装
> RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端。用于分布式系统中存储,转发消息,具有高可用,高可扩性,易用性等特征。
## 01、安装RabbitMQ
1:下载地址:https://www.rabbitmq.com/download.html

2:环境准备:CentOS7.x + / Erlang
RabbitMQ是采用Erlang语言开发的,所以系统环境必须提供Erlang环境,第一步就是安装Erlang
## 02、Erlang安装
地址:http://www.erlang-solutions.com/downloads/

- 选择CentOS

- 打开虚拟机,新建一个rabbitmq文件夹
```shell
mkdir -p /usr/rabbitmq
```
- 将下载好的Rabbitmq和erlang放入该文件夹
- 安装Rabbitmq和erlang
可能会出现错误
```shell
错误:依赖检测失败:
libodbc.so.2()(64bit) 被 esl-erlang-23.2-1.x86_64 需要
```
需要安装ODBC
```shell
yum install unixODBC
```
解压文件
```shell
rpm -Uvh esl-erlang_23.2-1_centos_7_amd64.rpm
```
安装
```shell
yum install -y erlang
```
查看版本号
```shell
erl -v
```
## 03、安装socat
```shell
yum install -y socat
```
## 04、安装RabbitMQ
- 解压
```shell
rpm -Uvh rabbitmq-server-3.8.16-1.el7.noarch.rpm
```
- 安装
```shell
yum install rabbitmq -y
```
- 启动rabbitmq-server
```shell
systemctl start rabbitmq-server
```
- 查看服务状态
```shell
systemctl status rabbitmq-server
```
有Active:active(running)表已启动
- 开机自动启动
```shell
systemctl enable rabbitmq-server #开启开机自动启动
systemctl stop rabbitmq-server #关闭开机自动启动
```
# 六、RabbitMQWeb管理界面及授权操作
## 01、RabbitMQ管理界面
默认情况下,rabbitmq是没有安装web端的客户端插件,需要安装才可以生效
```shell
rabbitmq-plugins enable rabbitmq_management
```
安装完毕后,重启服务器
```shell
systemctl restart rabbitmq-server
```
进入虚拟机ip地址:15672(10.1.53.169:15672)

若无法进入,需要释放15672端口的防火墙
```shell
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload #重启防火墙
```
rabbitmq有一个默认账号和密码:guest

默认情况只能在localhost本机下访问,所以需要添加一个远程登录的用户
## 02、授权账号和密码
### 02-1 新增用户
```shell
rabbitmqctl add_user admin admin
```
### 02-2 设置用户分配操作权限
```er
rabbitmqctl set_user_tags admin administrator
```
用户级别:
- adminstrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
- monitoring 监控者 可以登录控制台、查看所有信息
- policymaker 策略制定者 登录控制台,指定策略
- managment 普通管理员,登录控制台
### 03 小结
```shell
rabbitmqctl add_user `账号` `密码` #添加用户
rabbitmqctl set_user_tags `账号` administrator #赋予角色
rabbitmqctl change_password `Username` `Newpassword` #修改密码
rabbitmqctl delete_user `Username` #删除用户
rabbitmqctl list_users #查看用户名单
rabbitmqctl set_permissions -p / `用户名` ".*" ".*" ".*" #为用户设置administrator角色
```
# 七、RabbitMQ之Docker安装
## 01、虚拟化容器技术—Docker的安装
```shell
#(1) yum包更新到最新
> yum update
#(2) 安装需要的软件包,yum-util提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install -y yum-utils device-mapper-persistent-data lvm2
#(3) 设置yum源为阿里云
> yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
#(4) 安装docker
> yum install docker-ce -y
#(5) 安装后查看docker版本
> docker -v
#(6) 启动docker
> systemctl start docker
#(7) 查看docker状态
> systemctl status docker
#(8)安装加速镜像
sudo mkdir -p /etc/docker
vim daemon.json
#在daemon.json中写入下面的数据
{
"registry-mirrors":["https://0wrdwnn6.mirror.aliyuns.com"]
}
#重启daemon和docker
sudo systemctl daemon-reload
sudo systemctl restart docker
#(9) 测试加速器设置成功
> docker run hello-world
```
## 02、docker的相关命令
```shell
#启动docker:
systemctl start docker
#停止docker
systemctl stop docker
#重启docker
systemctl restart docker
#查看docker状态
systemctl status docker
#开机启动
systemctl enable docker
systemctl unenable docker
#查看docker概要信息
docker info
#查看docker帮助文档
docker --help
```
## 03、安装RabbitMQ
### 03-1 获取rabbbit镜像
```shell
docker pull rabbbitmq:management
```
### 03-2 创建并运行容器
```shell
docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management
```
—hostname:指定容器主机名称
—name:指定容器名称
-p:将mq端口号映射到本地
或者运行时设置用户和密码
**!只需执行下面这一步操作 无需执行上面那两步**
```shell
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
```
```shell
#(1) 查看所有容器
docker ps -a
#(2) 查看所有镜像
docker images
#(3) 启动容器
docker start `CONTAINER ID`
#若端口启动不了 可尝试将已启动的rabbit-server关闭
systemctl stop rabbitmq-server
```
### 03-3 额外Linux相关排查命令
```shell
#查看端口是否被占用
netstat -naop | grep 5672
#查看进程
ps -ef | grep 5672
```
# 八、RabbitMQ的角色分类
## 01、none
- 不能访问management plugin
## 02、mangement:查看自己相关的节点信息
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息
## 03、Policymaker
- 包含management所有权限
- 查看和创建和删除自己的virtual hosts所属的policies和parameters信息
## 04、Monitorng
- 包含management所有权限
- 罗列出所有的virtual hosts,包括不能登录的virtual hosts
- 查看其他用户的connections和channels信息
- 查看节点级别的数据如clustering和memory使用情况
- 查看所有的virtual hosts的全局统计情况
## 05、Administrator
- 最高权限
- 可以创建和删除virtual hosts
- 可以查看,创建和删除users
- 查看创建permissions
- 关闭所有用户的connections
## 06、具体操作的界面

# 九、RabbitMQ入门案例 - Simple 简单模式
RabbitMQ支持消息的模式:https://www.rabbitmq.com/getstarted.html

## 01、实现步骤
1. jdk1.8
2. 构建一个maven工程
3. 导入rabbitmq的maven依赖
4. 启动rabbitmq-server服务
5. 定义生产者
6. 定义消费者
7. 观察消息在rabbitmq-server服务中的过程
## 02、导入rabbitmq的maven依赖
### 02-1、Java原生依赖
```java
com.rabbitmq
amqp-client
5.12.0
```
### 02-2、Spring依赖
```java
org.springframework.amqp
spring-amqp
2.3.7
org.springframework.amqp
spring-rabbit
2.3.7
```
### 02-3、Springboot依赖
```java
org.springframework.boot
spring-boot-starter-amqp
2.5.0
```
## 03、Producer
```java
package com.geek.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName Producer
* @Description TODO
* @Author Lambert
* @Date 2021/5/25 11:10
* @Version 1.0
**/
public class Producer {
public static void main(String[] args) {
//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2:创建连接Connection
connection = connectionFactory.newConnection("生产者");
//3:通过连接获取通道Channel
channel = connection.createChannel();
//4:通过通道创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息
String queueName = "queue1";
/**
* @params1 队列的名字
* @params2 是否要持久化durable=false,所谓消息是否存盘,如果false是非持久化 true是持久化
* 非持久化会存盘吗,会存盘,但是随着重启服务器会丢失。持久化时重启服务器队列仍存在
* @params3 排他性,是否独占独立
* @params4 是否自动删除,随着最后一个消费者消息消费完毕以后是否把队列删除
* @params5 携带附属参数
*/
channel.queueDeclare(queueName, false, false, false, null);
//5:准备消息内容
String message = "Hello RabbitMQ";
//6:发送消息给队列queue
/**
* @params1:交换机
* @params2:队列、路由key
* @params3:消息状态控制
* @params4:消息的内容
* //面试题:可以存在没有交换机的队列吗?不可以,虽然没有指定交换机但是一定会有存在一个默认的交换机
*/
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
## 04、Consumer
```java
package com.geek.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer
* @Description TODO
* @Author Lambert
* @Date 2021/5/25 11:10
* @Version 1.0
**/
public class Consumer {
public static void main(String[] args) {
//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2:创建连接Connection
connection = connectionFactory.newConnection("生产者");
//3:通过连接获取通道Channel
channel = connection.createChannel();
//4:通过通道创建交换机,声明队列,绑定关系,路由key,发送消息和接收消息
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String s, Delivery message) throws IOException {
System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收消息失败");
}
});
System.out.println("开始接收消息");
//持续执行该块代码
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
# 十、AMQP
## 01、AMQP的概念
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开发标准,为面向消息的中间件设计
## 02、AMQP生产者流转过程

## 03、AMQP消费者流转过程

# 十一、RabbitMQ的核心组成部分
## 01、RabbitMQ的核心组成部分

## 02、RabbitMQ整体架构

## 03、RabbitMQ的运转流程

## 04、RabbitMQ支持消息的模式
- 简单模式Simple
- 工作模式work
- 类型:无
- 特点:分发机制
- 发布订阅模式
- 类型:fanout
- 特点:Fanout一发布与定义模式,是一种广播机制,它是没有路由Key的模式
- 理由模式
- 类型:direct
- 特点:有routing-key的匹配模式
- 主题Topic模式
- 类型:topic
- 特点:模糊的routing-key的匹配模式
- 参数模式
- 类型:headers
- 参数匹配模式
## 05、RabbitWeb管理界面
获取队列信息

- Nack message requeue true:仅查看队列的信息,不消费
- Automatic ack:查看队列的信息并消费
向队列发送信息

# 十二、RabbitMQ入门案例 - Fanout模式

## 01、Web管理界面模拟操作
- 新建Fanout交换机

- 新建三条新的队列 queue1、queue2、queue3

- 进入刚才创建的fanout-exchange交换机,Bindings下绑定queue1、2、3

- 通过交换机的Publish message模块向绑定该交换机的队列推送消息

- 三条队列同时接收到消息

## 02、代码具体实现
- 类型:fanout
- 特点:fanout—发布与订阅模式,是一种广播机制,他没有理由key的模式
### 02-1、生产者
```java
package com.geek.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName Producer
* @Description TODO
* @Author Lambert
* @Date 2021/5/25 11:10
* @Version 1.0
**/
public class Producer {
public static void main(String[] args) {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:准备发送的消息内容
String message = "Hello RabbitMQ";
//6:准备交换机
String exchangeName = "fanout-exchange";
//7:定义路由key
String routeKey = "";
//8:指定交换机的类型
String type = "fanout";
//7:发送消息给中间件rabbitmq-server
/**
* @params1:交换机
* @params2:队列、路由key
* @params3:消息状态控制
* @params4:消息的内容
* //面试题:可以存在没有交换机的队列吗?不可以,虽然没有指定交换机但是一定会有存在一个默认的交换机
*/
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
### 02-2、消费者(所有模式的消费者均相同)
```java
package com.geek.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer
* @Description TODO
* @Author Lambert
* @Date 2021/5/25 11:10
* @Version 1.0
**/
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:定义接受信息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + "收到的消息是:"+new String(delivery.getBody(),"UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
new Thread(runnable,"queue3").start();
}
}
```
# 十三、RabbitMQ入门案例 - direct模式

## 01、Web管理界面模拟操作
- 新建direct交换机

- 进入direct_exchange,绑定队列与交换机的关系

- 向routing_key为email的发送信息

- queue1和queue3同时接收到消息

## 02、代码具体实现
### 02-1、生产者
> 同广播模式对比 只需修改交换机名字、类型并添加路由key
```java
package com.geek.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName Producer
* @Description TODO
* @Author Lambert
* @Date 2021/5/25 11:10
* @Version 1.0
**/
public class Producer {
public static void main(String[] args) {
//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:准备发送的消息内容
String message = "Hello direct_exchange";
//6:准备交换机
String exchangeName = "direct_exchange";
//7:定义路由key
String routeKey = "email";
//8:指定交换机的类型
String type = "direct";
//7:发送消息给中间件rabbitmq-server
/**
* @params1:交换机
* @params2:队列、路由key
* @params3:消息状态控制
* @params4:消息的内容
* //面试题:可以存在没有交换机的队列吗?不可以,虽然没有指定交换机但是一定会有存在一个默认的交换机
*/
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
# 十四、RabbitMQ入门案例 - Topics模式

## 01、Web管理界面模拟操作
- 新建topic交换机

- 进入topic_exchange,绑定队列与交换机的关系


\# 代表后面可0级或多级 例如 com.xxxx.xxxx.xxxxx、com
\* 只能有一级并且必须有一级
- 向队列发送消息
| 接收队列 | routing_key | |
| ---------------------- | ---------------------- | -------------------------- |
| queue1 | com、com.xxxx.xxxx.xxx | |
| queue1、queue2 | com.course.xxx | |
| queue1、queue2、queue3 | com.course.order | |
| queue4 | user.xxxxx | |
| queue1、queue3、queue4 | com.course.user.order | \*.courser.*前后只能有一级 |

## 02、代码具体实现
### 02-1、生产者
```java
```
# 十五、RabbitMQ入门案例 - Headers模式
通过参数来指定对应的队列
- 新建Headers_exchange

- 绑定队列


- 向queue1发送消息

- 接收成功

# 十六、完全的声明创建方式
> 这里以direct路由模式为例
## 01、生产者
```java
package com.geek.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName Producer
* @Description TODO
* @Author Lambert
* @Date 2021/5/25 11:10
* @Version 1.0
**/
public class Producer {
public static void main(String[] args) {
//所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:准备交换机
String exchangeName = "direct_message_exchange";
//6:定义路由key
String routeKey = "";
//7:指定交换机的类型 direct/topic/fanout/headers
String exchangeType = "direct";
//8:声明交换机
/**
* @params1:交换机名字
* @params2:交换机类型
* @params3:是否持久化
*/
channel.exchangeDeclare(exchangeName, exchangeType,true);
//9:声明队列
/**
* @params1 队列的名字
* @params2 是否要持久化durable=false,所谓消息是否存盘,如果false是非持久化 true是持久化
* 非持久化会存盘吗,会存盘,但是随着重启服务器会丢失。持久化时重启服务器队列仍存在
* @params3 排他性,是否独占独立
* @params4 是否自动删除,随着最后一个消费者消息消费完毕以后是否把队列删除
* @params5 携带附属参数
*/
channel.queueDeclare("queue5", true, false, false, null);
channel.queueDeclare("queue6", true, false, false, null);
channel.queueDeclare("queue7", true, false, false, null);
//10:绑定交换机与队列的关系
channel.queueBind("queue5", exchangeName, "order");
channel.queueBind("queue6", exchangeName, "order");
channel.queueBind("queue7", exchangeName, "course");
//11:准备发送的消息内容
String message = "hello";
//12:发送消息到交换机
/**
* @params1:交换机
* @params2:队列、路由key
* @params3:消息状态控制
* @params4:消息的内容
* //面试题:可以存在没有交换机的队列吗?不可以,虽然没有指定交换机但是一定会有存在一个默认的交换机
*/
channel.basicPublish(exchangeName, "order", null, message.getBytes());
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
## 02、消费者
```java
package com.geek.rabbitmq.all;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Consumer
* @Description TODO
* @Author Lambert
* @Date 2021/5/25 11:10
* @Version 1.0
**/
public class Consumer {
private static Runnable runnable = new Runnable() {
public void run() {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//获取队列的名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:定义接受信息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + "收到的消息是:"+new String(delivery.getBody(),"UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(runnable,"queue5").start();
new Thread(runnable,"queue6").start();
new Thread(runnable,"queue7").start();
}
}
```
# 十七、RabbitMQ入门案例-Work模式
RabbitMQ支持消息的模式:https://www.rabbitmq.com/getstarted.html
## 01、Work模式轮询模式(Round-Robin)

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1. 轮询模式的分发:一个消费者一条,按均分发;
2. 公平分发:根据消费者的消费能力公平分发,处理快的处理得多,处理慢的处理的少;按劳分配;
### 01-1、轮询模式的特点
- 类型:无
- 特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者分配一条直至消息消费完成
### 01-2、生产者
```java
package com.geek.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @ClassName Producer
* @Description 简单模式(轮询)生产者
* @Author Lambert
* @Date 2021/6/1 14:20
* @Version 1.0
**/
public class Producer {
public static void main(String[] args) {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:准备发送消息的内容
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "消息" + i;
//6:发送消息给交换机
/**
* @params1:交换机 / 当该参数为""时 params2为队列名
* @params2:队列、路由key
* @params3:消息状态控制
* @params4:消息的内容
*/
channel.basicPublish("", "queue1", null, msg.getBytes());
System.out.println("发送消息成功:消息"+i);
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("发送消息出现异常");
}finally {
//6:释放连接关闭通道
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
### 01-3、消费者
> Work1
```java
package com.geek.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @ClassName Work1
* @Description TODO
* @Author Lambert
* @Date 2021/6/1 14:31
* @Version 1.0
**/
public class Work1 {
public static void main(String[] args) {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-work1");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:定义接收消息的回调
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("work1-收到的消息是" + new String(delivery.getBody(),"UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("work1-接收消息失败");
}
});
System.out.println("work1-开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
> Work2
```java
package com.geek.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Work2
* @Description TODO
* @Author Lambert
* @Date 2021/6/1 14:32
* @Version 1.0
**/
public class Work2 {
public static void main(String[] args) {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-work2");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:定义接收消息的回调
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("work2-收到的消息是" + new String(delivery.getBody(),"UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("work2-接收消息失败");
}
});
System.out.println("work2-开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
启动Work1、Work2 生产者发送20条消息
由于是轮询模式 所以Work1和work2收到的消息数量是一样的
> Work1

> Work2

## 02、Work模式公平分发模式
### 02-1、生产者
```java
package com.geek.rabbitmq.work.fair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @ClassName Producer
* @Description 简单模式(轮询)生产者
* @Author Lambert
* @Date 2021/6/1 14:20
* @Version 1.0
**/
public class Producer {
public static void main(String[] args) {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:准备发送消息的内容
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "消息" + i;
//6:发送消息给交换机
/**
* @params1:交换机 / 当该参数为""时 params2为队列名
* @params2:队列、路由key
* @params3:消息状态控制
* @params4:消息的内容
*/
channel.basicPublish("", "queue1", null, msg.getBytes());
System.out.println("发送消息成功:消息"+i);
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("发送消息出现异常");
}finally {
//6:释放连接关闭通道
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
### 02-2、消费者
> Work1
```java
package com.geek.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Work1
* @Description TODO
* @Author Lambert
* @Date 2021/6/1 14:31
* @Version 1.0
**/
public class Work1 {
public static void main(String[] args) {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-work1");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:定义接收消息的回调
final Channel finalChannel = channel;
//指标定义,qos=1指每次取多少条数据出来分给消费者,若为20某一消费者会一次性取20条,不定义时默认值为null是轮询分发
finalChannel.basicQos(1);
/**
* @params1:队列名
* @params2:true:自动应答(轮询模式)/false:手动应答(公平分发模式)
*/
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
try {
System.out.println("work1-收到的消息是" + new String(delivery.getBody(),"UTF-8"));
Thread.sleep(1000);
//改成手动应答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("work1-接收消息失败");
}
});
System.out.println("work1-开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
> Work2
```java
package com.geek.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @ClassName Work2
* @Description TODO
* @Author Lambert
* @Date 2021/6/1 14:32
* @Version 1.0
**/
public class Work2 {
public static void main(String[] args) {
//1:创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2:设置连接属性
connectionFactory.setHost("10.1.53.169");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3:从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-work2");
//4:通过连接获取通道Channel
channel = connection.createChannel();
//5:定义接收消息的回调
final Channel finalChannel = channel;
//公平分发需要启动
//指标定义,qos=1指每次取多少条数据出来分给消费者,若为20某一消费者会一次性取20条,不定义时默认值为null是轮询分发
finalChannel.basicQos(1);
/**
* @params1:队列名
* @params2:true:自动应答(轮询模式)/false:手动应答(公平分发模式)
*/
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
try {
System.out.println("work2-收到的消息是" + new String(delivery.getBody(),"UTF-8"));
Thread.sleep(200);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("work2-接收消息失败");
}
});
System.out.println("work2-开始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
//7:关闭连接
if(channel != null && channel.isOpen()){
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
//8:关闭通道
if(connection != null && connection.isOpen()){
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
```
启动Work1、Work2 生产者发送20条消息,由于Thread.sleep控制work2每200毫秒执行一次,work1每1000毫秒执行一次 所以work2会执行的更多一些


# 十八、RabbitMQ使用场景
## 01、解耦、削峰、异步
### 01-1、同步异步的问题(串行)
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。

```java
public void makeOrder(){
//1:保存订单
orderService.saveOrder();
//2:发送短信服务
messageService.sendSMS("order");
//3:发送email服务
emailService.sendEmail("order");
//4:发送APP服务
appService.sendApp("order");
}
```
### 01-2 并行方式 异步线程池
并行方式:把订单信息写入数据库后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

```java
public class Order {
public void makeOrder(){
//1:保存订单
orderService.saveOrder();
//相关发送
relationMessage();
}
public void relationMessage(){
//异步
theadpool.submit(new Callable