# flink-real-time-rule-alert
**Repository Path**: Jing_jun_yang/flink-real-time-rule-alert
## Basic Information
- **Project Name**: flink-real-time-rule-alert
- **Description**: 基于Flink的实时规则告警,动态监听MySQL中的告警规则,发送满足规则的告警告警记录到AlertManager
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 15
- **Created**: 2023-02-28
- **Last Updated**: 2023-02-28
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
## 一、背景需求介绍
实时监控系统需要满足对多种来源的数据进行告警,为提升系统的可扩展行和灵活性,采用动态规则配置来实现多种数据源、多种告警规则的实时告警
### 1、数据来源
车机应用端数据源:
| 数据源内容 | host | port | topic |
| --------------------- | -------------------------------------------------------- | ---- | ---------------- |
| Sentry 中的异常、奔溃 | tsp3dev07.para.com | 9092 | events |
| 车机端埋点数据 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | data-monitor-ivi |
云端数据源:
| 数据源内容 | host | port | topic |
| -------------- | -------------------------------------------------------- | ---- | ------------------ |
| 云端埋点数据 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | cloud_cp_data |
| 云端微服务日志 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | data-monitor-cloud |
## 二、系统架构设计
#### 1、系统分层架构设计
本着高内聚低耦合的原则,实时告警系统采用分层设计的思想对整体的功能模块进行组合,其中:
1、Flink DataStream 层的功能是数据流在Flink内部的整体流向DAG图,如`addSource`、`connect`、`process`、`addSink`;
2、Flink Function 层的功能是对function的具体实现,如`AlertManagerSinkFunction`、`CustomMysqlSourceFunction`、`RuleMatchBroadCastProcessFunction`等;
3、Service 层是业务的处理过程,如负责向`AlertManager`传输数据的`AlertManagerService`、负责规则同步、更新、维护、转化、匹配的 `RulesService`。
#### 2、业务模块设计
说明:业务上,需要告警的数据源目前有4中数据来源,分别是远端日志、云端微服务日志、车机端埋点、Sentry异常奔溃,其中Sentry 中的数据需要通过告警规则的筛选后发送到kafka中用于实时监控。设计上首先通过Driver中的class 路由到**通用JSON告警模块**或者**Sentry异常奔溃业务处理模块**,其次通过app.type 选择kafka中的数据源。
#### 3、Flink DataStream 处理流程图

说明:**DataStream 处理流程图**展示的是数据从Kafka消费后再Flink Function 中的流向关系,Driver 负责Flink程序的启动,通过class筛选路由到**通用JSON告警**或者**Sentry异常崩溃模块**,其中内部的逻辑比较相似:
1、首先Mysql中的配置通过**自定义数据源模块**会被解析成**配置流**;
2、其次kafka topic 会被解析成**数据流**,通过广播连接,配置流会被广播到每个数据流的TaskManager;
3、通过**规则匹配模块**对数据流和规则流进行匹配;
4、匹配到数据筛选出非Sentry中的数据分别发送到AlertManager实时告警、MySQL告警统计、kafka 实时监控
#### 4、规则匹配模块设计
规则匹配模块核心使用的是Avaitor规则引擎表达式进行规则匹配,匹配的内容来源于:
1、数据流的JSON通过flattenAsMap转成map;
2、规则流中有效的Rule中获取得到的规则表达式。
#### 5、规则设计
规则存储在MySQL中便于管理和修改,表结构如下:

其中,各字段解释如下:
id:为唯一的规则id,一个程序中id被认为是同一条规则;
name:规则的名称,用于规则的解释和告警输出;
exp:规则表达式,用于和数据流匹配;
update_time:规则变化时的更新时间;
create_time:规则的创建时间;
is_valid:是否有效规则,匹配的时候只使用有效的规则;
app_type:应用类型,只有当数据流中的ap_type和程序启动时的app.type 相同时才有可能匹配成功。
建表语句为:
```sql
CREATE TABLE IF NOT EXISTS `flink`.`flink-alert-rule123` (
`id` int(16) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`name` varchar(255) DEFAULT NULL COMMENT '规则名称',
`exp` varchar(1020) DEFAULT NULL COMMENT '规则表达式',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新规则时间',
`create_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '创建规则时间',
`is_valid` int(1) NOT NULL COMMENT '规则是否有效,无效不会告警',
`app_type` varchar(255) NOT NULL COMMENT '规则适用的应用类型,值必须是AppType的枚举值,忽略大小写',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
```
#### 6、规则表达说明
1、规则表中的exp字段用来存储规则表达式,用aviator表达式引擎执行JSON是否满足**表达式**,返回boolean值,
2、规则的最简单格式为:`key 比较运算符 value`
- `key` 是要匹配的JSON中的全路径key,不支持特殊字符,支持`.`
- `比较运算符` 可以是 `==`、`!=`、`<` 、`>` 等参考:https://www.yuque.com/boyan-avfmj/aviatorscript/lvabnw
- `value` 是数字或者字符串,包含特殊字符用`''`包裹
3、规则的例子:
- JSON格式的数据源:
```json
{
"common_data":{
"appPackage":"ltd.qisi.sotasupportapp",
"appVersion":"3.03.01.000",
"collectedTime":1625240289781,
"behaviorId":"50026003",
"qisiuiVersion":"0.2.02",
"uid":"1924427992",
"regionCode":"659001",
"eventName":"mock",
"vin":"MOCK1TELWMOMZRQAWO",
"hardwareVersion":"3.03.01.000",
"carseries":"E115",
"pdsn":"47556519116431",
"displayId":"0"
},
"gather_data":{
"key1":"value5",
"key2":"69",
"key3":"0"
}
}
```
- 告警规则 `common_data.appPackage == 'ltd.qisi.sotasupportapp'` 表示`common_data.appPackage`字段等于'`td.qisi.sotasupportapp`
#### 7、输出业务告警数据格式设计
1、车机端告警统计格式
```sql
CREATE TABLE `flink`.`flink-alert-data` (
`app_package` varchar(255) comment 'app包名'
,`collected_time` bigint(16) comment '数据'
,`behavior_id` varchar(255) comment ''
,`qisiui_version` varchar(255) comment ''
,`uid` varchar(255) comment '用户id'
,`region_code` varchar(255) comment ''
,`os_version` varchar(255) comment ''
,`event_name` varchar(255) comment ''
,`vin` varchar(255) comment 'vin码'
,`hardware_version` varchar(255) comment ''
,`carseries` varchar(255) comment ''
,`pdsn` varchar(255) comment ''
,`display_id` varchar(255) comment '屏幕id[ 0:主控屏;1:副驾屏;2:左后排屏;3:右后排屏;-1:未知]'
,`rule_name` varchar(255) comment ''
,`rule_id` varchar(255) comment ''
,`rule_exp` varchar(255) comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
2、云端告警统计格式
```sql
CREATE TABLE `flink`.`flink-cloud-alert-data` (
`microservice` varchar(255) comment ''
,`reqPath` varchar(255) comment ''
,`clicnetIP` varchar(255) comment ''
,`resultCode` varchar(255) comment ''
,`createDate` varchar(255) comment ''
,`ctime` varchar(255) comment ''
,`rule_name` varchar(255) comment ''
,`rule_id` varchar(255) comment ''
,`rule_exp` varchar(255) comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
#### 8、AlertManager告警模块设计
对接AlertManager的模块为`ISendService`的实现类,通过调用`void send(AlertManagerData data)`方法把数据发送的出去,其其实类会调用AlertManager的post请求发送json数据,请求的基本格式为:
```shell
curl -XPOST http://localhost:9093/api/v1/alerts -d '
[
{
"labels": {
"alertname": "DiskRunningFull",
"dev": "sda1",
"instance": "中文测试",
"route": "WEBHOOK"
},
"annotations": {
"info": "The disk sda1 is running full",
"summary": "please check the instance example1"
},
"Source": {
"link": "http://www.baidu.com"
}
}
]
'
```
## 三、提交 yarn 命令
flink任务的执行流程为:
1、mvn打包;
```shell
$ mvn clean package
```
2、把打包好的jar包上传到hdfs,路径为 /flink/jobs/;
```shell
# 删除久的jar
$ hadoop fs -rm /flink/jobs/flink-AlertManager-v1.1.jar
# 上传jar包到hdfs
$ hadoop fs -put ./flink-AlertManager-v1.1.jar /flink/jobs/
```
3、提交yarn-applicationn
```shell
# 提交yarn-applicationn
$ flink run-application -t yarn-applicationn -Dyarn.application.name="Flink Alert System" hdfs:///flink/jobs/flink-AlertManager-v0.1.jar MysqlAndAlertManagerSink --app.type CLIENT_EVENT
```
#### 必填参数:
- 执行的class类 , MysqlAndAlertManagerSink
- 应用类型 --app.type AppType的枚举值,忽略大小写
#### 执行任务可选的参数:
###### 1、Mysql 可选参数
```shell
--mysql.conf.file alert/mysql.properties
--mysql.host sit.dbaas.private
--mysql.port 13500
--mysql.db flink
--mysql.table flink.flink-alert-rule
--mysql.username tsp
--mysql.passwd TspMysql2020!
```
###### 2、AlertManager 可选参数
```shell
--am.route
--am.host 10.6.215.39
--am.port 9093
```
###### 3、Kafka 可选参数
```shell
--kafka.sink.conf.file alert/producer.properties
--kafka.source.conf.file alert/kafka.properties
--kafka.sink.topic sentry-sink-topic-test
```