# cloud-zk-rocketmq
**Repository Path**: ly-springcloud/cloud-zk-rocketmq
## Basic Information
- **Project Name**: cloud-zk-rocketmq
- **Description**: SpringCloud + ZK + RocketMQ
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-12-02
- **Last Updated**: 2025-04-29
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# cloud-zk-rocketmq
#### 介绍
SpringCloud + ZK + RocketMQ简单demo
#### 软件架构
微服务注册中心为ZK,使用Feign作为分布式事务之间的消息传送,消息中间件为RocketMQ
#### 安装教程
1. 下载rocketmq
windows下安装消息服务器,这里使用4.9.2版本,[下载地址](https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip),下载后解压到没有空格的目录下,并设置环境变量

2. 启动rocketmq
a.进入到bin目录下,使用cmd命令 start mqnamesrv.cmd ,最终执行的是 runserver.cmd 这个文件,如果报报类找不到的错误,编辑该文件(runserver.cmd),将 %JAVA_HOME% 和%CLASSPATH% 都使用英文双引号 "%JAVA_HOME%"

b.进入到bin目录下,使用cmd命令 start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true 启动 mqbroker.cmd,最终执行的是 runbroker.cmd 这个文件,如果报类找不到的错误,编辑该文件(runbroker.cmd),将 %JAVA_HOME% 和%CLASSPATH% 都使用英文双引号

3. 下载可视化控制台
a.下载软件
[下载地址](https://github.com/apache/rocketmq-externals), 切换分支到release-rocketmq-console-1.0.0,[最终下载地址](https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0),打包下载
b.解压软件
进入到resources文件目录下application.properties文件 更改配置文件 插件端口号,和mq服务器地址

将rocktmq-console打成jar包,得到rocketmq-console-ng-1.0.0.jar
```
mvn clean package -Dmaven.test.skip=true
```
也可以不改配置文件,在启动jar包时指定插件端口号和mq服务地址
```
java -jar rocketmq-console-ng-1.0.0.jar --server.port=9870 --rocketmq.config.namesrvAddr=127.0.0.1:9876
```

#### 使用说明
本demo模拟简单的消息发送和监听,cloud-rocketmq-order为订单项目程序,sql包下有对应的数据库,cloud-rocketmq-storage为仓储项目程序,流程为:用户创建订单时,通过将订单的信息发送给消息中间件,由消息中间件的消费者监听消息,调用远程仓储服务,完成订单-仓储的扣减过程。
1. 引入依赖
```
org.apache.rocketmq
rocketmq-spring-boot-starter
2.1.1
```
2. application-local.yml配置文件配置rocketmq功能
```
#rocket配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: my-group
```
3. 消息生产者
```
public void create(OrderCreateDto createDto) {
log.info("---------->开始交易");
//创建订单
log.info("创建订单开始!");
this.save(createDto);
log.info("创建订单结束!");
/**
* 消息生产者
* 扣减库存交给消息中间件,发送异步消息
* my-topic 为主题,自定义,但是必须保证和消费者保持一致
*/
//扣减库存
log.info("扣减库存开始");
rocketMQTemplate.syncSend("my-topic", JSON.toJSONString(new DecreaseStorageDto(createDto.getProductId(),createDto.getCount())));
log.info("扣减库存结束");
log.info("交易结束!");
}
```
4. 消息消费者
```
/**
* @author 蚂蚁会花呗
* @date 2021/12/2 15:28
* 扣减库存消息监听,消费者
*/
@Component
@RocketMQMessageListener(topic = "my-topic",consumerGroup = "my-group")
@Slf4j
public class StorageConsumer implements RocketMQListener {
@Autowired
private StorageFeignService storageFeignService;
@Override
public void onMessage(String s) {
log.info("消费消息:" + s);
/**
* 消费消息,远程调用库存系统
*/
storageFeignService.decreaseStorage(JSON.toJavaObject(JSON.parseObject(s), DecreaseStorageDto.class));
}
}
```