# gallop-mq-spring-boot-starter
**Repository Path**: gallop-project/gallop-mq-spring-boot-starter
## Basic Information
- **Project Name**: gallop-mq-spring-boot-starter
- **Description**: 提供微服务MQ支持。
- **Primary Language**: Java
- **License**: MIT
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 1
- **Created**: 2022-10-27
- **Last Updated**: 2022-12-05
## Categories & Tags
**Categories**: Uncategorized
**Tags**: SpringCloud, mq, SpringBoot, starter
## README
# gallop-mq-spring-boot-starter
### 介绍
提供MQ通信支持的微服务组件,在微服务节点中引入此starter以扩展消息队列通信能力。
支持功能:
* 消息按`topic`异步发送;
* 消息按`topic`订阅;
* 基于消息队列的远程调用(rpc模式)
支持消息队列:
* `nats`: [NATS](https://nats.io/)
* `rabbit_mq`: [RabbitMQ](https://www.rabbitmq.com/)
* `active_mq`: [ActiveMQ](http://activemq.apache.org/)
* `kafka`: [Kafka](https://kafka.apache.org/)
* `custom`: 自定义MQ操作实现
### 开始使用
#### 引入
```xml
com.gallop
gallop-mq-spring-boot-starter
1.0.0
```
#### 配置
```yaml
gallop:
mq:
# 可不填scheme直接使用格式[ip:port],MQ集群时支持填写多个服务地址,用,分隔
# scheme:
# nats=nats://
# rabbitMQ/ActiveMQ= amqp://
# ActiveMQ 支持:amqp/mqtt/udp/http/nio/tcp... (参考 https://activemq.apache.org/configuring-transports.html)
# 组件会为地址默认追加failover
# kafka= ip1:port1,ip2:port2...
hosts: amqp://192.168.10.218:5672
# 连接MQ的用户名
user: guest
# 连接MQ的密码
secret: guest
# 将要使用的MQ类型
type: rabbit_mq
# 消息编解码器
codec: gson
# rpc调用默认超时(毫秒)。默认15s,<=0时无超时,阻塞等待直至响应
request-timeout: 20000
# 断线重连时间间隔(毫秒)。默认5s
reconnect-interval: 5000
```
#### 发送消息
```java
import com.gallop.mq.MQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Component
public class ProducerService {
@Autowired
private MQTemplate mqTemplate;
/**
* 异步发送消息到指定topic
* @param msg
*/
public void send(FooBarMessage msg) {
mqTemplate.send(msg, "FOO_BAR_TOPIC_SEND");
}
/**
* 发起RPC调用,失败时或无响应返回null
* @param msg 将要发送的消息
* @return 远程响应
*/
public FooBarResponse request(FooBarMessage msg) {
return mqTemplate.request(msg, "FOO_BAR_TOPIC_REQUEST1");
}
/**
* 发起RPC调用,使用函数式响应
* @param msg 请求消息
* @param onResponse 响应方法
*/
public void request(FooBarMessage msg, Consumer onResponse) {
mqTemplate.req(msg, "FOO_BAR_TOPIC_REQUEST2", onResponse,
// onError非必需
(msg, exception) -> exception.printStackTrace()
);
}
}
```
#### 监听消息
```java
import com.gallop.mq.annotation.MQMessageListener;
import com.gallop.mq.core.dispatcher.DispatchRpcCallable;
import org.springframework.stereotype.Component;
// 在类上增加注解,启动时扫描此bean
@MQMessageListener
@Component
public class FooBarMessageListener {
/**
* 监听消息send
* @param sent 目前必须放在参数第一位
*/
@MQMessageListener(topic = "FOO_BAR_TOPIC_SEND")
public void onMessage(FooBarMessage sent) {
// 响应mqTemplate#send
}
/**
* 响应RPC请求,并返回结果(MQTemplate会提供默认的响应逻辑)
* @param request 请求,目前必须放在参数第一位
* @return 响应结果
*/
@MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST1")
public FooBarResponse response(FooBarMessage request) {
// 响应mqTemplate#request
return new FooBarResponse();
}
/**
* 响应RPC请求,自定义RPC响应方式
* @param request RPC请求信息
* @param customCallback 自定义响应方式
* @return 响应结果
*/
@MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC")
public FooBarResponse responseFunction(FooBarMessage request, DispatchRpcCallable customCallback) {
// 响应mqTemplate#request
return new FooBarResponse();
}
}
```
### 拦截器
作用:在消息到达服务节点的分发前后触发;
在注解`MQMessageListener`中填写实现了`MQListenerInterceptor`接口的`bean class`。按声明顺序调用。
例:
```java
import com.gallop.mq.core.MQListenerInterceptor;
import org.springframework.stereotype.Component;
@Component
public class Test {
@MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC",
interceptor = {A.class, B.class, C.class})
public void onMessage(FooBarMessage sent) {
//
}
public static class A implements MQListenerInterceptor {
// ...
}
public static class B implements MQListenerInterceptor {
// ...
}
public static class C implements MQListenerInterceptor {
// ...
}
}
```
| 方法 | 描述 |
|-----------------------------------------------|-----------------------------------------|
| beforeInvoke(Object message) | 消息分发前调用,参数为收到的MQ消息 |
| afterInvoke(Object message,Object result) | 消息分发处理后调用,参数message: 收到的消息;result:返回的结果 |
| onError(Object message, Exception exception) | 消息分发时出现异常触发 |
### 自定义MQ实现
#### 自定义MQTemplate
1. 配置MQ类型`gallop.mq.type`选择`custom`;
2. 提供实现`MQTemplateInitializer`接口的`spring bean`;
3. 在`MQTemplateInitializer`实现类的`init`方法中提供实例化并返回自定义`MQTemplate`;
例:
```java
import com.gallop.mq.MQTemplate;
import com.gallop.mq.core.MQTemplateInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CustomConfigExample {
@Bean
public MQTemplateInitializer customInitializer() {
// 参数: MQ配置,消息分发器,消息编解码器,MQ工作线程池(可选)
return (properties, dispatcher, codec, executorProvider)
-> new CustomMQTemplate();
}
public static class CustomMQTemplate implements MQTemplate {
// 实现略
}
}
```
#### 自定义编解码器
提供实现`MQMessageCodec`接口的`spring bean`,以覆盖优先级更低的`gallop.mq.codec`配置指定的编解码器。
#### 自定义工作线程池
提供实现了`MQExecutorProvider`接口的`spring bean`,支持`FunctionalInterface`。
缺省使用 `corePoolSize = 当前核数 * 2 + 1` 的线程池
#### 自定义消息分发器
提供实现了`MQMessageDispatcher`接口的`spring bean`,以覆盖缺省设置。
#### 自定义监听
组件内除了扫描`MQMessageListener`注解注册成为消息监听以外,
还可以通过提供实现`ExtendListenerScanner`接口的`spring bean`批量注册监听容器;
与注解形式相比,此途径更偏向为运行时动态创建消息监听。
扫描注解获得的监听集合,与`ExtendListenerScanner`提供的监听集合都会加入`MQMessageDispatcher`消息分发器中。
### 可靠投递
...
### 里程碑
v1.0.0
* ~~支持Kafka~~
* ~~支持ActiveMQ~~
* ~~支持RabbitMQ~~
* ~~MQ消息监听统一封装处理器 MessageDeliveryHandlerRegistry~~
* ~~MQ具体监听器使用动态代理~~
* ~~RPC模式统一流程封装~~(取消)
* ~~DispatchContext增加correlationId,启用自定义回调~~
* ZMQ