# SpringBootRocketMQ
**Repository Path**: ZHAOB/SpringBootRocketMQ
## Basic Information
- **Project Name**: SpringBootRocketMQ
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2017-02-05
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# RocketMQ
## 1. 下载
[rocketMQ下载地址][1]
## 2. 安装
解压即可使用
## 3. 启动服务
### 3.1 启动 NameServer
```
start/b bin/mqnamesrv.exe >D:\logs/mqnamesrv.log
```
### 3.2 启动 broker
```
start/b bin/mqbroker.exe -n "127.0.0.1:9876">D:\logs/mqbroker.log
```
注:也可以直接双击 exe 文件进行启动
# 项目实例
## pom
```
com.alibaba.rocketmq
rocketmq-client
3.2.6
com.alibaba.rocketmq
rocketmq-all
3.2.6
pom
```
## 生产者
```
package com.example.producer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
/**
* @Author: zhaoben
* @Date: 2017/2/5 10:53
* @Desc: 生产者
*/
public class Producer {
public static void main(String[] args) throws MQClientException{
/*
* 一个应用创建一个 Producer,由应用来维护此对象,可以设置为全局对象或者单例对象
* 注意:ProducerGroupName 需要由应用来保证唯一
* ProducerGroup 这个概念发送普通消息时,作用不大,但是发送分布式消息时,比较关键
* 因为服务器会回查这个 Group 下的 Producer
* */
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("172.17.147.18:9876");
producer.setInstanceName("Producer");
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
for(int i=0; i<100; i++){
try{
{
Message msg = new Message("TopicTest1", // topic
"TagA", // tag
"OrderID001", // key
("Hello World".getBytes()) // body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello World").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello World").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
producer.shutdown();
}
}
```
## 消费者
```
package com.example.consumer;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @Author: zhaoben
* @Date: 2017/2/5 10:54
* @Desc: 消费者
*/
public class Consumer {
public static void main(String[] args) throws InternalError, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("172.17.147.18:8976");
consumer.setInstanceName("Consumber");
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("TopicTest2", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 执行TopicTest1的消费逻辑
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 执行TagA的消费
System.out.println(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagC")) {
// 执行TagC的消费
} else if (msg.getTags() != null
&& msg.getTags().equals("TagD")) {
// 执行TagD的消费
}
} else if (msg.getTopic().equals("TopicTest2")) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
```
## 源码地址
[1]: https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz