# gallop-mq-spring-boot-starter **Repository Path**: gallop-project/gallop-mq-spring-boot-starter ## Basic Information - **Project Name**: gallop-mq-spring-boot-starter - **Description**: 提供微服务MQ支持。 - **Primary Language**: Java - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-10-27 - **Last Updated**: 2022-12-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: SpringCloud, mq, SpringBoot, starter ## README # gallop-mq-spring-boot-starter

license jdk springcloud springboot release

### 介绍 提供MQ通信支持的微服务组件,在微服务节点中引入此starter以扩展消息队列通信能力。 支持功能: * 消息按`topic`异步发送; * 消息按`topic`订阅; * 基于消息队列的远程调用(rpc模式) 支持消息队列: * `nats`: [NATS](https://nats.io/) * `rabbit_mq`: [RabbitMQ](https://www.rabbitmq.com/) * `active_mq`: [ActiveMQ](http://activemq.apache.org/) * `kafka`: [Kafka](https://kafka.apache.org/) * `custom`: 自定义MQ操作实现 ### 开始使用 #### 引入 ```xml com.gallop gallop-mq-spring-boot-starter 1.0.0 ``` #### 配置 ```yaml gallop: mq: # 可不填scheme直接使用格式[ip:port],MQ集群时支持填写多个服务地址,用,分隔 # scheme: # nats=nats:// # rabbitMQ/ActiveMQ= amqp:// # ActiveMQ 支持:amqp/mqtt/udp/http/nio/tcp... (参考 https://activemq.apache.org/configuring-transports.html) # 组件会为地址默认追加failover # kafka= ip1:port1,ip2:port2... hosts: amqp://192.168.10.218:5672 # 连接MQ的用户名 user: guest # 连接MQ的密码 secret: guest # 将要使用的MQ类型 type: rabbit_mq # 消息编解码器 codec: gson # rpc调用默认超时(毫秒)。默认15s,<=0时无超时,阻塞等待直至响应 request-timeout: 20000 # 断线重连时间间隔(毫秒)。默认5s reconnect-interval: 5000 ``` #### 发送消息 ```java import com.gallop.mq.MQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.function.Consumer; @Component public class ProducerService { @Autowired private MQTemplate mqTemplate; /** * 异步发送消息到指定topic * @param msg */ public void send(FooBarMessage msg) { mqTemplate.send(msg, "FOO_BAR_TOPIC_SEND"); } /** * 发起RPC调用,失败时或无响应返回null * @param msg 将要发送的消息 * @return 远程响应 */ public FooBarResponse request(FooBarMessage msg) { return mqTemplate.request(msg, "FOO_BAR_TOPIC_REQUEST1"); } /** * 发起RPC调用,使用函数式响应 * @param msg 请求消息 * @param onResponse 响应方法 */ public void request(FooBarMessage msg, Consumer onResponse) { mqTemplate.req(msg, "FOO_BAR_TOPIC_REQUEST2", onResponse, // onError非必需 (msg, exception) -> exception.printStackTrace() ); } } ``` #### 监听消息 ```java import com.gallop.mq.annotation.MQMessageListener; import com.gallop.mq.core.dispatcher.DispatchRpcCallable; import org.springframework.stereotype.Component; // 在类上增加注解,启动时扫描此bean @MQMessageListener @Component public class FooBarMessageListener { /** * 监听消息send * @param sent 目前必须放在参数第一位 */ @MQMessageListener(topic = "FOO_BAR_TOPIC_SEND") public void onMessage(FooBarMessage sent) { // 响应mqTemplate#send } /** * 响应RPC请求,并返回结果(MQTemplate会提供默认的响应逻辑) * @param request 请求,目前必须放在参数第一位 * @return 响应结果 */ @MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST1") public FooBarResponse response(FooBarMessage request) { // 响应mqTemplate#request return new FooBarResponse(); } /** * 响应RPC请求,自定义RPC响应方式 * @param request RPC请求信息 * @param customCallback 自定义响应方式 * @return 响应结果 */ @MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC") public FooBarResponse responseFunction(FooBarMessage request, DispatchRpcCallable customCallback) { // 响应mqTemplate#request return new FooBarResponse(); } } ``` ### 拦截器 作用:在消息到达服务节点的分发前后触发; 在注解`MQMessageListener`中填写实现了`MQListenerInterceptor`接口的`bean class`。按声明顺序调用。 例: ```java import com.gallop.mq.core.MQListenerInterceptor; import org.springframework.stereotype.Component; @Component public class Test { @MQMessageListener(topic = "FOO_BAR_TOPIC_REQUEST_FUNC", interceptor = {A.class, B.class, C.class}) public void onMessage(FooBarMessage sent) { // } public static class A implements MQListenerInterceptor { // ... } public static class B implements MQListenerInterceptor { // ... } public static class C implements MQListenerInterceptor { // ... } } ``` | 方法 | 描述 | |-----------------------------------------------|-----------------------------------------| | beforeInvoke(Object message) | 消息分发前调用,参数为收到的MQ消息 | | afterInvoke(Object message,Object result) | 消息分发处理后调用,参数message: 收到的消息;result:返回的结果 | | onError(Object message, Exception exception) | 消息分发时出现异常触发 | ### 自定义MQ实现 #### 自定义MQTemplate 1. 配置MQ类型`gallop.mq.type`选择`custom`; 2. 提供实现`MQTemplateInitializer`接口的`spring bean`; 3. 在`MQTemplateInitializer`实现类的`init`方法中提供实例化并返回自定义`MQTemplate`; 例: ```java import com.gallop.mq.MQTemplate; import com.gallop.mq.core.MQTemplateInitializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class CustomConfigExample { @Bean public MQTemplateInitializer customInitializer() { // 参数: MQ配置,消息分发器,消息编解码器,MQ工作线程池(可选) return (properties, dispatcher, codec, executorProvider) -> new CustomMQTemplate(); } public static class CustomMQTemplate implements MQTemplate { // 实现略 } } ``` #### 自定义编解码器 提供实现`MQMessageCodec`接口的`spring bean`,以覆盖优先级更低的`gallop.mq.codec`配置指定的编解码器。 #### 自定义工作线程池 提供实现了`MQExecutorProvider`接口的`spring bean`,支持`FunctionalInterface`。 缺省使用 `corePoolSize = 当前核数 * 2 + 1` 的线程池 #### 自定义消息分发器 提供实现了`MQMessageDispatcher`接口的`spring bean`,以覆盖缺省设置。 #### 自定义监听 组件内除了扫描`MQMessageListener`注解注册成为消息监听以外, 还可以通过提供实现`ExtendListenerScanner`接口的`spring bean`批量注册监听容器; 与注解形式相比,此途径更偏向为运行时动态创建消息监听。 扫描注解获得的监听集合,与`ExtendListenerScanner`提供的监听集合都会加入`MQMessageDispatcher`消息分发器中。 ### 可靠投递 ... ### 里程碑 v1.0.0 * ~~支持Kafka~~ * ~~支持ActiveMQ~~ * ~~支持RabbitMQ~~ * ~~MQ消息监听统一封装处理器 MessageDeliveryHandlerRegistry~~ * ~~MQ具体监听器使用动态代理~~ * ~~RPC模式统一流程封装~~(取消) * ~~DispatchContext增加correlationId,启用自定义回调~~ * ZMQ