# better-rocketmq
**Repository Path**: DemoMeng/better-rocketmq
## Basic Information
- **Project Name**: better-rocketmq
- **Description**: better-rocketmq
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2022-07-28
- **Last Updated**: 2022-09-08
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# rocketmq
RocketMQ是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的扩展性。RocketMQ是2012年阿里巴巴的第三代分布式消息中间件,
2016年11月21日,阿里巴巴向阿Apache软件基金会捐赠了RocketMQ,实现了面向世界开源。第二年2月20日,Apache软件基金会宣布Apache RocketMQ成为顶级项目。
## 核心概念
- Topic : 消息主题,一级消息类型,生产者向其发送消息
- 生产者 : 负责生产消息发送消息至 Topic
- 消费者 : 消息订阅者,负责从Topic接收消息,并且消费消息
- 消息 : 数据和属性的组合
- 消息属性 : 生产者可以为消息定义的属性,包含 Message Key 和 Tag
* Message Key : 消息业务的标识,唯一标识某个业务逻辑,可以根据设置的 Message Key进行查询
* Tag : 消息标签,二级消息类型,用来进一步区分某个 Topic下的消息分类,消费者可以通过 Tag对消息进行过滤
* Message ID : 消息的唯一标识,由RocketMQ服务自动生成
- Group : 一类生产者 或 消费者,这类生产者或消费者通产是产生或者消费同一类消息,并且消息发布或订阅逻辑一致
Group ID : group的id
- Name Server : 注册中心的角色,可以集群部署,负责管理 Broker的ip端口等元数据
- Broker : 负责处理消息、存储消息、转发消息。分为 MasterBroker、SlaveBroker,一个MasterBroker可以对应多个SlaveBroker
但是,一个SlaveBroker只能对应一个MasterBroker。
Broker启动后要完成一次将自己注册至Name Server的操作,长连接,每30s发送一条心跳(汇报topic路由信息)
- 分区 : 即Topic Partition,物理上的概念。每个Topic包含一个或多个分区。
## 应用场景
- 应用解耦 :
用户注册动作,调用用户系统,注册成功后,发送短信(通知系统)、增加积分(积分系统)、邀请分销(分销系统)等,均可使用消息队列来做异步操作。
若其中一个系统出现异常则无法完成注册流程。
- 流量削峰
秒杀场景下只允许队列的部分用户进行秒杀,避免过多请求打到应用。
- 顺序收发
* 全区顺序:对于同一个topic,所有的消息都需要按照先进先出(FIFO)的顺序,进行顺序发布、顺序消费
* 分区顺序:对于同一个topic,所有的消息根据 Sharding key进行区块分区,同一个分区的消息按照FIFO顺序进行消息发送、消息消费。可以保证一个消息被一个进行消费
- 分布式事务一致性
- 分布式缓存同步: 电商系统中,每个商品的架构都会事实变化,使用缓存技术也无法满足对商品价格访问的需求。
使用 RocketMQ的广播机制,一条消息可以被所有节点消息一次,相当于把最新的价格信息同步到了每台的机器上,取代了缓存的作用。
## 架构模型

- Name Server : 注册中心的角色,可以集群部署,负责管理 Broker的ip端口等元数据
- Broker : 负责处理消息、存储消息、转发消息。分为 MasterBroker、SlaveBroker,一个MasterBroker可以对应多个SlaveBroker
但是,一个SlaveBroker只能对应一个MasterBroker。
Broker启动后要完成一次将自己注册至Name Server的操作,长连接,每30s发送一条心跳(汇报topic路由信息)
- 消息生产者:与Name Server中的其中一个节点(随机)建立连接 (长连接),定期从 Name server中读取 Topic路由信息,并向提供Topic服务的Master Broker建立长连接,定时向 Master Broker发送心跳包
- 消息消费者:与Name Server中的其中一个节点(随机)建立连接 (长连接),定期从 Name server中读取 Topic路由信息,并向提供Topic服务的Master Broker、 Slave Broker建立长连接,定时向 Master Broker、Slave Broker发送心跳包
消息消费者既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。
## 普通消息发送的三种方式

- 同步发送 (Sync) : 指的消息发送方发出一条消息后,会收到RocketMQ服务端同步响应之后才发下一条消息的通讯方式。
* 应用场景: 重要的邮件通知、报名短信通知、营销短信系统等

````
SendResult sendResult = rocketMQTemplate.syncSend("broker-a", message);
````
- 异步发送 (Async) : 指的发送方发送消息后,不等RocketMQ服务端响应,就能发下一条消息的通讯方式。
* 应用场景: 异步发送一般用于链路耗时较长、对于响应时间较为敏感的场景,比如: 视频上传后通知启动转码服务,转码完成后推送转码通知等。

````
rocketMQTemplate.asyncSend("broker-a", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送消息结果:{}",sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("异步发送消息异常:{}",throwable);
}
});
````
- 单向发送 (Oneway): 发送方只负责发送消息,不等服务端返回响应且没有回调函数出发,即只发送请求不等待应答,此方式耗时非常短,一般在微秒级别
* 应用场景: 适用于耗时非常短,但是对可靠型要求不高的场景 :比如 : 日志收集
````
rocketMQTemplate.sendOneWay("broker-a", message);
````

## 定时消息和延迟消息
定时消息:Producer将消息发送到RocketMQ服务端,但是不希望立马投递这条消息,而是推迟到当前时间后的后某一个时间再投递给Consumer进行消费。
延迟消息:Producer将消息发送到RocketMQ服务端,但是不希望立马投递这条消息,而是延迟一定时间后再投递给Consumer进行消费。
使用场景: 购物车的商品 30分钟失效、订单送到后7天内自动确认收货
## 顺序消息
顺序消息常用于金融证券、电商业务对于消息指令顺序有严格要求的场景。
顺序消息指的是消息队列的消息对于一个指定的Topic,消息严格按照先进先出(FIFO)的规则进行消息发布和消息消费。即先发布的先消费,后发布的后消费
顺序消息分为 分区顺序消息 和 全局顺序消息
- 全局顺序消息:对于同一个topic,所有的消息都需要按照先进先出(FIFO)的顺序,进行顺序发布、顺序消费
- 分区顺序消费:对于同一个topic,所有的消息根据 Sharding key进行区块分区,同一个分区的消息按照FIFO顺序进行消息发送、消息消费。可以保证一个消息被一个进行消费
## 集群消费 和 广播消费
消费者使用相同的Group ID属于同一个集群,同一个集群下的消费者消费逻辑必须一致(包含Tag的使用)。
- 集群消费 : 任意一条消息只要被消费者集群(相同的Group ID)中任意一个消费处理几个
* 适用场景: 适合消费端集群部署的情况下,每条消息只需要被处理一次的场景
- 广播消费 : 会将消息推送给集群内的的所有消费者,保证至少每个消费者都能消费一次
* 适用场景:适合消费端集群部署的情况下,每条消费都要被集群下的每个消费者处理的场景

## 消息过滤
-表达式过滤:
* Tag过滤:
消费者 :配置消息过滤规则,selectorType = SelectorType.TAG , selectorExpression = "tag1||tag2"
````
@Component
@RocketMQMessageListener(consumerGroup = "mengqizhang-group",
topic = "mengqizhang",
selectorExpression = "tag1||tag2",
selectorType = SelectorType.TAG)
@Slf4j
public class ConsumerFilter implements RocketMQListener {
@Override
public void onMessage(MessageExt message) {
String tag = message.getTags();
String msg = new String(message.getBody());
log.info("tag: {},msg :{} ",tag,msg);
}
}
````
生产者 :添加tag,使用topic:tag的方式
````
rocketMQTemplate.syncSend("topic:tag1",MessageBuilder.withPayload("消息过滤tag1").build());
rocketMQTemplate.syncSend("topic:tag2",MessageBuilder.withPayload("消息过滤tag2").build());
rocketMQTemplate.syncSend("topic:tag3",MessageBuilder.withPayload("消息过滤tag3").build());
````
* SQL过滤
- 表达式
````
数字比较,如>,>=,<,<=,BETWEEN,=;
字符比较,如:=,<>,IN;
IS NULL or IS NOT NULL;
逻辑运算符:AND, OR, NOT;
````
````
常量类型:
数值,如:123, 3.1415;
字符, 如:‘abc’, 必须使用单引号;
NULL,特殊常量
Boolean, TRUE or FALSE;
````
- 生产者
````
rocketMQTemplate.convertAndSend("mengqizhang-sql-topic",MessageBuilder.withPayload("sql92过滤:mengqizhang在广州").build(),new HashMap(){{put("name","mengqizhang");put("address","guangzhou");}});
rocketMQTemplate.convertAndSend("mengqizhang-sql-topic",MessageBuilder.withPayload("sql92过滤:mengqizhang在杭州").build(),new HashMap(){{put("name","mengqizhang");put("address","hangzhou");}});
rocketMQTemplate.convertAndSend("mengqizhang-sql-topic",MessageBuilder.withPayload("sql92过滤:mengqizhang在赣州").build(),new HashMap(){{put("name","mengqizhang");put("address","ganzhou");}});
rocketMQTemplate.convertAndSend("mengqizhang-sql-topic",MessageBuilder.withPayload("sql92过滤:mengqizhang在珠海").build(),new HashMap(){{put("name","mengqizhang");put("address","zhuhai");}});
````
- 消费者
selectorType:指明了消息过滤使用SQL92方式
selectorExpression:指明了只能接收消息属性(header)中a=1的消息, 默认值*,代表全部
messageModel:指明了消息消费的模式,
默认值为MessageModel.CLUSTERING(每条消息只能有一个消费者进行消费);
MessageModel.BROADCASTING(广播消息,所有订阅者都能收到消息)
````
@Component
@RocketMQMessageListener(consumerGroup = "mengqizhang-sql-group",
topic = "mengqizhang-sql-topic",
messageModel = MessageModel.BROADCASTING,
selectorExpression = "name= 'mengqizhang' AND address = 'hangzhou' ",
selectorType = SelectorType.SQL92)
@Slf4j
public class ConsumerFilterSQL implements RocketMQListener {
@Override
public void onMessage(MessageExt message) {
String tag = message.getTags();
String msg = new String(message.getBody());
log.info("sql filter : {},msg :{} ",tag,msg);
}
}
````
问题:
````
Caused by: java.lang.IllegalStateException: Failed to start RocketMQ push consumer
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:281) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
at org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration.registerContainer(ListenerContainerConfiguration.java:120) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
... 13 common frames omitted
Caused by: org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2240) ~[rocketmq-client-4.7.1.jar:4.7.1]
at org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:449) ~[rocketmq-client-4.7.1.jar:4.7.1]
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:654) ~[rocketmq-client-4.7.1.jar:4.7.1]
at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:698) ~[rocketmq-client-4.7.1.jar:4.7.1]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start(DefaultRocketMQListenerContainer.java:279) ~[rocketmq-spring-boot-2.1.1.jar:2.1.1]
... 14 common frames omitted
````
解决 : 默认情况下broke没有开启对SQL语法的支持,需要修改配置,配置 broker.conf
````
enablePropertyFilter=true
````
# RocketMQ

server : 保存broker的信息,ip,端口,等元数据信息(类似注册中心)
broker : 实际处理消息
## 启动 namesrv
nohup sh bin/mqnamesrv -n 192.168.2.11:9876 > server.out &
## 启动 broker
nohup sh bin/mqbroker ‐n 192.168.2.11:9876 autoCreateTopicEnable=true ‐c conf/broker.conf > broker.out &
- conf/broker.conf
编辑broker配置文件 conf/broker.conf
````
namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.2.11
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
````
## 停止 namesrv 和 broker
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
## 测试启动消息生产者
export NAMESRV_ADDR="localhost:9876"
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
## 控制面板
docker pull apacherocketmq/rocketmq-dashboard:latest
docker run -d --name rocketmq-dashboard-MQZ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.2.11:9876" -p 8080:8080 -t apacherocketmq/rocketmq-dashboard:latest
## docker 启动
````
version: '3.5'
services:
rmqnamesrv:
image: rocketmqinc/rocketmq
container_name: rmqnamesrv
restart: always
ports:
- 9876:9876
environment:
JAVA_OPT_EXT: "-server -Xms1g -Xmx1g"
volumes:
- ./logs:/root/logs
command: sh mqnamesrv
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: rocketmqinc/rocketmq
container_name: rmqbroker
restart: always
depends_on:
- rmqnamesrv
ports:
- 10909:10909
- 10911:10911
volumes:
- ./logs:/root/logs
- ./store:/root/store
- /root/mqz/rocketmq/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPT_EXT: "-server -Xms1g -Xmx1g -Xmn1g"
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console
restart: always
ports:
- 8080:8080
depends_on:
- rmqnamesrv
volumes:
- /etc/localtime:/etc/localtime:ro
- /home/rocketmq/console/logs:/root/logs
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
````