# BizEvents-Framework
**Repository Path**: javacoo/biz-events-framework
## Basic Information
- **Project Name**: BizEvents-Framework
- **Description**: No description available
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2021-11-08
- **Last Updated**: 2022-03-16
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 业务领域事件框架(BizEvents-Framework)
[TOC]
事件驱动架构(Event Driven Architecture,EDA)一个事件驱动框架(EDA)定义了一个设计和实现一个应用系统的方法学,在这个系统里事件可传输于松散耦合的组件和服务之间。一个事件驱动系统典型地由事件消费者和事件产生者组成。事件消费者向事件管理器订阅事件,事件产生者向事件管理器发布事件。当事件管理器从事件产生者那接收到一个事件时,事件管理把这个事件转送给相应的事件消费者。如果这个事件消费者是不可用的,事件管理者将保留这个事件,一段间隔之后再次转送该事件消费者------百度百科
### 一,简介
- 业务领域事件框架是一个基于事件驱动架构的基础设施,是一个可以最大程度减少耦合度,很好地扩展与适配不同类型的服务组件。框架分为BizEvents-Framework和BizEvent-SDK两部分,分别扮演服务端和客户端组件的角色,集成BizEvents-Framework能够让应用拥有领域事件的生成,存储,发布,过滤与推送的能力。集成BizEvent-SDK能够让应用拥有订阅目标系统领域事件,并收到目标系统推送领域事件的能力。
- 业务领域事件框架源于我的另一篇文章《[基于RabbitMQ+XXLJob+EventBus的进件平台设计与实现](https://www.jianshu.com/p/02959b5634b9)》,当时由于一些原因,进件平台项目未能开发完成就暂停了,自己便做了一个阶段性项目结项,总结了项目的经验教训,于是便有了上面这篇文章。虽然项目暂停了,但是对进件平台的思考从未间断,结合工作中遇到的实际问题,发现的项目痛点与弊端,经过反复的分析,重构,验证,不断完善进件平台架构设计。并在此基础上采取剥离具体业务逻辑,抽象公共处理流程,引入事件机制等,重构进件平台,使其不再仅仅是一个进件平台,更是一个可以最大程度减少耦合度,很好地扩展与适配不同类型的服务组件,是一个基于事件驱动架构的基础设施...慢慢地便有了业务领域事件框架的雏形。
### 二,交互模型
业务系统发布业务领域事件,分为领域内事件和跨领域事件,领域内事件通过本地事件处理机制处理,跨领域事件通过Runtime首先由Register经过过滤和排序查找订阅了此事件的客户端,再由Stroe对事件进行存储,然后推送到MQ中指定客户端队列,再由MQ客户端通道消费端调用Remote,Remote将事件包装成约定好的协议,通过HTTP/RPC方式推送到对应的客户端,如果推送失败,则重试指定次数,如果任然失败,则后续由Task定时任务择机处理,客户端则通过集成BizEvent-SDK,实现服务端的接入,接收推送的事件通知。

### 三,核心特性与能力
- 基于事件驱动架构
事件驱动架构是一种用于设计应用的软件架构和模型。对于事件驱动系统而言,事件的捕获、通信、处理和持久保留是解决方案的核心结构。事件驱动架构可以最大程度减少耦合度,很好地扩展与适配不同类型的服务组件,因此是现代化分布式应用架构的理想之选。
- 基于插件化设计
基于[xkernel](https://www.jianshu.com/p/cc1f08c8aed6) 提供的SPI机制实现,扩展灵活,使用方便。
- 基于推送的消息传输机制
客户端无需轮询就可以接收更新,事件在到达事件存储后就会通知给客户端,客户端可以随时接收更新,这对于动态数据转换、分析和数据科学处理非常有用。
- 支持事件过滤
事件的分发与接收支持根据事件类型(eventType),事件主题(topic),事件版本(version)以及事件对象中字段值过滤。
### 四,软件架构说明
#### 1,BizEvents-Framework
- BizEvents-Framework是一个基于事件驱动架构的基础组件,事件驱动架构是一种用于设计应用的软件架构和模型。对于事件驱动系统而言,事件的捕获、通信、处理和持久保留是解决方案的核心结构,业务系统只需依赖BizEvents-Framework基础组件,便可拥有领域事件的生成,存储,发布与推送的能力。
- Runtime 整合了以插件化的形式运行的相关插件:

插件基于[xkernel](https://www.jianshu.com/p/cc1f08c8aed6) 提供的SPI机制实现,扩展非常方便,大致步骤如下:
- 实现插件扩展接口:
如实现注册插件接口,com.javacoo.events.register.api.service.BizEventRegistryService
```java
@Spi(value = BizEventRegistryConfig.DEFAULT_IMPL)
public interface BizEventRegistryService {
...
}
```
基于配置文件实现:com.javacoo.events.register.service.BizEventFileRegistryService
```java
@Slf4j
public class BizEventFileRegistryService implements BizEventRegistryService {
...
}
```
- 配置自定义插件扩展接口:

- 在项目resource目录新建包->META-INF->ext->internal。
- 创建以插件扩展接口类全局限定名命名的文件,文件内容:实现名称=实现类的全局限定名,如:
文件名=com.javacoo.events.register.api.service.BizEventRegistryService
内容:
```properties
file=com.javacoo.events.register.service.BizEventFileRegistryService
```
- 修改配置文件,添加如下内容:
```properties
#业务领域事件注册实现,默认内部实现
biz.event.registry.plugin.impl = file
```
------
1. **Stroe插件:**主要负责事件的存储,支持对接多种事件存储机制,默认实现:JPA
类结构如下:

系统第一次启动将创建务领域事件信息表,业务领域事件MQ异常信息表,业务领域事件任务表,插件通过业务领域事件持久化服务对外提供服务,关键代码如下:
JpaConfig
```java
/**
* JPA配置
*
* @author duanyong@jccfc.com
* @date 2021/11/13 17:02
*/
@Configuration
@ConfigurationProperties(prefix = JpaConfig.PREFIX)
@ConditionalOnProperty(prefix = JpaConfig.PREFIX, value = JpaConfig.ENABLED, matchIfMissing = true)
@EntityScan(basePackages = "com.javacoo.events.store.jpa.domain")
@EnableJpaRepositories(
basePackages = "com.javacoo.events.store.jpa.repository",
entityManagerFactoryRef = "entityManagerFactoryBean",
transactionManagerRef = "transactionManager")
@EnableTransactionManagement
public class JpaConfig {
public static final String PREFIX = "biz.event.store.plugin.jpa";
public static final String ENABLED = "enabled";
/**是否可用*/
private String enabled = ENABLED;
/**数据源*/
@Autowired
private DataSource dataSource;
/**jpa其他参数配置*/
@Autowired
private JpaProperties jpaProperties;
@Autowired
private EntityManagerFactoryBuilder factoryBuilder;
public String getEnabled() {
return enabled;
}
public void setEnabled(String enabled) {
this.enabled = enabled;
}
@Bean(name = "entityManagerFactoryBean")
public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean() {
return factoryBuilder.dataSource(dataSource)
//这一行的目的是加入jpa的其他配置参数比如(ddl-auto: update等)
//当然这个参数配置可以在事务配置的时候也可以
.properties(jpaProperties.getProperties())
.packages("com.javacoo.events.store.jpa.domain")
.persistenceUnit("persistenceUnit")
.build();
}
/**
* EntityManager
*
* @author duanyong@jccfc.com
* @date 2021/11/13 17:00
* @return: javax.persistence.EntityManager
*/
@Bean(name = "entityManager")
public EntityManager entityManager() {
return entityManagerFactoryBean().getObject().createEntityManager();
}
/**
* jpa事务管理
*
* @author duanyong@jccfc.com
* @date 2021/11/13 16:59
* @return: org.springframework.orm.jpa.JpaTransactionManager
*/
@Bean(name = "transactionManager")
public JpaTransactionManager transactionManager() {
JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
jpaTransactionManager.setEntityManagerFactory(entityManagerFactoryBean().getObject());
return jpaTransactionManager;
}
}
```
业务领域事件信息:
```java
/**
* 业务领域事件信息
*
* @author duanyong@jccfc.com
* @date 2021/11/13 10:35
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@DynamicUpdate
@Entity
@Table(name = "BIZ_EVENT_INFO",uniqueConstraints = @UniqueConstraint(columnNames= {"EVENT_GROUP", "BIZ_SERIAL_ID"}))
public class BizEventInfo {
@Id
@Column(name = "ID")
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 业务流水号
*/
@Column(name = "BIZ_SERIAL_ID",length = 64,nullable = false)
private String bizSerialId;
/**
* 事件类型
*/
@Column(name = "EVENT_TYPE",length = 200,nullable = false)
private String eventType;
/**
* 事件分组
*/
@Column(name = "EVENT_GROUP",length = 100,nullable = false)
private String eventGroup;
/**
* 事件主题
*/
@Column(name = "EVENT_TOPIC",length = 100)
private String eventTopic;
/**
* 事件版本
*/
@Column(name = "EVENT_VERSION",length = 10)
private String eventVersion;
/**
* 事件ID
*/
@Column(name = "EVENT_ID",length = 64,nullable = false)
private String eventId;
/**
* 状态:1->初始状态,2->就绪状态,3->推送到MQ成功,4->业务处理成功,5->业务处理失败
*/
@Column(name = "STATE",length = 2,nullable = false)
private String state;
/**
* 备注
*/
@Column(name = "REMARKS",length = 200)
private String remarks;
/**
* 数据有效性:0->无效,1->有效
*/
@Column(name = "STATUS",nullable = false)
private Boolean status;
/**
* 数据创建者
*/
@Column(name = "CREATED",length = 20,nullable = false)
private String created;
/**
* 数据创建时间
*/
@Column(name = "CREATED_DATE",nullable = false)
private Date createdDate;
/**
* 数据修改者
*/
@Column(name = "MODIFIED",length = 20)
private String modified;
/**
* 更新时间
*/
@Column(name = "MODIFIED_DATE")
private Date modifiedDate;
/**
* 事件内容
*/
@Column(name = "CONTENT",columnDefinition="MEDIUMTEXT",nullable = false)
private String content;
/**
* 事件时间戳
*/
@Column(name = "TIMESTAMP",length = 20,nullable = false)
private String timestamp;
}
```
jpa配置:
```properties
## 业务领域事件-存储插件配置
biz.event.store.plugin.jpa.enabled=true
## JPA配置
spring.jpa.show-sql=true
spring.jpa.generate-ddl=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
```
2. **MQ插件:**主要负责事件的发布与订阅,支持对接多种消息中间件,默认实现:RabbitMQ。
类结构如下:

基于RabbitMQ 的延迟队列+死信队列实现进件信息的可靠投递与消费,支持根据不同分组生成不同分组交换机,路由KEY和队列,以实现不同分组业务处理的隔离。关键代码如下:
- 分组消费者接口:接口继承ChannelAwareMessageListener ,并新增了启动,停止监听功能,方便后续对不同渠道监听器的灵活控制。
```java
/**
* 分组消费者
* 说明:
*
*
* @author duanyong@jccfc.com
* @date 2021/6/3 22:39
*/
public interface IGroupConsumer extends ChannelAwareMessageListener {
/**
* 重启
*
* @author duanyong@jccfc.com
* @date 2021/6/4 13:47
* @return: void
*/
default void shutdown() {}
/**
* 启动消费者监听
*
* @author duanyong@jccfc.com
* @date 2021/6/4 13:48
* @return: void
*/
default void start(){}
/**
* 停止消费者监听
*
* @author duanyong@jccfc.com
* @date 2021/6/4 13:48
* @return: void
*/
default void stop(){}
}
```
- 分组消费者实现类:实现了onMessage方法,定义了消息处理的主流程:接收消息->转换消息为MessageWrapper对象->获取message对象->调用抽象方法process执行消息推送->如果成功则更新状态为成功->如果失败则调用消息重试方法->如果发送消息过程中出现异常,则调用消息重试方法->最后此条消息要么消费成功,要么从回消息队列。
```java
/**
* 分组消费者实现类
* 说明:
*
*
* @author duanyong@jccfc.com
* @date 2021/6/3 22:56
*/
@Slf4j
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class GroupConsumer implements IGroupConsumer {
/** 分组号 */
protected String group;
/** 分组参数 */
protected GroupParams groupParams;
/** SimpleMessageListenerContainer */
protected SimpleMessageListenerContainer container;
/** 重试生产者对象 */
protected RetryProdcer retryProdcer;
/** 业务领域事件远程调用服务 */
private BizEventRemoteService bizEventRemoteService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取消息
String msg = new String(message.getBody(), Charset.defaultCharset());
log.info("分组:{}消费者,收到消息:{}", group,msg);
//执行推送
Long startTime = System.currentTimeMillis();
MessageWrapper messageWrapper = FastJsonUtil.toBean(msg,MessageWrapper.class);
try {
log.info("执行推送消息->:开始处理...");
String pushMsg = FastJsonUtil.toJSONString(messageWrapper.getMessage());
log.info("执行推送消息到分组:{}->参数:{},申请数据:{}", group,pushMsg);
process(messageWrapper);
log.info("执行推送消息到分组:{},是否成功:{}", group, messageWrapper.isSuccess());
if(messageWrapper.isSuccess()){
//更新申请状态
updateStateSuccess(messageWrapper);
log.info("执行推送消息到分组:{},推送成功, 耗时:{}秒", group,(System.currentTimeMillis() - startTime)/1000.0);
}else{
retry(messageWrapper);
}
}catch(Exception e) {
e.printStackTrace();
log.error("分组消费者->消息处理失败进入重试队列, 耗时:{}秒",(System.currentTimeMillis() - startTime)/1000.0,e);
retry(messageWrapper);
}finally {
//如果消费成功或者重试成功,则确认消息已经消费
if(messageWrapper.isSuccess() || messageWrapper.isRetry()){
//确认消息已经消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("分组消费者->确认消息已经消费:{}, 耗时:{}秒",FastJsonUtil.toJSONString(messageWrapper),(System.currentTimeMillis() - startTime)/1000.0);
}else{
//消息消费失败从回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
log.info("分组消费者->消息消费失败从回队列:{}, 耗时:{}秒",FastJsonUtil.toJSONString(messageWrapper),(System.currentTimeMillis() - startTime)/1000.0);
}
}
}
/**
* 消息重试
*
* @author duanyong@jccfc.com
* @date 2021/6/28 9:56
* @param messageWrapper: 消息封装对象
* @return: void
*/
private void retry(MessageWrapper messageWrapper) {
try{
log.error("执行推送消息到分组:{},推送失败,进入重试队列", group);
retryProdcer.convertAndSend(messageWrapper);
//重试成功
messageWrapper.setRetry(true);
}catch (Exception re){
log.error("分组消费者->消息处理失败进入重试队列失败:{}", group,re);
}
}
/**
* 重启
*
*
* @author duanyong@jccfc.com
* @date 2021/6/4 13:47
* @return: void
*/
@Override
public void shutdown() {
log.info("分组:{},重启", group);
this.container.shutdown();
}
/**
* 启动消费者监听
*
*
* @author duanyong@jccfc.com
* @date 2021/6/4 13:48
* @return: void
*/
@Override
public void start() {
log.info("分组:{},启动消费者监听", group);
this.container.start();
}
/**
* 停止消费者监听
*
*
* @author duanyong@jccfc.com
* @date 2021/6/4 13:48
* @return: void
*/
@Override
public void stop() {
log.info("分组:{},停止消费者监听", group);
this.container.stop();
}
/**
* 执行操作
*
* @author duanyong@jccfc.com
* @date 2021/6/4 14:06
* @param messageWrapper: mq消息包装对象
*/
private void process(MessageWrapper messageWrapper){
log.info("执行推送消息->:开始处理...");
String pushMsg = FastJsonUtil.toJSONString(messageWrapper.getMessage());
log.info("通过{}方式,执行推送消息到分组:{},申请数据:{}", groupParams.getRemoteType().getCode(), group,pushMsg);
//获取请求对象
BizEventReq bizEventReq = getBizEventReq(messageWrapper);
//执行通知
BaseResp baseResp = bizEventRemoteService.notice(bizEventReq);
log.info("通过{}方式,执行推送消息到分组:{},响应:{}", groupParams.getRemoteType(), group, baseResp);
if(baseResp != null && baseResp.getState()){
messageWrapper.setSuccess(true);
}
if(baseResp != null){
messageWrapper.setRemark(StringUtils.join(baseResp.getCode(),"_",baseResp.getDesc()));
}
}
/**
* 设置消息监听
*
* @author duanyong@jccfc.com
* @date 2021/6/8 9:31
* @return: com.jccfc.fund.mq.consumer.ChannelConsumer
*/
public GroupConsumer setMessageListener(){
this.container.setMessageListener(this);
return this;
}
/**
* 更新状态成功
*
* @author duanyong@jccfc.com
* @date 2021/6/8 10:46
* @param messageWrapper: mq消息包装对象
* @return: void
*/
private void updateStateSuccess(MessageWrapper messageWrapper){
BizEventServiceFactory.getBizEventService().ifPresent(bizEventService -> {
log.info("更新状态成功");
BizEventInfoDto bizEventInfoDto = new BizEventInfoDto();
bizEventInfoDto.setBizSerialId(messageWrapper.getMessageId());
bizEventInfoDto.setEventGroup(messageWrapper.getGroupId());
bizEventInfoDto.setState(MqStateEnum.STATE_MQ_RETURN_SUCCESS.getCode());
long count = bizEventService.updateBizEventInfo(bizEventInfoDto);
log.info("更新业务领域事件信息:{},状态->{},影响记录数:{}",messageWrapper.getMessageId(),FastJsonUtil.toJSONString(bizEventInfoDto),count);
BizEventMqErrorInfoDto bizEventMqErrorInfoDto = new BizEventMqErrorInfoDto();
bizEventMqErrorInfoDto.setBizSerialId(messageWrapper.getMessageId());
bizEventMqErrorInfoDto.setEventGroup(messageWrapper.getGroupId());
bizEventMqErrorInfoDto.setState(BizEventService.STATE_4);
count = bizEventService.updateBizEventMqErrorInfo(bizEventMqErrorInfoDto);
log.info("更新业务领域事件MQ异常信息:{},状态->{},影响记录数:{}",messageWrapper.getMessageId(),FastJsonUtil.toJSONString(bizEventInfoDto),count);
});
}
/**
* 获取业务领域事件请求对象
*
* @author duanyong@jccfc.com
* @date 2021/11/13 16:45
* @param messageWrapper:mq包装对象
* @return: com.javacoo.events.request.BizEventReq
*/
private BizEventReq getBizEventReq(MessageWrapper messageWrapper) {
return FastJsonUtil.toBean(FastJsonUtil.toJSONString(messageWrapper.getMessage()),BizEventReq.class);
}
}
```
- 分组消费工厂:应用启动时,根据接入的分组信息动态创建分组消费者对象并启动监听
```java
/**
* 分组消费工厂
* 说明:
*
*
* @author duanyong@jccfc.com
* @date 2021/6/3 23:13
*/
@Slf4j
public class GroupConsumerFactory {
/**
* 重试生产者对象
*/
@Autowired
private RetryProdcer retryProdcer;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private RabbitAdmin rabbitAdmin;
/** MQ配置*/
@Autowired
private BizEventMQPluginConfig mqPluginConfig;
@PostConstruct
private void init(){
mqPluginConfig.getGroups().forEach((key,value)-> doStartGroupConsumer(key,value));
}
/**
* 执行启动消费者
*
* @author duanyong@jccfc.com
* @date 2021/6/4 14:33
* @param group: 分组
* @param groupParams: 参数
* @return: void
*/
private void doStartGroupConsumer(String group , GroupParams groupParams) {
try {
log.info("执行启动消费者:{},分组参数:{}",group, groupParams);
//根据远程调用类型查找实现类
Optional bizEventRemoteServiceOptional = BizEventRemoteServiceFactory.getBizEventRemoteService(
groupParams.getRemoteType().getCode());
if(!bizEventRemoteServiceOptional.isPresent()){
log.error("执行启动消费者:{},远程调用类型:{},未注册对应的实现",group, groupParams.getRemoteType().getCode());
return;
}
//构建动态消费者容器工厂
DynamicConsumerContainerFactory
fac = DynamicConsumerContainerFactory.builder()
.channelNo(group)
.rabbitAdmin(rabbitAdmin)
.connectionFactory(connectionFactory)
.build();
//创建消费者
IGroupConsumer consumer = GroupConsumer.builder()
.channelNo(group)
.bizEventRemoteService(bizEventRemoteServiceOptional.get())
.groupParams(groupParams)
.container(fac.getObject())
.retryProdcer(retryProdcer)
.build()
.setMessageListener();
//启动消费者
consumer.start();
} catch (Exception e) {
e.printStackTrace();
log.error("启动消费者异常:{}",group,e);
}
}
}
```
- 动态消费者容器工厂:实现了FactoryBean接口getObject方法,定义了工作,重试相关交换机,队列,实现了根据接入渠道编号生成对应的交换机和队列,路由key,如:
"分组_work_direct_exchange","分组_work_queue","分组_work_routing_key"
并动态申明到RabbitAdmin 中,最后构建SimpleMessageListenerContainer对象并设置相关参数
```java
/**
* 动态消费者容器工厂
* 说明:
*
*
* @author duanyong@jccfc.com
* @date 2021/6/3 23:03
*/
@Slf4j
@Data
@Builder
public class DynamicConsumerContainerFactory implements FactoryBean {
/** 分组号*/
private String group;
/** 连接工厂 */
private ConnectionFactory connectionFactory;
/** rabbitAdmin */
private RabbitAdmin rabbitAdmin;
/** 消费者数量 */
private Integer concurrentNum;
// 消费方
private IGroupConsumer consumer;
/**
* 构建工作队列
*
* @author duanyong@jccfc.com
* @date 2021/6/4 9:40
* @return: org.springframework.amqp.core.Queue
*/
private Queue buildWorkQueue() {
if (StringUtils.isBlank(group)) {
throw new IllegalArgumentException("分组号不能为空");
}
//队列名称
String queue = StringUtils.join(group, Constants.WORK_QUEUE);
//交换机
String exchange = StringUtils.join(group, Constants.WORK_ERROR_DIRECT_EXCHANGE);
//路由key
String routingKey = StringUtils.join(group, Constants.WORK_ROUTING_KEY);
log.info("构建工作队列,队列名称:{},交换机:{},路由key:{}",queue,exchange,routingKey);
return QueueBuilder.durable(queue)
// DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
.withArgument(Constants.DEAD_LETTER_EXCHANGE_KEY, exchange)
// dead letter携带的routing key,配置处理队列的路由key
.withArgument(Constants.DEAD_LETTER_ROUTING_KEY, routingKey)
.build();
}
/**
* 构建工作定向队列交换机
*
* @author duanyong@jccfc.com
* @date 2021/6/4 9:48
* @return: org.springframework.amqp.core.Exchange
*/
private Exchange buildWorkExchange() {
//交换机
String exchange = StringUtils.join(group, Constants.WORK_DIRECT_EXCHANGE);
log.info("构建工作定向队列交换机,交换机:{}}",exchange);
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.directExchange(exchange).durable(true).build();
}
/**
* 队列绑定交换机
*
* @author duanyong@jccfc.com
* @date 2021/6/4 9:51
* @param queue: 信息队列
* @param exchange: 定向队列交换机
* @return: org.springframework.amqp.core.Binding
*/
private Binding bindWork(Queue queue, Exchange exchange) {
//路由key
String routingKey = StringUtils.join(group, Constants.WORK_ROUTING_KEY);
return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
}
/**
* 重试的队列
* 超时,死信队列,实现重试机制
* @author duanyong@jccfc.com
* @date 2021/6/4 9:22
* @return: org.springframework.amqp.core.Queue
*/
public Queue buildRetryQueue() {
//队列名称
String queue = StringUtils.join(group, Constants.RETRY_QUEUE);
//交换机
String exchange = StringUtils.join(group, Constants.WORK_DIRECT_EXCHANGE);
//路由key
String routingKey = StringUtils.join(group, Constants.WORK_ROUTING_KEY);
log.info("构建重试的队列,队列名称:{},交换机:{},路由key:{}",queue,exchange,routingKey);
// 设置超时队列
return QueueBuilder.durable(queue)
// DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
.withArgument(Constants.DEAD_LETTER_EXCHANGE_KEY, exchange)
// dead letter携带的routing key,配置处理队列的路由key
.withArgument(Constants.DEAD_LETTER_ROUTING_KEY, routingKey)
// 设置过期时间
//.withArgument(X_MESSAGE_TTL_KEY, QUEUE_EXPIRATION)
.build();
}
/**
* 申请重试的交换器。
*
* @author duanyong@jccfc.com
* @date 2021/6/4 9:20
* @return: org.springframework.amqp.core.Exchange
*/
public Exchange buildRetryExchange() {
//交换机
String exchange = StringUtils.join(group, Constants.RETRY_EXCHANGE);
log.info("构建工作定向队列交换机,交换机:{}}",exchange);
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.directExchange(exchange).durable(true).build();
}
/**
* 绑定重试队列到重试交换机
*
* @author duanyong@jccfc.com
* @date 2021/6/4 9:27
* @param queue: 重试队列
* @param exchange: 重试交换机
* @return: org.springframework.amqp.core.Binding
*/
public Binding buildrRetryBinding(Queue queue,Exchange exchange){
//路由key
String routingKey = StringUtils.join(group, Constants.RETRY_KEY);
return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
}
/**
* 校验
*
* @author duanyong@jccfc.com
* @date 2021/6/4 9:54
* @return: void
*/
private void check() {
if (null == rabbitAdmin || null == connectionFactory) {
throw new IllegalArgumentException("rabbitAdmin,connectionFactory 不能为空!");
}
}
@Override
public SimpleMessageListenerContainer getObject() throws Exception {
//校验
check();
//构建工作队列
Queue workQueue = buildWorkQueue();
//构建工作定向队列交换机
Exchange workExchange = buildWorkExchange();
//队列绑定交换机
Binding workBinding = bindWork(workQueue, workExchange);
//申明队列,交换机,绑定
rabbitAdmin.declareQueue(workQueue);
rabbitAdmin.declareExchange(workExchange);
rabbitAdmin.declareBinding(workBinding);
//构建重试队列
Queue retryQueue = buildRetryQueue();
//构建重试定向队列交换机
Exchange retryExchange = buildRetryExchange();
//队列绑定交换机
Binding retryBinding = buildrRetryBinding(retryQueue, retryExchange);
//申明队列,交换机,绑定
rabbitAdmin.declareQueue(retryQueue);
rabbitAdmin.declareExchange(retryExchange);
rabbitAdmin.declareBinding(retryBinding);
//构建SimpleMessageListenerContainer
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setAmqpAdmin(rabbitAdmin);
container.setConnectionFactory(connectionFactory);
container.setQueues(workQueue,retryQueue);
container.setPrefetchCount(20);
container.setConcurrentConsumers(concurrentNum == null ? 1 : concurrentNum);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
if (null != consumer) {
container.setMessageListener(consumer);
}
return container;
}
@Override
public Class> getObjectType() {
return SimpleMessageListenerContainer.class;
}
}
```
3. **Remote插件:**主要负责向客户端推送订阅的领域事件,支持HTTP和RPC模式。默认实现分别是:Fegin和Dubbo。
类结构如下:

为MQ插件提供远程调用服务,实现了HTTP和RPC(dubbo)方式调用,关键代码如下:
业务领域远程调用服务:
```java
/**
* 业务领域远程调用服务
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/13 11:24
*/
@Spi(value = BizEventRemotePluginConfig.DEFAULT_IMPL)
public interface BizEventRemoteService {
/**
* 通知业务领域事件
*
* @author duanyong@jccfc.com
* @date 2021/11/13 14:14
* @param bizEventReq:业务领域事件请求
* @return: com.javacoo.events.response.BaseResp
*/
BaseResp notice(BizEventReq bizEventReq);
}
```
业务领域远程调用服务抽象实现类:主要实现了业务领域远程调用服务,定义了抽象执行通知方法,由子类具体实现,并基于Sentinel实现了熔断机制。
```java
/**
* 业务领域远程调用服务抽象实现类
*
*
* @author: duanyong@jccfc.com
* @since: 2021/12/18 10:32
*/
@Slf4j
public abstract class AbstractBizEventRemoteService implements BizEventRemoteService{
@Autowired
private BizEventRemotePluginConfig bizEventRemotePluginConfig;
/**
* 通知业务领域事件
*
*
* @param bizEventReq :业务领域事件请求
* @author duanyong@jccfc.com
* @date 2021/11/11 14:14
* @return: com.javacoo.events.response.BaseResp
*/
@Override
public final BaseResp notice(BizEventReq bizEventReq) {
//资源名称
String resourceName = BizEventRemotePluginConfig.RES_KEY+bizEventReq.getEventGroup();
//初始化规则
initRules(resourceName);
Entry entry = null;
try {
entry = SphU.entry(resourceName);
BaseResp baseResp = doNotice(bizEventReq);
return baseResp;
} catch (BlockException blockException) {
return fallback(bizEventReq, blockException);
}catch (Exception exception) {
log.error("执行通知业务领域事件异常",exception);
Tracer.traceEntry(exception,entry);
throw exception;
} finally {
if (entry != null) {
entry.exit();
}
}
}
/**
* 执行通知
*
* @author duanyong@jccfc.com
* @date 2021/12/17 10:33
* @param bizEventReq:业务领域事件请求
* @return: com.javacoo.events.response.BaseResp
*/
protected abstract BaseResp doNotice(BizEventReq bizEventReq);
/**
* 降级处理
*
*
* @param bizEventReq : 业务领域事件请求
* @param e :异常
* @author duanyong@jccfc.com
* @date 2021/12/16 16:44
* @return: com.javacoo.events.response.BaseResp
*/
@Override
public BaseResp fallback(BizEventReq bizEventReq, Throwable e) {
log.error("降级处理:{}",e);
return BaseResp.fail(RespStateCodeEnum.TIMEOUT,"执行通知失败-降级处理");
}
/**
* 初始化熔断规则
*
* @param resourceName :资源名称
* @author duanyong@jccfc.com
* @date 2021/12/17 11:14
* @return: void
*/
private void initRules(String resourceName) {
List rules = new ArrayList<>();
DegradeRule rule = new DegradeRule(resourceName)
.setGrade(bizEventRemotePluginConfig.getGrade())
.setCount(bizEventRemotePluginConfig.getCount())
.setTimeWindow(bizEventRemotePluginConfig.getTimeWindow())
.setSlowRatioThreshold(bizEventRemotePluginConfig.getSlowRatioThreshold())
.setMinRequestAmount(bizEventRemotePluginConfig.getMinRequestAmount())
.setStatIntervalMs(bizEventRemotePluginConfig.getStatIntervalMs());
rules.add(rule);
// 将控制规则载入到 Sentinel
DegradeRuleManager.loadRules(rules);
}
}
```
业务领域远程调用服务工厂:实现了HTTP插件和RPC插件的初始化
```java
/**
* 业务领域远程调用服务工厂
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/13 11:16
*/
@Slf4j
@Component
@ConditionalOnBean(BizEventRemotePluginConfig.class)
public class BizEventRemoteServiceFactory {
static Map bizEventRemoteServiceMap = new HashMap<>(2);
@Autowired
private BizEventRemotePluginConfig bizEventRemotePluginConfig;
/**
* 构建业务领域远程调用服务实现
*
* @author duanyong@jccfc.com
* @date 2021/11/13 11:32
* @return: com.javacoo.events.remote.api.service.BizEventRemoteService
*/
@Bean
public BizEventRemoteService createBizEventRemoteService(){
//加载自定义插件
BizEventRemoteService bizEventRemoteService = ExtensionLoader.getExtensionLoader(BizEventRemoteService.class).getExtension(bizEventRemotePluginConfig.getImpl());
addBizEventRemoteService(bizEventRemotePluginConfig.getImpl(),bizEventRemoteService);
return bizEventRemoteService;
}
@Bean
public BizEventRemoteService createHttpBizEventRemoteService(){
//加载HTTP插件
BizEventRemoteService bizEventRemoteService = ExtensionLoader.getExtensionLoader(BizEventRemoteService.class).getExtension(
RemoteType.HTTP.getCode());
addBizEventRemoteService(RemoteType.HTTP.getCode(),bizEventRemoteService);
return bizEventRemoteService;
}
@Bean
public BizEventRemoteService createRpcBizEventRemoteService(){
//加载RPC插件
BizEventRemoteService bizEventRemoteService = ExtensionLoader.getExtensionLoader(BizEventRemoteService.class).getExtension(
RemoteType.RPC.getCode());
addBizEventRemoteService(RemoteType.RPC.getCode(),bizEventRemoteService);
return bizEventRemoteService;
}
/**
* 获取业务领域远程调用服务实现
*
* @author duanyong@jccfc.com
* @date 2021/11/13 13:37
* @param impl: 实现名称
* @return: java.util.Optional
*/
public static Optional getBizEventRemoteService(String impl) {
return Optional.ofNullable(bizEventRemoteServiceMap.get(impl));
}
/**
* 添加业务领域远程调用服务实现
*
* @author duanyong@jccfc.com
* @date 2021/11/12 17:21
* @param impl: 实现名称
* @param bizEventRemoteService: 业务领域远程调用服务实现
* @return: void
*/
void addBizEventRemoteService(String impl,BizEventRemoteService bizEventRemoteService) {
if(!bizEventRemoteServiceMap.containsKey(impl) && bizEventRemoteService != null){
bizEventRemoteServiceMap.put(impl,bizEventRemoteService);
}
}
}
```
HTTP业务领域事件远程调用服务:依赖 [远程服务组件](https://www.jianshu.com/p/0beaffc0b06d)实现远程服务的调用
```java
/**
* 业务领域事件远程调用服务
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/13 14:06
*/
@Slf4j
public class HttpBizEventRemoteService extends AbstractBizEventRemoteService {
/** http配置 */
@Autowired
private HttpConfig httpConfig;
/** 远程服务客户端工厂接口 */
@Autowired
protected RemoteClientFactory remoteClientFactory;
/**
* 执行通知
*
*
* @param bizEventReq :业务领域事件请求
* @author duanyong@jccfc.com
* @date 2021/12/18 10:33
* @return: com.javacoo.events.response.BaseResp
*/
@Override
protected BaseResp doNotice(BizEventReq bizEventReq) {
String pushMsg = FastJsonUtil.toJSONString(bizEventReq);
GroupHttpParams groupHttpParams = httpConfig.getGroups().get(bizEventReq.getEventGroup());
log.info("HttpBizEventRemoteService执行通知业务领域事件到分组:{}->参数:{},申请数据:{}", bizEventReq.getEventGroup(),
groupHttpParams,pushMsg);
TargetConfig targetConfig = TargetConfig.builder()
.url(groupHttpParams.getPushUrl())
.connectionTimeout(groupHttpParams.getConnectionTimeout())
.period(groupHttpParams.getPeriod())
.retryCount(groupHttpParams.getRetryCount())
.socketTimeout(groupHttpParams.getSocketTimeout())
.build();
JSONObject resultJson = remoteClientFactory.getClient(BizEventRemoteClient.class,targetConfig).notice(pushMsg);
log.info("HttpBizEventRemoteService执行通知业务领域事件到分组:{},响应:{}", bizEventReq.getEventGroup(), resultJson);
if(resultJson != null && Constants.LOAN_APPLY_PUSH_RETURN_STATE_CODE.equals(resultJson.getString(Constants.LOAN_APPLY_PUSH_RETURN_STATE))){
return BaseResp.ok();
}
if(resultJson != null){
return BaseResp.fail(resultJson.getString(Constants.LOAN_APPLY_PUSH_RETURN_CODE),resultJson.getString(Constants.LOAN_APPLY_PUSH_RETURN_DESC));
}
return BaseResp.fail("执行通知失败");
}
}
```
Dubbo服务帮助类:此类启动时根据分组配置参数初始化 分组->业务领域事件通知服务MAP,主要是根据分组RPC配置参数对dubbo的ReferenceConfig对象进行配置,实现不同分组绑定不同的ReferenceConfig对象,从而达到根据分组调用不同实现的目的。
```java
/**
* 业务领域事件远程调用服务
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/13 14:06
*/
@Slf4j
public class RpcBizEventRemoteService extends AbstractBizEventRemoteService {
/** 远程服务客户端工厂接口 */
@Autowired
protected DubboServiceHelper dubboServiceHelper;
/**
* 执行通知
*
*
* @param bizEventReq :业务领域事件请求
* @author duanyong@jccfc.com
* @date 2021/12/18 10:33
* @return: com.javacoo.events.response.BaseResp
*/
@Override
protected BaseResp doNotice(BizEventReq bizEventReq) {
String pushMsg = FastJsonUtil.toJSONString(bizEventReq);
log.info("RpcBizEventRemoteService执行通知业务领域事件到分组:{}->申请数据:{}", bizEventReq.getEventGroup(),pushMsg);
if(!dubboServiceHelper.getGroupBizEventNoticeServiceMap().containsKey(bizEventReq.getEventGroup())){
log.error("未配置分组:{},通知服务",bizEventReq.getEventGroup());
return BaseResp.fail("未配置分组:"+bizEventReq.getEventGroup()+"通知服务");
}
//获取通知服务
IBizEventNoticeService bizEventNoticeService = dubboServiceHelper.getGroupBizEventNoticeServiceMap().get(bizEventReq.getEventGroup()).get();
if(bizEventNoticeService == null){
log.error("无法获取分组:{},配置的通知服务",bizEventReq.getEventGroup());
return BaseResp.fail(("无法获取分组:"+bizEventReq.getEventGroup()+"配置的通知服务"));
}
BaseResp baseResp = bizEventNoticeService.notice(bizEventReq);
log.info("RpcBizEventRemoteService执行通知业务领域事件到分组:{},响应:{}", bizEventReq.getEventGroup(), FastJsonUtil.toJSONString(baseResp));
return baseResp;
}
}
```
RPC业务领域事件远程调用服务:
```java
/**
* 业务领域事件远程调用服务
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/13 14:06
*/
@Slf4j
public class RpcBizEventRemoteService implements BizEventRemoteService {
/** 远程服务客户端工厂接口 */
@Autowired
protected DubboServiceHelper dubboServiceHelper;
/**
* 通知业务领域事件
*
*
* @param bizEventReq :业务领域事件请求
* @author duanyong@jccfc.com
* @date 2021/11/13 14:14
* @return: com.javacoo.events.response.BaseResp
*/
@Override
public BaseResp notice(BizEventReq bizEventReq) {
String pushMsg = FastJsonUtil.toJSONString(bizEventReq);
log.info("RpcBizEventRemoteService执行通知业务领域事件到分组:{}->申请数据:{}", bizEventReq.getEventGroup(),pushMsg);
if(!dubboServiceHelper.getGroupBizEventNoticeServiceMap().containsKey(bizEventReq.getEventGroup())){
log.error("未配置分组:{},通知服务",bizEventReq.getEventGroup());
return BaseResp.fail("未配置分组:"+bizEventReq.getEventGroup()+"通知服务");
}
//获取通知服务
IBizEventNoticeService bizEventNoticeService = dubboServiceHelper.getGroupBizEventNoticeServiceMap().get(bizEventReq.getEventGroup()).get();
if(bizEventNoticeService == null){
log.error("无法获取分组:{},配置的通知服务",bizEventReq.getEventGroup());
return BaseResp.fail(("无法获取分组:"+bizEventReq.getEventGroup()+"配置的通知服务"));
}
BaseResp baseResp = bizEventNoticeService.notice(bizEventReq);
log.info("RpcBizEventRemoteService执行通知业务领域事件到分组:{},响应:{}", bizEventReq.getEventGroup(), FastJsonUtil.toJSONString(baseResp));
return baseResp;
}
}
```
4. **Task插件:**主要负责定时从推送失败消息表中获取推送失败的领域事件消息,重新发送到MQ中,继续推送到指定客户端。默认实现:XXLJob
类结构如下:

通过定时任务获取推送异常的业务事件信息,重新推送到MQ中,执行推送流程,默认采用XXLJOB。关键代码如下:
XXLJob配置:
```java
/**
* XXLJob配置
*
*
* @author: duanyong@jccfc.com
* @since: 2021/12/18 11:00
*/
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = XXLJobConfig.PREFIX)
@ConditionalOnProperty(prefix = XXLJobConfig.PREFIX, value = XXLJobConfig.ENABLED, matchIfMissing = true)
public class XXLJobConfig {
public static final String PREFIX = "biz.event.task.plugin.xxljob";
public static final String ENABLED = "enabled";
/**是否可用*/
private String enabled = ENABLED;
/** 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册; */
private String adminAddresses;
/** 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册 */
private String appName;
/** 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务" */
private String ip;
/** 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口 */
private int port;
/** 执行器通讯TOKEN [选填]:非空时启用 */
private String accessToken;
/** 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径 */
private String logPath;
/** 执行器日志保存天数 [选填] :值大于3时生效,启用执行器Log文件定期清理功能,否则不生效 */
private int logRetentionDays;
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init,AdminAddress:{}",adminAddresses);
XxlJobExecutor xxlJobExecutor = new XxlJobExecutor();
xxlJobExecutor.setAdminAddresses(adminAddresses);
xxlJobExecutor.setAppName(appName);
xxlJobExecutor.setIp(ip);
xxlJobExecutor.setPort(port);
xxlJobExecutor.setAccessToken(accessToken);
xxlJobExecutor.setLogPath(logPath);
xxlJobExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobExecutor;
}
}
```
业务领域事件任务处理器接口XXLJob实现类
```java
/**
* 业务领域事件任务处理器接口XXLJob实现类
*
*
* @author: duanyong@jccfc.com
* @since: 2021/12/18 13:39
*/
@Slf4j
@JobHandler(value = "bizEventTaskJobHandler")
public class XXLJobBizEventTaskJobHandler extends BaseJobHandler implements BizEventTaskJobHandler {
/**
* 业务领域事件服务
*/
@Autowired
private BizEventService bizEventService;
/**
* 业务领域MQ消息生产者服务
*/
@Autowired
private BizEventMQProducer bizEventMQProducer;
/**
* 获取JOB任务
*
*
* @param param :控制台参数
* @author duanyong@jccfc.com
* @date 2020/7/27 9:31
* @return: java.util.Optional>
*/
@Override
protected Optional> getJobTask(String param) {
log.info("XXLJob执行定时任务-bizevent-job:{}",param);
//获取控制台参数
JSONObject paramJSONObject = getParamJSONObject(param);
//得到最大执行次数
int maxExecNum = paramJSONObject.containsKey(MAX_EXEC_NUM_KEY) ? paramJSONObject.getIntValue(MAX_EXEC_NUM_KEY) : MAX_EXEC_NUM;
//得到查询参数state
final String state = paramJSONObject.containsKey(QUERY_STATE_KEY) ? paramJSONObject.getString(QUERY_STATE_KEY) : QUERY_STATE;
//查询指定状态的异常信息
BizEventMqErrorInfoDto bizEventMqErrorInfoDtoQuery = new BizEventMqErrorInfoDto();
bizEventMqErrorInfoDtoQuery.setState(state);
List bizEventMqErrorInfoDtos = bizEventService.getList(bizEventMqErrorInfoDtoQuery);
if(CollectionUtils.isEmpty(bizEventMqErrorInfoDtos)){
return Optional.empty();
}
//过滤
bizEventMqErrorInfoDtos = bizEventMqErrorInfoDtos.stream()
.filter(bizEventMqErrorInfoDto -> bizEventMqErrorInfoDto.getExecNum() < maxExecNum)
.collect(Collectors.toList());
return CollectionUtils.isEmpty(bizEventMqErrorInfoDtos) ? Optional.empty() : Optional.of(bizEventMqErrorInfoDtos);
}
/**
* 执行单个任务
*
*
* @param bizEventMqErrorInfoDto :任务
* @author duanyong@jccfc.com
* @date 2020/7/27 9:35
* @return: void
*/
@Override
protected void doExecute(BizEventMqErrorInfoDto bizEventMqErrorInfoDto) {
BizEventReq bizEventReq = FastJsonUtil.toBean(bizEventMqErrorInfoDto.getContent(),BizEventReq.class);
try {
//推送
bizEventMQProducer.send(bizEventReq,null);
//修改状态为2
bizEventMqErrorInfoDto.setState(BizEventService.STATE_2);
bizEventService.updateBizEventMqErrorInfo(bizEventMqErrorInfoDto);
} catch (Exception e) {
log.error("推送业务领域事件到MQ并修改状态失败",e);
}
}
/**
* 是否支持并行执行
* 默认false
*
* @author duanyong@jccfc.com
* @date 2020/7/27 13:48
* @return: boolean
*/
@Override
protected boolean parallel() {
return true;
}
}
```
JOB任务抽象基类
```java
/**
* JOB任务抽象基类
*
*
* @author: duanyong@jccfc.com
* @since: 2020/7/27 8:30
*/
@Slf4j
public abstract class BaseJobHandler extends IJobHandler {
@Override
public final ReturnT execute(String param) throws Exception {
getJobTask(param).ifPresent(this::accept);
return ReturnT.SUCCESS;
}
/**
* 获取JOB任务
*
* @author duanyong@jccfc.com
* @date 2020/7/27 9:31
* @param param:控制台参数
* @return: java.util.Optional>
*/
protected abstract Optional> getJobTask(String param);
/**
* 执行单个任务
*
* @author duanyong@jccfc.com
* @date 2020/7/27 9:35
* @param task:任务
* @return: void
*/
protected void doExecute(T task){};
/**
* 批量执行任务
*
* @author duanyong@jccfc.com
* @date 2020/7/28 8:35
* @param taskList: 任务集合
* @return: void
*/
protected void doExecute(List taskList){};
/**
* 是否支持并行执行
* 默认false
* @author duanyong@jccfc.com
* @date 2020/7/27 13:48
* @return: boolean
*/
protected boolean parallel(){
return false;
}
/**
* 是否支持批量执行
* 默认false
* @author duanyong@jccfc.com
* @date 2020/7/28 8:37
* @return: boolean
*/
protected boolean batch(){
return false;
}
/**
* 将xxljob控制台传递的参数封装到JSONObject中
* 控制台参数输入JSON格式:{a:xxx,b:xxx}
* @author duanyong@jccfc.com
* @date 2020/7/27 9:15
* @param param: 参数
* @return: com.alibaba.fastjson.JSONObject
*/
protected JSONObject getParamJSONObject(String param){
if(StringUtils.isBlank(param)){
param = "{}";
}
return JSONObject.parseObject(param);
}
/**
* 接收参数并执行
*
* @author duanyong@jccfc.com
* @date 2020/7/27 9:42
* @param taskList:
* @return: void
*/
private void accept(List taskList) {
if(batch()){
doExecute(taskList);
}else{
if(parallel()){
taskList.parallelStream().forEachOrdered(task -> doExecute(task));
}else{
taskList.stream().forEachOrdered(task -> doExecute(task));
}
}
}
}
```
5. **Register插件:**主要负责客户端系统注册业务领域事件,默认实现:基于配置文件
类结构如下:

通过客户端系统注册信息实现事件的过滤,达到精准推送,按需推送的目的,关键代码如下:
业务领域事件注册配置:
```java
/**
* 业务领域事件注册配置
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/29 15:51
*/
@Slf4j
@Data
@ToString
@Configuration(proxyBeanMethods = false)
@ConfigurationProperties(prefix = FileRegisterConfig.PREFIX)
@ConditionalOnProperty(prefix = FileRegisterConfig.PREFIX, value = FileRegisterConfig.ENABLED, matchIfMissing = true)
public class FileRegisterConfig {
public static final String PREFIX = "biz.event.registry.plugin.file";
public static final String ENABLED = "enabled";
/**是否可用*/
private String enabled = ENABLED;
/**分组-业务领域事件集合*/
private Map> groups = new LinkedHashMap<>();
}
```
业务领域事件信息:
```java
/**
* 业务领域事件信息
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/29 14:05
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class BizEventInfo implements Serializable {
private static final long serialVersionUID = -4904238050272564504L;
/**
* 领域业务类型名称:
*/
private String eventType;
/**
* 业务事件主题:
*/
@Builder.Default
private String topic = "";
/**
* 业务事件版本:
*/
@Builder.Default
private String version = "";
/**
* 业务事件对象参数匹配:EL表达式->格式:EL表达式=取值
*/
@Builder.Default
private String paramEL = "";
/**
* 获取EL表达式
* 说明:
*
* @author duanyong@jccfc.com
* @date 2021/12/11 21:28
* @return el表达式
*/
public String getEl(){
return paramEL != "" ? paramEL.split("=")[0] : "";
}
/**
* 获取需要匹配的值
* 说明:
* 支持正则表达式
* @author duanyong@jccfc.com
* @date 2021/12/11 21:28
* @return el表达式取值
*/
public String getElValue(){
return paramEL != "" ? paramEL.split("=")[1] : "";
}
@Override
public String toString() {
return eventType + topic + version;
}
}
```
#### 2,BizEvent-SDK
- BizEvent-SDK是与BizEvents-Framework配套的客户端SDK,事件驱动的体系结构中,客户端无需轮询就可以接收更新,事件在到达事件存储后就会通知给客户端,客户端可以随时接收更新,这对于动态数据转换、分析和数据科学处理非常有用。其他业务系统只需集成SDK,便可拥有订阅目标系统领域事件,并收到目标系统推送领域事件的能力。类结构图如下:

接下来分两部分来说明:
1. **SDK初始化**
系统集成BizEvent-SDK ,系统启动中根据SDK包中spring.factories自动装配相关组件(根据接入配置(见安装教程->客户端),初始化如 HTTP和RPC(dubbo),过滤器,解析器等),spring.factories配置如下:
```properties
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.javacoo.events.client.config.BizEventClientConfig,\
com.javacoo.events.client.rpc.dubbo.config.DubboConfig,\
com.javacoo.events.client.rpc.dubbo.DubboAutoConfiguration,\
com.javacoo.events.client.rpc.dubbo.CustomDubboAutoConfiguration,\
com.javacoo.events.client.http.HttpAutoConfiguration,\
com.javacoo.events.client.BizEventListenerRegistrar
```
- BizEventClientConfig:完成了过滤器,解析器的初始化
**解析器:**默认实现基于Spring SpelExpressionParser实现。EL表达式介绍及使用文档参见 [Spring官网](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#expressions)。这里需要注意是,SDK约定是基于对象方式访问,对象名称为eventObject,具体使用参见 使用说明->客户端,关键代码如下:
```java
/**
* 获取上下文
*
* @author duanyong@jccfc.com
* @date 2021/12/11 9:58
* @param root:存储表达式对应值的上下文
* @return: org.springframework.expression.EvaluationContext
*/
private EvaluationContext getContext(Object root){
//设置上下文
EvaluationContext context = new StandardEvaluationContext();
context.setVariable("eventObject",root);
return context;
}
```
**过滤器:**过滤器基于自定义函数式接口BizEventFilter实现,定义了匹配方法及辅助方法,支持并操作,或操作,非操作,可实现不同规则下过滤器的灵活组装。默认提供了事件类型过滤器,事件主题过滤器,事件版本过滤器,事件对象参数过滤器,其中前三个过滤器默认实现是简单等值比较,事件对象参数过滤器则是通过解析器解析领域事件中事件对象,获取对象中指定字段的值,与配置的值比较,支持正则表达式匹配。这些过滤器最终通过事件过滤器规则对象组合在一起,默认规则是并操作,关键代码如下:
业务领域事件过滤接口
```java
/**
* 业务领域事件过滤接口
*
*
* @author: duanyong@jccfc.com
* @since: 2021/12/11 10:59
*/
@FunctionalInterface
public interface BizEventFilter{
/**
* 匹配
*
* @author duanyong@jccfc.com
* @date 2021/12/11 9:14
* @param u: 待匹配参数
* @param t: 待匹配参数
* @return: boolean 是否匹配
*/
boolean matcher(U u,T t);
/**
* 并操作
*
* @author duanyong@jccfc.com
* @date 2021/12/11 9:16
* @param other: 另一个比较器
* @return: com.javacoo.events.filter.BizEventFilter
*/
default BizEventFilter and(BizEventFilter super U,? super T> other) {
Objects.requireNonNull(other);
return (u,t) -> matcher(u,t) && other.matcher(u,t);
}
/**
* 或操作
*
* @author duanyong@jccfc.com
* @date 2021/12/11 9:17
* @param other: 另一个比较器
* @return: com.javacoo.events.filter.BizEventFilter
*/
default BizEventFilter or(BizEventFilter super U,? super T> other) {
Objects.requireNonNull(other);
return (u,t) -> matcher(u,t) || other.matcher(u,t);
}
/**
* 非操作
*
* @author duanyong@jccfc.com
* @date 2021/12/11 9:17
* @return: com.javacoo.events.filter.BizEventFilter
*/
default BizEventFilter negate() {
return (u,t) -> !matcher(u,t);
}
}
```
事件对象参数过滤器
```java
/**
* 事件对象参数过滤器
*
*
* @author: duanyong@jccfc.com
* @since: 2021/12/11 10:11
*/
@Slf4j
public class ParamBizEventFilter implements BizEventFilter {
/**
* 匹配
*
*
* @param bizEventReq : 待匹配参数
* @param bizEventListenerInfo : 待匹配参数
* @author duanyong@jccfc.com
* @date 2021/12/11 9:14
* @return: boolean 是否匹配
*/
@Override
public boolean matcher(BizEventReq bizEventReq, BizEventListenerInfo bizEventListenerInfo) {
Objects.requireNonNull(bizEventReq);
Objects.requireNonNull(bizEventReq.getEventObject());
Objects.requireNonNull(bizEventListenerInfo);
log.info("开始表达式匹配:{}", bizEventListenerInfo.getParamEL());
if(StringUtils.isBlank(bizEventListenerInfo.getParamEL())){
return true;
}
//不支持表达式
if(!ExpressionParserHolder.getSpelExpressionParser().isPresent()){
log.warn("不支持表达式匹配");
return false;
}
//获取表达式解析器
ExpressionParser expressionParser = ExpressionParserHolder.getSpelExpressionParser().get();
//获取参数class对象
Class paramClass = bizEventListenerInfo.getTargetMethod().getParameterTypes()[0];
//获取业务领域事件对象JSON字符串
final String eventObjectJsonString = JSONObject.toJSONString(bizEventReq.getEventObject());
//得到目标参数对象
Object eventObject = JSONObject.parseObject(eventObjectJsonString,paramClass);
//获取值
Object valueObj = expressionParser.getValue(bizEventListenerInfo.getEl(),eventObject);
log.info("取值表达式:{},取值结果:{}", bizEventListenerInfo.getEl(),valueObj);
if(valueObj == null){
return false;
}
boolean matches = DataUtil.matcher(bizEventListenerInfo.getElValue(),valueObj.toString());
log.info("值匹配表达式:{},待匹配值:{},匹配结果:{}", bizEventListenerInfo.getElValue(),valueObj.toString(),matches);
return matches;
}
}
```
事件类型过滤器
```java
/**
* 事件类型过滤器
*
*
* @author: duanyong@jccfc.com
* @since: 2021/12/11 9:59
*/
@Slf4j
public class EventTypeBizEventFilter implements BizEventFilter {
/**
* 匹配
*
*
* @param bizEventReq : 待匹配参数
* @param bizEventListenerInfo : 待匹配参数
* @author duanyong@jccfc.com
* @date 2021/12/11 9:14
* @return: boolean 是否匹配
*/
@Override
public boolean matcher(BizEventReq bizEventReq, BizEventListenerInfo bizEventListenerInfo) {
Objects.requireNonNull(bizEventReq);
Objects.requireNonNull(bizEventListenerInfo);
log.info("开始事件类型匹配,事件中的类型:{},注册的类型:{}",bizEventReq.getEventType(), bizEventListenerInfo.getEventType());
if(StringUtils.isBlank(bizEventListenerInfo.getEventType())){
return false;
}
return bizEventReq.getEventType().equals(bizEventListenerInfo.getEventType());
}
}
```
事件过滤器规则
```java
/**
* 事件过滤器规则
*
*
* @author: duanyong@jccfc.com
* @since: 2021/12/11 9:56
*/
@Slf4j
public class SimpleBizEventFilterRule implements BiFunction {
/** 事件类型过滤器 */
private BizEventFilter eventTypeBizEventFilter;
/** 事件主题过滤器 */
private BizEventFilter topicBizEventFilter;
/** 事件版本过滤器 */
private BizEventFilter versionBizEventFilter;
/** 事件对象参数过滤器 */
private BizEventFilter paramBizEventFilter;
@PostConstruct
public void init(){
eventTypeBizEventFilter = new EventTypeBizEventFilter();
topicBizEventFilter = new TopicBizEventFilter();
versionBizEventFilter = new VersionBizEventFilter();
paramBizEventFilter = new ParamBizEventFilter();
log.info("事件过滤器规则对象初始化完成");
}
/**
* Applies this function to the given arguments.
*
* @param bizEventReq the first function argument
* @param bizEventListenerInfo the second function argument
* @return the function result
*/
@Override
public Boolean apply(BizEventReq bizEventReq, BizEventListenerInfo bizEventListenerInfo) {
return eventTypeBizEventFilter.and(topicBizEventFilter).and(versionBizEventFilter).and(paramBizEventFilter).matcher(bizEventReq,
bizEventListenerInfo);
}
}
```
- DubboConfig,DubboAutoConfiguration,CustomDubboAutoConfiguration三个类,实现了RPC方式接收事件推送通知。
CustomDubboAutoConfiguration自动配置类,在客户端系统未使用dubbo的情况下使用,需要配置dubbo参数,具体配置参见 安装教程->客户端 代码如下:
```java
/**
* DUBBO自动配置类
* 需要配置dubbo参数
*
* @author: duanyong@jccfc.com
* @since: 2021/11/19 10:35
*/
@Slf4j
@Configuration(proxyBeanMethods = false)
@DubboComponentScan("com.javacoo.events.client.rpc.dubbo.service")
@EnableConfigurationProperties(value = BizEventDubboConfig.class)
@ConditionalOnClass(BizEventDubboConfig.class)
@ConditionalOnProperty(prefix = BizEventDubboConfig.PREFIX, name=BizEventDubboConfig.CUSTOM_KEY,havingValue = BizEventDubboConfig.CUSTOM_TRUE)
public class CustomDubboAutoConfiguration {
/**
* Dubbo配置
*/
@Autowired
private DubboConfig dubboConfig;
/**
* 当前应用配置
*/
@Bean("biz-event-annotation-provider")
public ApplicationConfig applicationConfig() {
return dubboConfig.getApplication();
}
/**
* 当前连接注册中心配置
*/
@Bean("biz-event-registry")
public RegistryConfig registryConfig() {
return dubboConfig.getRegistry();
}
/**
* 当前协议配置
*/
@Bean("biz-event-protocol")
public ProtocolConfig protocolConfig() {
return dubboConfig.getProtocol();
}
/**
* 当前配置中心配置
*/
@Bean("biz-event-centerConfig")
public ConfigCenterConfig configCenterConfig(){
return dubboConfig.getConfigCenter();
}
@Bean
public BizEventDispatcher bizEventDispatcher(){
return new BizEventDispatcher();
}
}
```
业务事件通知实现类
```java
/**
* 业务事件通知实现类
* 基于dubbo实现
* version:版本,根据约定,需与接入平台保持一致
* group:分组,根据约定,需与接入平台保持一致
*
* @author: duanyong@jccfc.com
* @since: 2021/11/18 15:31
*/
@Slf4j
@DubboService(interfaceClass = IBizEventNoticeService.class, version = "${biz.event.client.rpc.dubbo.service.version}", retries = -1,group = "${biz.event.client.rpc.dubbo.service.group}")
public class DubboBizEventNoticeService implements IBizEventNoticeService {
/**
* 业务领域远程调用服务
*/
@Autowired
private BizEventPersistence bizEventPersistence;
/**
* 事件分发器
*/
@Autowired
private BizEventDispatcher bizEventDispatcher;
/**
* 通知业务领域事件
*
*
* @param bizEventReq : 业务领域事件
* @author duanyong@jccfc.com
* @date 2021/11/13 9:24
* @return: com.javacoo.events.response.BaseResp
*/
@Override
public BaseResp notice(BizEventReq bizEventReq) {
log.info("收到业务领域事件通知:{}",bizEventReq);
try{
//持久化业务领域事件
bizEventPersistence.persist(bizEventReq);
//分发业务领域事件
bizEventDispatcher.dispatch(bizEventReq);
}catch (Exception e){
log.error("业务领域事件分发失败",e);
return BaseResp.fail(e.getMessage());
}
return BaseResp.ok();
}
}
```
DubboAutoConfiguration自动配置类,在客户端已经使用dubbo的情况下使用,主要思想是利用ServiceConfig暴露服务,代码如下:
```java
/**
* dubbo自动配置
* 说明:
* 依赖已有配置,注册服务
*
* @author duanyong@jccfc.com
* @date 2021/12/1 22:10
*/
@Slf4j
@Configuration(proxyBeanMethods = false)
@AutoConfigureOrder(5)
@EnableConfigurationProperties(value = BizEventDubboConfig.class)
@ConditionalOnClass(value = {BizEventDubboConfig.class})
@ConditionalOnProperty(prefix = BizEventDubboConfig.PREFIX, name=BizEventDubboConfig.CUSTOM_KEY,havingValue = BizEventDubboConfig.CUSTOM_DEFAULT)
public class DubboAutoConfiguration {
/**
* Dubbo配置
*/
@Autowired
private BizEventDubboConfig bizEventDubboConfig;
/**
* applicationConfig
*/
@Autowired
private ApplicationConfig applicationConfig;
/**
* registryConfig
*/
@Autowired
private RegistryConfig registryConfig;
/**
* protocolConfig
*/
@Autowired
private ProtocolConfig protocolConfig;
/**
* configCenterConfig
*/
@Autowired
private ConfigCenterConfig configCenterConfig;
@Bean
public BizEventDispatcher bizEventDispatcher(){
return new BizEventDispatcher();
}
@Bean
public IBizEventNoticeService bizEventNoticeService(){
IBizEventNoticeService bizEventNoticeService = new DubboBizEventNoticeService();
ServiceConfig config = new ServiceConfig();
config.setApplication(applicationConfig);
List registryConfigs = new ArrayList<>(1);
registryConfigs.add(registryConfig);
config.setRegistries(registryConfigs);
config.setProtocol(protocolConfig);
config.setConfigCenter(configCenterConfig);
config.setGroup(bizEventDubboConfig.getService().getGroup());
config.setVersion(bizEventDubboConfig.getService().getVersion());
config.setInterface("com.javacoo.events.api.IBizEventNoticeService");
config.setRef(bizEventNoticeService);
config.export();
return bizEventNoticeService;
}
}
```
- HttpAutoConfiguration自动配置类,完成Http方式接入,实现很简单,按照约定对外暴露/notice/bizEvent接口方法,代码如下:
```java
/**
* http自动配置类
* 说明:
*
* @author duanyong@jccfc.com
* @date 2021/11/20 13:25
*/
@Slf4j
@Configuration
@EnableConfigurationProperties(value = HttpConfig.class)
@ConditionalOnClass(HttpConfig.class)
@ConditionalOnProperty(prefix = HttpConfig.PREFIX, value = HttpConfig.ENABLED, matchIfMissing = true)
public class HttpAutoConfiguration {
/**
* http配置
*/
@Autowired
private HttpConfig HttpConfig;
@Bean
public HttpBizEventNoticeController httpBizEventNoticeController() {
return new HttpBizEventNoticeController();
}
@Bean
public BizEventDispatcher bizEventDispatcher(){
return new BizEventDispatcher();
}
}
```
业务领域事件通知控制器
```java
/**
* 业务领域事件通知控制器
* 说明:
*
*
* @author duanyong@jccfc.com
* @date 2021/11/20 14:24
*/
@Slf4j
@RestController
@RequestMapping("/notice")
public class HttpBizEventNoticeController {
/**
* 业务领域远程调用服务
*/
@Autowired
private BizEventPersistence bizEventPersistence;
/**
* 事件分发器
*/
@Autowired
private BizEventDispatcher bizEventDispatcher;
/**
* 业务领域事件通知
* 说明:
*
* @param bizEventReq : 业务领域事件
* @author duanyong@jccfc.com
* @date 2021/11/20 14:35
*/
@RequestMapping("/bizEvent")
@ResponseBody
public BaseResp notice(@RequestBody BizEventReq bizEventReq) {
log.info("HttpBizEventNoticeController收到业务领域事件通知:{}",bizEventReq);
try{
//持久化业务领域事件
bizEventPersistence.persist(bizEventReq);
//分发业务领域事件
bizEventDispatcher.dispatch(bizEventReq);
}catch (Exception e){
log.error("业务领域事件分发失败",e);
return BaseResp.fail(e.getMessage());
}
return BaseResp.ok();
}
}
```
- BizEventListenerRegistrar业务领域事件监听对象注册服务:系统集成BizEvent-SDK (配置见安装教程->客户端),Spring容器启动之后,便会运行业务领域事件监听对象注册服务(BizEventListenerRegistrar),扫描Spring容器中包含@Service注解的服务,找到其中添加了@BizEventListener注解的方法,解析并组装业务领域事件监听信息(包括事件类型,事件主题,事件版本,事件对象参数匹配:EL表达式,目标方法),完成业务事件监听元数据集合的组装,供后续使用。关键代码如下:
业务领域事件监听对象注册服务
```java
/**
* 业务领域事件监听对象注册
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/18 16:21
*/
@Slf4j
@Component
public class BizEventListenerRegistrar implements ApplicationContextAware {
/** 业务事件监听元数据集合 */
private static List bizEventListenerMetaDataList = new ArrayList<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//设置上下文
ApplicationContextProvider.setApplicationContext(applicationContext);
//扫描Service注解,初始化业务事件监听元数据集合
initBizEventMetaDataList(Service.class);
}
/**
* 扫描指定注解服务,初始化业务事件监听元数据集合
*
* @author duanyong@jccfc.com
* @date 2021/11/19 16:36
* @param annClass:注解class对象
* @return: void
*/
private void initBizEventMetaDataList(Class extends Annotation> annClass){
//查找Service
Map serviceMap = ApplicationContextProvider.getApplicationContext().getBeansWithAnnotation(annClass);
for (Map.Entry entry : serviceMap.entrySet()) {
Class entryClass = AopUtils.getTargetClass(entry.getValue());
//获取业务领域事件监听信息集合
List bizEventListenerInfos = Arrays.stream(entryClass.getDeclaredMethods())
//获取本类 public方法
.filter(method -> Modifier.isPublic(method.getModifiers()))
//找到注解所在方法
.filter(method -> method.isAnnotationPresent(BizEventListener.class))
//只支持监听一个参数
.filter(method -> method.getParameterTypes().length == 1)
//排序
.sorted(Comparator.comparing(method -> method.getAnnotation(BizEventListener.class).order()))
//组装
.map(method -> getBizEventInfo(method))
.collect(Collectors.toList());
if(bizEventListenerInfos.isEmpty()){
continue;
}
bizEventListenerMetaDataList.add(BizEventListenerMetaData.builder().beanName(entry.getKey()).bizEventListenerInfos(bizEventListenerInfos).targetClass(entryClass).build());
}
log.info("业务领域事件监听对象注册数量:{},对象:{}", bizEventListenerMetaDataList.size(), bizEventListenerMetaDataList);
}
/**
* 获取方法上的注解信息,组装业务领域事件监听信息
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:11
* @param method:方法
* @return: com.javacoo.events.client.BizEventInfo
*/
private BizEventListenerInfo getBizEventInfo(Method method){
BizEventListener bizEventListener = method.getAnnotation(BizEventListener.class);
return BizEventListenerInfo.builder()
.eventType(bizEventListener.eventType())
.topic(bizEventListener.topic())
.version(bizEventListener.version())
.paramEL(bizEventListener.paramEl())
.targetMethod(method)
.build();
}
/**
* 获取业务事件监听元数据集合
*
* @author duanyong@jccfc.com
* @date 2021/11/19 16:46
* @return: java.util.Optional>
*/
public static Optional> getBizEventListenerMetaDataList(){
return bizEventListenerMetaDataList.isEmpty() ? Optional.empty() : Optional.of(bizEventListenerMetaDataList);
}
}
```
业务领域事件监听注解
```java
/**
* 业务领域事件监听注解
*
*
* @author: duanyong@jccfc.com
* @since: 2021/11/18 15:57
*/
@Inherited
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface BizEventListener {
/**
* 业务事件类型
*/
String eventType() default "";
/**
* 业务事件主题
*/
String topic() default "";
/**
* 业务事件版本
*/
String version() default "";
/**
* 参数EL匹配表达式->格式:EL表达式=取值
*/
String paramEl() default "";
/**
* 排序
*/
int order() default 0;
}
```
至此SDK初始化便完成。
2. **事件处理**
通知事件的处理,则通过业务领域事件持久化服务BizEventPersistence和业务领域事件分发器BizEventDispatcher完成,业务领域事件持久化服务完成事件的持久化处理,SDK提供默认持久化服务(什么都不做),客户端可根据业务需求,扩展实现。业务领域事件分发器主要是根据接收到业务领域事件通知信息,从系统启动时收集到的业务事件监听元数据集合中依次调用事件过滤器规则服务SimpleBizEventFilterRule找到匹配的事件监听方法,利用反射机制执行。执行分异步和同步执行,由下发的通知事件中async参数决定,默认是异步的,执行时,具有相同事件匹配规则的方法是按照order自然排序后,顺序执行的,且传递的事件对象是同一个对象,支持修改。 代码如下:
```java
/**
* 业务事件分发器
* 说明:
*
*
* @author duanyong@jccfc.com
* @date 2021/11/20 13:50
*/
@Slf4j
public class BizEventDispatcher {
/**
* 事件过滤器规则
*/
@Autowired
private SimpleBizEventFilterRule simpleBizEventFilterRule;
/**
* 任务执行器
*/
private TaskExecutor taskExecutor;
@PostConstruct
public void init(){
taskExecutor = new TaskExecutor() {
ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
@Override
public void execute(Runnable task) {
executorService.execute(task);
}
};
}
/**
* 业务领域事件分发
* 说明:
*
* @param bizEventReq : 业务领域事件
* @author duanyong@jccfc.com
* @date 2021/11/20 14:05
*/
public void dispatch(final BizEventReq bizEventReq){
//异步执行
if (bizEventReq.async()) {
log.info("异步分发业务领域事件");
taskExecutor.execute(()->doDispatch(bizEventReq));
} else {
log.info("同步分发业务领域事件");
doDispatch(bizEventReq);
}
}
/**
* 业务领域事件分发
* 说明:
*
* @param bizEventReq : 业务领域事件
* @date 2021/12/11 15:27
*/
private void doDispatch(final BizEventReq bizEventReq){
if(!BizEventListenerRegistrar.getBizEventListenerMetaDataList().isPresent()){
log.warn("未注册事件监听方法:{}",bizEventReq.getEventType());
return;
}
//获取业务事件监听元数据集合
List bizEventMetaDataList = BizEventListenerRegistrar.getBizEventListenerMetaDataList().get();
//获取业务领域事件对象JSON字符串
final String eventObjectJsonString = JSONObject.toJSONString(bizEventReq.getEventObject());
//业务领域事件对象Map:相同参数监听方法间传递同一业务领域事件对象
Map eventObjectMap = new HashMap<>();
//执行分发
bizEventMetaDataList.forEach(bizEventListenerMetaData -> {
//根据spring bean名称查找bean对象
final Object beanObject = ApplicationContextProvider.getBean(bizEventListenerMetaData.getBeanName());
bizEventListenerMetaData.getBizEventListenerInfos().parallelStream()
.filter(bizEventListenerInfo -> simpleBizEventFilterRule.apply(bizEventReq,bizEventListenerInfo))
.forEachOrdered(bizEventListenerInfo -> execute(eventObjectJsonString,eventObjectMap,beanObject,bizEventListenerInfo.getTargetMethod()));
});
}
/**
* 执行
* 说明:
*
* @param eventObjectJsonString : 业务领域事件对象JSON字符串
* @param eventObjectMap : 业务领域事件Map
* @param beanObject : bean对象
* @param method : 方法
* @author duanyong@jccfc.com
* @date 2021/11/24 21:38
*/
private void execute(String eventObjectJsonString, Map eventObjectMap, Object beanObject, Method method) {
Class paramClass = method.getParameterTypes()[0];
Object eventObject = null;
if (eventObjectMap.containsKey(paramClass.getName())) {
eventObject = eventObjectMap.get(paramClass.getName());
} else {
eventObject = JSONObject.parseObject(eventObjectJsonString,paramClass);
eventObjectMap.put(paramClass.getName(), eventObject);
}
try {
method.invoke(beanObject, eventObject);
} catch (IllegalAccessException e) {
e.printStackTrace();
log.error("方法访问异常",e);
} catch (InvocationTargetException e) {
e.printStackTrace();
log.error("方法调用异常",e);
}
}
}
```
#### 3,Protocol
- Protocol是BizEvents-Framework和BizEvent-SDK连接的桥梁。
| 字段 | 类型 | 描述 |
| ------------- | ------- | ----------------------------------------- |
| transactionSn | String | 交易流水号 |
| eventGroup | String | 事件组 |
| eventId | String | 事件ID |
| eventVersion | String | 事件版本 |
| eventTopic | String | 事件主题 |
| eventType | String | 事件类型:
约定:事件类型=事件对象类型 |
| eventObject | Object | 事件对象 |
| async | Boolean | 是否异步 |
| timestamp | String | 事件时间戳 |
### 五,安装教程
##### 1,服务端
1. 引入依赖包:
```xml
com.javacoo
bizevents-runtime
1.0.0
```
2. 配置参数:以下是接入的最小配置示例,其中一些基础依赖配置如:RabbitMQ,数据源等未列出,按照与SpringBoot 集成方式配置即可。
```properties
## =============================业务领域事件组件配置================开始
## 业务领域事件注册插件配置-默认配置文件方式
# TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
biz.event.registry.plugin.file.groups.TEST001[0].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
# TEST001 TEST001->表示接入的客户端系统ID,topic->事件对象参数EL表达式,格式:EL表达式=取值->取值精准匹配
biz.event.registry.plugin.file.groups.TEST001[0].param-eL=#eventObject.userType=A
# TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
biz.event.registry.plugin.file.groups.TEST001[1].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
# TEST001 TEST001->表示接入的客户端系统ID,topic->事件对象参数EL表达式,格式:EL表达式=取值->取值正则表达式匹配
biz.event.registry.plugin.file.groups.TEST001[1].param-eL=#eventObject.userName=.*徐.*
# TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
biz.event.registry.plugin.file.groups.TEST001[2].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
# TEST001 TEST001->表示接入的客户端系统ID,version->事件版本
biz.event.registry.plugin.file.groups.TEST001[2].version=1.0
# TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
biz.event.registry.plugin.file.groups.TEST001[3].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
# TEST001 TEST001->表示接入的客户端系统ID,topic->事件主题
biz.event.registry.plugin.file.groups.TEST001[3].topic=天下第二
# TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
biz.event.registry.plugin.file.groups.TEST001[4].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
# TEST001 TEST001->表示接入的客户端系统ID,version->事件版本
biz.event.registry.plugin.file.groups.TEST001[4].version=1.0
# TEST001 TEST001->表示接入的客户端系统ID,topic->事件主题
biz.event.registry.plugin.file.groups.TEST001[4].topic=桃花剑神
biz.event.registry.plugin.file.groups.TEST001[5].event-type=com.javacoo.xservice.example.bean.event.ServerExampleEvent
# TEST002 业务领域事件注册配置-配置文件方式-接入:其中:TEST002-表示接入的客户端系统ID,event-type->表示注册的业务领域事件
#biz.event.registry.plugin.file.groups.TEST002[0].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
#biz.event.registry.plugin.file.groups.TEST002[1].event-type=com.javacoo.xservice.example.bean.event.ServerExampleEvent
## 业务领域事件-MQ插件配置-默认RabbitMQ
# TEST001 推送类型配置-RPC
biz.event.mq.plugin.groups.TEST001.remote-type=RPC
# TEST002 推送类型配置-HTTP
biz.event.mq.plugin.groups.TEST002.remote-type=HTTP
## 业务领域事件-远程调用插件配置
# HTTP配置
# TEST001-推送地址
biz.event.remote.plugin.http.groups.TEST001.push-url=http://127.0.0.1:8188
# TEST001-请求超时时间
biz.event.remote.plugin.http.groups.TEST001.socket-timeout=60000
# TEST002-推送地址
biz.event.remote.plugin.http.groups.TEST002.push-url=http://127.0.0.1:8080
# TEST002-请求超时时间
biz.event.remote.plugin.http.groups.TEST002.socket-timeout=60000
# TEST001 RPC配置-zk地址
biz.event.remote.plugin.rpc.groups.TEST001.address=zookeeper://zookeeper01-dev.javacoo.com:22181?backup=zookeeper02-dev.javacoo.com:22181,zookeeper03-dev.javacoo.com:22181
# TEST001 RPC配置-服务分组:格式->{客户端系统ID}_NOTICE_SERVICE
biz.event.remote.plugin.rpc.groups.TEST001.group=TEST001_NOTICE_SERVICE
# TEST001 RPC配置-服务版本
biz.event.remote.plugin.rpc.groups.TEST001.version=1.0.0
# TEST001 RPC配置-请求超时时间
biz.event.remote.plugin.rpc.groups.TEST001.socket-timeout=60000
# TEST002 RPC配置-zk地址
biz.event.remote.plugin.rpc.groups.TEST002.address=zookeeper://zookeeper01-dev.javacoo.com:22181?backup=zookeeper02-dev.javacoo.com:22181,zookeeper03-dev.javacoo.com:22181
# TEST002 RPC配置-服务分组:格式->{客户端系统ID}_NOTICE_SERVICE
biz.event.remote.plugin.rpc.groups.TEST002.group=TEST002_NOTICE_SERVICE
# TEST003 RPC配置-服务版本
biz.event.remote.plugin.rpc.groups.TEST002.version=1.0.0
# TEST004 RPC配置-请求超时时间
biz.event.remote.plugin.rpc.groups.TEST002.socket-timeout=60000
## 业务领域事件-存储插件配置
biz.event.store.plugin.jpa.enabled=true
## JPA配置
spring.jpa.show-sql=true
spring.jpa.generate-ddl=true
spring.jpa.hibernate.ddl-auto=update
spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
## 业务领域事件-定时任务插件配置
biz.event.task.plugin.impl=xxlJob
## 业务领域事件-定时任务xxljob插件配置
### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
biz.event.task.plugin.xxljob.admin-addresses=https://xxl-job-admin-javacoo.com/
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
biz.event.task.plugin.xxljob.app-name=bizevent-job
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
biz.event.task.plugin.xxljob.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
biz.event.task.plugin.xxljob.port=9999
### 执行器通讯TOKEN [选填]:非空时启用;
biz.event.task.plugin.xxljob.access-token=
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
biz.event.task.plugin.xxljob.log-path=/volume_logs
### 执行器日志保存天数 [选填] :值大于3时生效,启用执行器Log文件定期清理功能,否则不生效;
biz.event.task.plugin.xxljob.log-retention-days=-1
## =============================业务领域事件组件配置================结束
```
##### 2,客户端
所谓客户端都是相对的,只要集成了BizEvent-SDK我们都叫他客户端,目前SDK支持HTTP方式和RPC(dubbo)方式接入,具体配置如下:
1. 引入依赖包:
```xml
com.javacoo
bizevents-sdk-java
1.0.0
```
2. 配置参数:
**http接入:**如果是http接入,则无需任何配置,只需要将客户端服务地址和端口告知服务端(目前是通过人工配置方式),在服务端配置即可,如:http://127.0.0.1:8080
**RPC接入:**如果是RPC接入,则分为2种情况:
- 当前客户端系统已经集成dubbo,则参数配置如下:
```properties
#=====================业务领域事件SDK rpc-dubbo 接入配置===========
#RPC服务配置
#是否自定义dubbo配置:false->表示系统已经集成dubbo,无需配置dubbo参数
biz.event.client.rpc.dubbo.custom=false
#dubbo服务分组:格式->{系统ID}_NOTICE_SERVICE,系统ID由服务端统一分配
biz.event.client.rpc.dubbo.service.group=TEST002_NOTICE_SERVICE
#dubbo服务版本:当前服务的版本
biz.event.client.rpc.dubbo.service.version=1.0.0
```
- 当前客户端系统未集成dubbo,则参数配置如下:
```properties
#=====================业务领域事件SDK rpc-dubbo 接入配置===========
#RPC服务配置
#是否自定义dubbo配置:false->表示系统已经集成dubbo,无需配置dubbo参数
biz.event.client.rpc.dubbo.custom=true
#dubbo服务分组:格式->{系统ID}_NOTICE_SERVICE,系统ID由服务端统一分配
biz.event.client.rpc.dubbo.service.group=TEST001_NOTICE_SERVICE
#dubbo服务版本:当前服务的版本
biz.event.client.rpc.dubbo.service.version=1.0.0
#dubbo配置-application
dubbo.application.name = fund-api
dubbo.application.id = fund-api
dubbo.application.qos-enable = false
#dubbo配置-registry
dubbo.registry.id = registry
dubbo.registry.address = zookeeper://zookeeper01-dev.javacoo.com:22181?backup=zookeeper02-dev.javacoo.com:22181,zookeeper03-dev.javacoo.com:22181
#dubbo配置-protocol
dubbo.protocol.name = dubbo
dubbo.protocol.id = dubbo
dubbo.protocol.port = 20887
dubbo.protocol.host = 0.0.0.0
dubbo.protocol.heartbeat = 30
dubbo.protocol.accesslog = /volume_logs/rpc-access.log
#dubbo配置-config-center
dubbo.config-center.timeout = 60000
```
### 六,使用说明
##### 1,服务端
服务端使用BizEventPublisher类直接发布业务领域事件即可,如:
服务端发布领域事件类型:com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent,协议如下:
| 字段 | 类型 | 说明 |
| -------- | ---------- | -------- |
| userName | String | 用户名 |
| userType | String | 用户分类 |
| amount | BigDecimal | 金额 |
对应的领域事件对象:com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
```java
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServerLoanApplyEvent {
/**
* 用户名
*/
private String userName;
/**
* 用户分类
*/
private String userType;
/**
* 金额
*/
private BigDecimal amount;
}
```
使用BizEventPublisher发布事件,此时可根据具体业务,设置事件的版本,主题
```java
//发布普通领域事件
BizEventPublisher.publish(ServerLoanApplyEvent.builder()
.userName("徐凤年")
.userType("A")
.amount(new BigDecimal("30000"))
.build());
//发布普通领域事件
BizEventPublisher.publish(ServerLoanApplyEvent.builder()
.userName("徐渭熊")
.userType("B")
.amount(new BigDecimal("10000"))
.build());
//发布普通领域事件
BizEventPublisher.publish(ServerLoanApplyEvent.builder()
.userName("温华")
.userType("C")
.amount(new BigDecimal("20000"))
.build());
//发布带主题的领域事件
BizEventPublisher.publish(ServerLoanApplyEvent.builder()
.userName("王仙芝")
.userType("A")
.amount(new BigDecimal("100000"))
.build(),"天下第二");
//发布带版本的领域事件
BizEventPublisher.publish(ServerLoanApplyEvent.builder()
.userName("李淳罡")
.userType("B")
.amount(new BigDecimal("200000"))
.build(),"","1.0");
//发布带主题+版本的领域事件
BizEventPublisher.publish(ServerLoanApplyEvent.builder()
.userName("邓太阿")
.userType("C")
.amount(new BigDecimal("300000"))
.build(),"桃花剑神","1.0");
```
如发布带主题+版本的领域事件,生成协议JSON格式如下:
```json
{
"async":false,
"eventId":"EventId_2021120900000077",
"eventVersion":"1.0",
"eventObject":{
"amount":300000,
"userType":"C",
"userName":"邓太阿"
},
"eventTopic":"桃花剑神",
"eventType":"com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",
"eventGroup":"TEST001",
"transactionSn":"TransSn_2021120900000073",
"timestamp":"20211209150610183"
}
```
##### 2,客户端
客户端只需在事件处理方法上加上 @BizEventListener注解,填写服务端发布的领域业务事件类型,主题和版本,或者参数过滤条件,即可收到服务端推送的相应业务领域事件。如:
客户端订阅服务端发布的领域事件类型:com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent,则客户端首先按照事件类型协议,定义监听对象参数对象,参数对象字段与协议保持一致:
```
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ClientLoanApplyEvent {
/**
* 用户名
*/
private String userName;
/**
* 用户分类
*/
private String userType;
/**
* 金额
*/
private BigDecimal amount;
}
```
接着定义事件监听方法,如下:
在监听方法上添加@BizEventListener注解,填写服务端发布的领域业务事件类型,主题和版本,或者参数过滤条件
```java
@Slf4j
@Service
public class BizEventTestService {
/**
* 指定事件类型+事件版本
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:20
* @param clientLoanApplyEvent: 客户端贷款申请对象
* @return: void
*/
@BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",version = "1.0",order = 1)
public void test(ClientLoanApplyEvent clientLoanApplyEvent){
log.info("BizEventTestService收到版本事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
clientLoanApplyEvent.setAmount(new BigDecimal("50000"));
try {
Thread.sleep(3*1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
/**
* 指定事件类型+事件主题
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:20
* @param clientLoanApplyEvent: 客户端贷款申请对象
* @return: void
*/
@BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",topic = "天下第二",order = 1)
public void test1(ClientLoanApplyEvent clientLoanApplyEvent){
log.info("BizEventTestService收到主题事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
clientLoanApplyEvent.setAmount(new BigDecimal("70000"));
try {
Thread.sleep(3*1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
/**
* 指定事件类型+事件主题+事件主题
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:20
* @param clientLoanApplyEvent: 客户端贷款申请对象
* @return: void
*/
@BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",version = "1.0",topic = "桃花剑神",order = 1)
public void test2(ClientLoanApplyEvent clientLoanApplyEvent){
log.info("BizEventTestService收到版本+主题事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
try {
Thread.sleep(3*1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
/**
* 指定事件类型
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:20
* @param clientLoanApplyEvent: 客户端贷款申请对象
* @return: void
*/
@BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent")
public void test4(ClientLoanApplyEvent clientLoanApplyEvent){
log.info("BizEventTestService收到事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
try {
Thread.sleep(2*1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
/**
* 指定事件类型+值正则匹配
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:20
* @param clientLoanApplyEvent: 客户端贷款申请对象
* @return: void
*/
@BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",paramEl = "#eventObject.userName=.*徐.*")
public void test5(ClientLoanApplyEvent clientLoanApplyEvent){
log.info("BizEventTestService收到值正则匹配事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
try {
Thread.sleep(2*1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
/**
* 指定事件类型+值精准匹配
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:20
* @param clientLoanApplyEvent: 客户端贷款申请对象
* @return: void
*/
@BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",paramEl = "#eventObject.userType=A")
public void test6(ClientLoanApplyEvent clientLoanApplyEvent){
log.info("BizEventTestService收到值精准匹配事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
try {
Thread.sleep(2*1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
/**
* 指定事件类型
*
* @author duanyong@jccfc.com
* @date 2021/12/11 15:20
* @param clientExampleEvent: 客户端演示对象
* @return: void
*/
@BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerExampleEvent")
public void handleClientExampleEvent(ClientExampleEvent clientExampleEvent){
log.info("BizEventTestService->handleClientExampleEvent收到事件推送:{}", FastJsonUtil.toJSONString(clientExampleEvent));
try {
Thread.sleep(1*1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
```
**注意事项:**
1. 业务领域事件监听方法参数对象字段,需要与服务端推送的业务领域对象字段保持一致,即必须根据服务端发布的业务领域事件类型来定义客户端业务领域事件监听方法参数对象。
2. SDK约定业务领域事件监听方法所在服务类必须添加@Service注解。
**效果**
1. 值正则匹配

2. 值精准匹配

3. 事件类型+事件版本

4. 事件类型+事件主题

5. 事件类型+事件版本+事件主题

6. 事件类型匹配

### 七,后续规划
1. 增加BizEvent-SDK发布事件的能力
2. 增加预警能力
3. 升级框架为BizEvents-Bus
### 八,问题及局限性
1. 事件的发布,存储,推送MQ,MQ消费,推送都在一个工程,降低了系统的可靠性及可用性:后续可以考虑拆分,分别单独部署。
2. 事件消费,依赖推送是否成功,依赖客户端的处理能力,虽然目前SDK提供了持久化接口,由客户端决定是否持久化,且事件的分发默认是异步的,可以最大程度提高响应速度,但是客户端性能是不可控的,当遇到客户端响应超时,将导致MQ消息堆积,将影响服务端的性能。
3. 目前BizEvents-Framework还是一个雏形,是基于对现有业务的梳理,理解和抽象,结合现有相关系统特点加上自己的一些想法而得出的设计原型。这也算是我对于这段时间工作的梳理和总结,对接下来的工作任务也有参考和指导意义。由于本人涉及到的业务领域有限,其设计存在局限性和不合理的地方,仅供学习和交流,不足之处欢迎吐槽和指正,本人将不胜感激:)
#### 参与贡献
https://www.cnblogs.com/davenkin/p/microservices-and-domain-events.html
1. Fork 本仓库
2. 新建 Feat_xxx 分支
3. 提交代码
4. 新建 Pull Request
#### 特技
1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md
2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com)
3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目
4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help)
6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)