# dapeng-event-bus
**Repository Path**: barrywang/dapeng-event-bus
## Basic Information
- **Project Name**: dapeng-event-bus
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2019-05-10
- **Last Updated**: 2020-12-19
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 概览
- 案例
- 事件发布
- 前置条件
- IDL定义
- 关键性配置
- 发布任务服务实现
- 事件发布
- 事件订阅
- 依赖
- 作为订阅者
- 既是事件发送者,也是订阅者?
- 示例项目
## 案例
- 假设一个 A 服务为事件发送方,B 服务为事件订阅方
- 假设 A 服务中的 register 接口入库操作后,会发送 RegisteredEvent
- 假设 B 服务订阅了该事件消息,由订阅者自行处理订阅到的消息
## 事件发布
### 前置条件
- 依赖项
```xml
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT"
```
- 数据库存储支持,需在业务数据库中加入此表
```SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS `dp_common_event`;
CREATE TABLE `dp_common_event` (
`id` bigint(20) NOT NULL COMMENT '事件id,全局唯一, 可用于幂等操作',
`event_type` varchar(255) DEFAULT NULL COMMENT '事件类型',
`event_binary` blob DEFAULT NULL COMMENT '事件内容',
`updated_at` timestamp NOT NULL DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Table structure for `event_lock`
-- ----------------------------
DROP TABLE IF EXISTS `dp_event_lock`;
CREATE TABLE `dp_event_lock` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of `event_lock`
-- ----------------------------
BEGIN;
INSERT INTO `dp_event_lock` VALUES ('1', 'event_lock');
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
```
### IDL定义
- 以事件双方约定的消息内容定义IDL结构体
- 规定必须为每个事件定义事件ID,以便消费者做消息幂等
`==> events.thrift`
```thrift
namespace java com.github.dapeng.user.events
/**
* 注册成功事件, 由于需要消费者做幂等,故加上事件Id
**/
struct RegisteredEvent {
/**
* 事件Id
**/
1: i64 id,
/**
* 用户id
**/
2: i64 userId
}
...more
```
#### IDL服务接口事件声明
- 接口可能会触发一个或多个事件
`== >user_service.thrift`
```thrfit
namespace java com.github.dapeng.user.service
include "user_domain.thrift"
include "events.thrift"
/**
* 事件发送端业务服务
**/
service UserService{
/**
# 用户注册
## 事件
注册成功事件,激活事件
**/
string register(user_domain.User user)
(events="events.RegisteredEvent,events.ActivedEvent")
...more
}(group="EventTest")
```
#### IDL事件消息发布任务服务
新版本不需要在thrift里面定义定时发布消息的任务
### 关键性配置(定时任务)
`==> spring/services.xml`
注意`init-method`指定 startScheduled
```xml
```
- topic kafka消息topic,领域区分(建议:领域_版本号_event)
- kafkaHost kafka集群地址(如:127.0.0.1:9091,127.0.0.1:9092)
- tidPrefix kafka事务id前缀,领域区分
- dataSource 使用业务的 dataSource
`==>config_user_service.properties`
```xml
# event config
kafka_topic=user_1.0.0_event
kafka_producer_host=127.0.0.1:9092
kafka_tid_prefix=user_1.0.0
```
在dapeng.properties中配置:
```
soa.eventbus.publish.period=500 //代表轮询数据库消息库时间,如果对消息及时性很高,请将此配置调低,建议最低为100ms,默认配置是1000ms
```
### 事件触发
- 在做事件触发前,你需要实现 `AbstractEventBus` ,并将其交由spring托管,来做自定义的本地监听分发
`==>commons/EventBus.scala`
```scala
object EventBus extends AbstractEventBus {
/**
* 事件在触发后,可能存在本地的监听者,以及跨领域的订阅者
* 本地监听者可以通过实现该方法进行分发
* 同时,也会将事件发送到其他领域的事件消息订阅者
* @param event
*/
override def dispatchEvent(event: Any): Unit = {
event match {
case e:RegisteredEvent =>
// do somthing
case _ =>
LOGGER.info(" nothing ")
}
}
override def getInstance: EventBus.this.type = this
}
```
- 当本地无任何监听时==>
```scala
override def dispatchEvent(event: Any): Unit = {}
```
`==> spring/services.xml`
```xml
```
- 事件发布
```scala
EventBus.fireEvent(RegisteredEvent(event_id,user.id))
```
---
# 事件定时发布修改:
在dapeng.properties加入环境变量配置
```
//每次轮询间隔事件为100ms
soa.eventbus.publish.period=100
```
在业务系统的`services.xml`中配置,指定初始化方法,即定时轮询任务的方法:
```
```
## 重点: 配置轮询发布消息的时间间隔,以ms为单位,在dapeng.properties中配置
```
soa.eventbus.publish.period=500 //代表500ms
```
# 生产方因为轮询数据库发布消息,如果间隔很短,会产生大量的日志,需要修改级别,在logback下进行如下配置:
```
true
注意: 这里detail- 后面 加自己系统的名字。 例如这里的 goods
${soa.base}/logs/detail-goods-eventbus.%d{yyyy-MM-dd}.log
30
%d{MM-dd HH:mm:ss SSS} %t %p - %m%n
```
---
## 事件订阅
### 依赖
```xml
com.today
event-bus_2.12
0.1-SNAPSHOT
com.today
user-api_2.12
0.1-SNAPSHOT
"com.today" % "event-bus_2.12" % "0.1-SNAPSHOT",
"com.today" % "user-api_2.12" % "0.1-SNAPSHOT"
```
注解支持配置:
```xml
```
附(kafka日志级别调整):
`==>logback.xml`
```
```
作为一个订阅者
```java
// java
@KafkaConsumer(groupId = "eventConsumer1", topic = "user_1.0.0_event",kafkaHostKey = "kafka.consumer.host"))
public class EventConsumer {
@KafkaListener(serializer = RegisteredEventSerializer.class)
public void subscribeRegisteredEvent(RegisteredEvent event){
LOGGER.info("Subscribed RegisteredEvent ==> {}",event.toString());
}
...
}
```
## 注意: 订阅方在消费消息时,如果处理消息可能会抛出业务异常(就是业务有关的异常,如前置检查不通过,等等),在消费消息时,需要捕获业务系统。
```
@KafkaListener(serializer = classOf[ModifySkuBuyingPriceEventSerializer])
def modifySkuBuyingPriceEvent(event: ModifySkuBuyingPriceEvent): Unit = {
// 重点
try {
logger.info(s"=====> ModifySkuBuyingPriceEvent")
val ModifySkuBuyingPriceItemList = event.modifySkuBuyingPriceEventItems.map(
x => build[ModifySkuBuyingPriceConsumer](x)()
)
val result = consumer.modifySkuBuyingPrice(ModifySkuBuyingPriceItemList) // 收到事件后调用业务接口示例
logger.info(s"收到消息$event =>成功修改sku进价, ${result} ")
} catch {
//logger的写法自己定义
case e: SoaException => logger.error("业务抛出的异常,消息不会重试", e)
}
}
```
//scala
```
serializer = classOf[RegisteredEventSerializer]
```
#### @KafkaConsumer
- groupId 订阅者领域区分
- topic 订阅的 kafka 消息 topic
- kafkaHostKey 可自行配置的kafka地址,默认值为`dapeng.kafka.consumer.host`。可以自定义以覆盖默认值
- 用户只要负责把这些配置放到env或者properties里面
- 如:`System.setProperty("kafka.consumer.host","127.0.0.1:9092");`
#### @KafkaListener
- serializer 事件消息解码器,由事件发送方提供.
## 既是消费者也是订阅者?
> 如果服务既有接口会触发事件,也存在订阅其他领域的事件情况。只要增加缺少的配置即可
## 重点可以看如下发布者demo
```
https://github.com/leihuazhe/publish-demo
```
## 示例项目
- [事件发送端demo(sbt)](https://github.com/leihuazhe/publish-demo)
- [事件订阅端demo(sbt)](http://pms.today36524.com.cn:8083/basic-services/consumer-demo)
- [事件订阅端demo(maven)](https://github.com/StruggleYang/event-consumer)
- [eventbus](http://pms.today36524.com.cn:8083/basic-services/eventBus)
---
[原地址](http://pms.today36524.com.cn:8083/basic-services/eventBus
)