# SpringBootRocketMQ **Repository Path**: ZHAOB/SpringBootRocketMQ ## Basic Information - **Project Name**: SpringBootRocketMQ - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2017-02-05 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # RocketMQ ## 1. 下载 [rocketMQ下载地址][1] ## 2. 安装 解压即可使用 ## 3. 启动服务 ### 3.1 启动 NameServer ``` start/b bin/mqnamesrv.exe >D:\logs/mqnamesrv.log ``` ### 3.2 启动 broker ``` start/b bin/mqbroker.exe -n "127.0.0.1:9876">D:\logs/mqbroker.log ``` 注:也可以直接双击 exe 文件进行启动 # 项目实例 ## pom ``` com.alibaba.rocketmq rocketmq-client 3.2.6 com.alibaba.rocketmq rocketmq-all 3.2.6 pom ``` ## 生产者 ``` package com.example.producer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; /** * @Author: zhaoben * @Date: 2017/2/5 10:53 * @Desc: 生产者 */ public class Producer { public static void main(String[] args) throws MQClientException{ /* * 一个应用创建一个 Producer,由应用来维护此对象,可以设置为全局对象或者单例对象 * 注意:ProducerGroupName 需要由应用来保证唯一 * ProducerGroup 这个概念发送普通消息时,作用不大,但是发送分布式消息时,比较关键 * 因为服务器会回查这个 Group 下的 Producer * */ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("172.17.147.18:9876"); producer.setInstanceName("Producer"); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可 * 注意:切记不可以在每次发送消息时,都调用start方法 */ producer.start(); for(int i=0; i<100; i++){ try{ { Message msg = new Message("TopicTest1", // topic "TagA", // tag "OrderID001", // key ("Hello World".getBytes()) // body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2",// topic "TagB",// tag "OrderID0034",// key ("Hello World").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3",// topic "TagC",// tag "OrderID061",// key ("Hello World").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } }catch (Exception e){ e.printStackTrace(); } } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */ producer.shutdown(); } } ``` ## 消费者 ``` package com.example.consumer; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * @Author: zhaoben * @Date: 2017/2/5 10:54 * @Desc: 消费者 */ public class Consumer { public static void main(String[] args) throws InternalError, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("172.17.147.18:8976"); consumer.setInstanceName("Consumber"); /** * 订阅指定topic下tags分别等于TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 订阅指定topic下所有消息 * 注意:一个consumer对象可以订阅多个topic */ consumer.subscribe("TopicTest2", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 */ @Override public ConsumeConcurrentlyStatus consumeMessage( List msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 执行TopicTest1的消费逻辑 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 执行TagA的消费 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 执行TagC的消费 } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 执行TagD的消费 } } else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer对象在使用之前必须要调用start初始化,初始化一次即可 */ consumer.start(); System.out.println("Consumer Started."); } } ``` ## 源码地址 [1]: https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz