# RabbitMQ_learn
**Repository Path**: java_learning2/RabbitMQ_learn
## Basic Information
- **Project Name**: RabbitMQ_learn
- **Description**: RabbitMQ学习代码
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 1
- **Created**: 2020-03-17
- **Last Updated**: 2021-05-27
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# RabbitMQ笔记
## RabbitMQ安装
```
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
```
## 主页分类
Overview: 概览
Connections:连接
Channels: 管道
Exchanges: 交换机
Queues:队列
Admin:管理
## 简单队列
### pom依赖
```xml
4.0.0
org.example
RabbitMq
1.0-SNAPSHOT
RabbitMq
http://www.example.com
UTF-8
1.7
1.7
com.rabbitmq
amqp-client
4.5.0
org.slf4j
slf4j-log4j12
1.7.25
org.apache.commons
commons-lang3
3.3.2
junit
junit
4.11
test
maven-clean-plugin
3.1.0
maven-resources-plugin
3.0.2
maven-compiler-plugin
3.8.0
maven-surefire-plugin
2.22.1
maven-jar-plugin
3.0.2
maven-install-plugin
2.5.2
maven-deploy-plugin
2.8.2
maven-site-plugin
3.7.1
maven-project-info-reports-plugin
3.0.0
```
### 配置类
```java
package org.example.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnexitionUtil {
public static Connection getConnection() throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("fangyulong.top");//设置server的地址
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory.newConnection();
}
}
```
### 生产者
```java
package org.example.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class Sender {
private final static String QUEUE = "test-hello";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列(队列名字 ,是否持久化队列默认在内存中,是否排外,是否自动删除,其他参数)
channel.queueDeclare(QUEUE, false, false, false, null);
//发送内容
channel.basicPublish("", QUEUE, null, "发送的消息".getBytes());
//关闭连接
channel.close();
connection.close();
}
}
```
### 消费者
```java
package org.example.hello;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
import java.util.Queue;
public class Recver {
private final static String QUEUE = "test-hello";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE, false, false, false, null);
DefaultConsumer consumer= new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("新api调用:"+msg);
}
};
//监听队列
channel.basicConsume(QUEUE,true,consumer);
// QueueingConsumer consumer = new QueueingConsumer(channel);
// //接收消息
// channel.basicConsume(QUEUE, true, consumer);
// //获取消息
// while (true) {
// QueueingConsumer.Delivery delivery = consumer.nextDelivery();//如果没有消息会等待,有的话获取销毁
// String message = new String((delivery.getBody()));
// System.out.println(message);
// }
//
}
}
```
## Work queues 工作队列
一个生产者->多个消费者
螺旋分发:两人数据一样
### 生产者
```java
package org.example.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class send {
private final static String QUEUE = "test-work-queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列(队列名字 ,是否持久化队列默认在内存中,是否排外,是否自动删除,其他参数)
channel.queueDeclare(QUEUE, false, false, false, null);
for (int i =0;i<50;i++){
String msg = "hello" + i;
channel.basicPublish("",QUEUE,null,msg.getBytes());
System.out.println("第"+i+"个");
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
```
### 消费者1
```java
package org.example.work;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class recv1 {
private final static String QUEUE = "test-work-queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE, false, false, false, null);
//定义消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Recv msg :"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE,autoAck,consumer);
}
}
```
### 消费者2
```java
package org.example.work;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class recv2 {
private final static String QUEUE = "test-work-queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建频道
Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE, false, false, false, null);
//定义消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[2] Recv msg :"+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
}
}
};
boolean autoAck = true;
channel.basicConsume(QUEUE,autoAck,consumer);
}
}
```
### 公平分发Fail dispatch
使用公平分发必须关闭自动应答ack改成手动
#### 生产者
```java
package org.example.workfile;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class send {
private final static String QUEUE = "test-work-queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列(队列名字 ,是否持久化队列默认在内存中,是否排外,是否自动删除,其他参数)
channel.queueDeclare(QUEUE, false, false, false, null);
//每个消费者 发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
//限制发给一个人不能超过一个
int prefetchCount =1;
channel.basicQos(prefetchCount);
for (int i =0;i<50;i++){
String msg = "hello" + i;
channel.basicPublish("",QUEUE,null,msg.getBytes());
System.out.println("第"+i+"个");
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
}
```
#### 消费者
```java
package org.example.workfile;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class recv2 {
private final static String QUEUE = "test-work-queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE, false, false, false, null);
channel.basicQos(1);//确保每次发一个
//定义消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[2] Recv msg :"+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
/**
*手动回执
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE,autoAck,consumer);
}
}
```
### 消息持久化
channel.queueDeclare(QUEUE, false, false, false, null);
第二个flase改为true
channel.queueDeclare(QUEUE, true, false, false, null);
## 订阅模式
使用交换机,里面的内容改变之后 向消费者提供数据
### 生产者
```java
package org.example.ps;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class send {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//申明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分发类型
//发送消息
String msg="hello ps";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("Send: "+msg);
channel.close();
connection.close();
}
}
```
### 消费者
```java
package org.example.workfile;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class recv1 {
private final static String QUEUE = "test-work-queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnexitionUtil.getConnection();
//创建频道
final Channel channel = connection.createChannel();
//队列声明
channel.queueDeclare(QUEUE, false, false, false, null);
channel.basicQos(1);//确保每次发一个
//定义消费者
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Recv msg :"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
/**
*
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;//自动应答改成false
channel.basicConsume(QUEUE,autoAck,consumer);
}
}
```
## 路由模式
根据不同的级别 error info 等等 匹配到不同的消费者
### 生产者
```java
package org.example.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
Channel channel = connection.createChannel();
//exchange模式
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg = "hello direct";
String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("send"+msg);
channel.close();
connection.close();
}
}
```
### 消费者
```java
package org.example.routing;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class recv1 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.basicQos(1);
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[1] Recv msg :"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
/**
*
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;//自动应答改成false
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
```
## 主题模式
按照不同的主题把指定的数据发送到不同的消费者上面
### 生产者
```java
package org.example.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class send {
private static final String EXCHANGE_NAME="test_exchange_topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
Channel channel = connection.createChannel();
//exchange
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String msgString = "商品....";
channel.basicPublish(EXCHANGE_NAME,"goods.add",null,msgString.getBytes());
System.out.println("---send"+msgString);
channel.close();
connection.close();
}
}
```
### 消费者
```java
package org.example.topic;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class recv2 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
private static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");
channel.basicQos(1);
DefaultConsumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[2] Recv msg :"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
/**
*
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;//自动应答改成false
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
```
## 事务机制
```java
package org.example.tx;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class TxSend {
private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msgString = "hello tx message";
try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msgString.getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("发生错误,信息已经回滚");
}
channel.close();
connection.close();
}
}
```
```java
package org.example.tx;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class teRecv {
private static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("信息为"+new String(body.toString()));
}
});
}
}
```
## confirm 模式
- 普通 发一条 waitForConfirms()
- 批量 发一批 waitForConfirms
- 异步 confirm模式:提供一个回调方法
### 普通和批量
#### 生产者
```java
package org.example.confirm;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class Send1 {
private static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生产者调用confirmSelect 将channel设置为confirm模式
channel.confirmSelect();
String msgString = "Hello confirm message!";
channel.basicPublish("",QUEUE_NAME,null,msgString.getBytes());
if (!channel.waitForConfirms()){
System.out.println("消息错误");
}else {
System.out.println("消息成功");
}
channel.close();
connection.close();
}
}
```
```java
package org.example.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
public class Send2 {
private static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生产者调用confirmSelect 将channel设置为confirm模式
channel.confirmSelect();
/**
* 批量消息发送
*/
String msgString = "Hello confirm message!";
for (int i=0;i<20;i++){
channel.basicPublish("",QUEUE_NAME,null,msgString.getBytes());
}
if (!channel.waitForConfirms()){
System.out.println("消息错误");
}else {
System.out.println("消息成功");
}
channel.close();
connection.close();
}
}
```
#### 消费者
```java
package org.example.confirm;
import com.rabbitmq.client.*;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
public class Recv1 {
private static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("信息为[confirm]"+new String(body.toString()));
}
});
}
}
```
### 异步
```java
package org.example.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import org.example.util.ConnexitionUtil;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class Send3 {
private static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws Exception {
Connection connection = ConnexitionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//生产者调用confirmSelect 将channel设置为confirm模式
channel.confirmSelect();
final SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple){
System.out.println("handleNack----nultiple");
confirmSet.headSet(deliveryTag).clear();
}else {
System.out.println("--handleNack---multiple false");
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long l, boolean b) throws IOException {
if (b){
System.out.println("----handAck--multiple");
confirmSet.headSet(l+1).clear();
}else {
System.out.println("----handAck--multiple false");
confirmSet.remove(l)
}
}
});
String msgStr = "ssssss";
while (true){
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());
confirmSet.add(seqNo);
}
}
```