# flink-real-time-rule-alert **Repository Path**: Jing_jun_yang/flink-real-time-rule-alert ## Basic Information - **Project Name**: flink-real-time-rule-alert - **Description**: 基于Flink的实时规则告警,动态监听MySQL中的告警规则,发送满足规则的告警告警记录到AlertManager - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 15 - **Created**: 2023-02-28 - **Last Updated**: 2023-02-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 一、背景需求介绍 实时监控系统需要满足对多种来源的数据进行告警,为提升系统的可扩展行和灵活性,采用动态规则配置来实现多种数据源、多种告警规则的实时告警 ### 1、数据来源 车机应用端数据源: | 数据源内容 | host | port | topic | | --------------------- | -------------------------------------------------------- | ---- | ---------------- | | Sentry 中的异常、奔溃 | tsp3dev07.para.com | 9092 | events | | 车机端埋点数据 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | data-monitor-ivi | 云端数据源: | 数据源内容 | host | port | topic | | -------------- | -------------------------------------------------------- | ---- | ------------------ | | 云端埋点数据 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | cloud_cp_data | | 云端微服务日志 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | data-monitor-cloud | ## 二、系统架构设计 #### 1、系统分层架构设计 image-20210714152954146 本着高内聚低耦合的原则,实时告警系统采用分层设计的思想对整体的功能模块进行组合,其中: 1、Flink DataStream 层的功能是数据流在Flink内部的整体流向DAG图,如`addSource`、`connect`、`process`、`addSink`; 2、Flink Function 层的功能是对function的具体实现,如`AlertManagerSinkFunction`、`CustomMysqlSourceFunction`、`RuleMatchBroadCastProcessFunction`等; 3、Service 层是业务的处理过程,如负责向`AlertManager`传输数据的`AlertManagerService`、负责规则同步、更新、维护、转化、匹配的 `RulesService`。 #### 2、业务模块设计 image-20210714143353351 说明:业务上,需要告警的数据源目前有4中数据来源,分别是远端日志、云端微服务日志、车机端埋点、Sentry异常奔溃,其中Sentry 中的数据需要通过告警规则的筛选后发送到kafka中用于实时监控。设计上首先通过Driver中的class 路由到**通用JSON告警模块**或者**Sentry异常奔溃业务处理模块**,其次通过app.type 选择kafka中的数据源。 #### 3、Flink DataStream 处理流程图 ![image-20210714144358649](images/Flink DataStream 处理流程图.png) 说明:**DataStream 处理流程图**展示的是数据从Kafka消费后再Flink Function 中的流向关系,Driver 负责Flink程序的启动,通过class筛选路由到**通用JSON告警**或者**Sentry异常崩溃模块**,其中内部的逻辑比较相似: 1、首先Mysql中的配置通过**自定义数据源模块**会被解析成**配置流**; 2、其次kafka topic 会被解析成**数据流**,通过广播连接,配置流会被广播到每个数据流的TaskManager; 3、通过**规则匹配模块**对数据流和规则流进行匹配; 4、匹配到数据筛选出非Sentry中的数据分别发送到AlertManager实时告警、MySQL告警统计、kafka 实时监控 #### 4、规则匹配模块设计 image-20210709113130222 规则匹配模块核心使用的是Avaitor规则引擎表达式进行规则匹配,匹配的内容来源于: 1、数据流的JSON通过flattenAsMap转成map; 2、规则流中有效的Rule中获取得到的规则表达式。 #### 5、规则设计 规则存储在MySQL中便于管理和修改,表结构如下: ![image-20210714151501773](images/规则表结构.png) 其中,各字段解释如下: id:为唯一的规则id,一个程序中id被认为是同一条规则; name:规则的名称,用于规则的解释和告警输出; exp:规则表达式,用于和数据流匹配; update_time:规则变化时的更新时间; create_time:规则的创建时间; is_valid:是否有效规则,匹配的时候只使用有效的规则; app_type:应用类型,只有当数据流中的ap_type和程序启动时的app.type 相同时才有可能匹配成功。 建表语句为: ```sql CREATE TABLE IF NOT EXISTS `flink`.`flink-alert-rule123` ( `id` int(16) NOT NULL AUTO_INCREMENT COMMENT '主键id', `name` varchar(255) DEFAULT NULL COMMENT '规则名称', `exp` varchar(1020) DEFAULT NULL COMMENT '规则表达式', `update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新规则时间', `create_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '创建规则时间', `is_valid` int(1) NOT NULL COMMENT '规则是否有效,无效不会告警', `app_type` varchar(255) NOT NULL COMMENT '规则适用的应用类型,值必须是AppType的枚举值,忽略大小写', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8; ``` #### 6、规则表达说明 1、规则表中的exp字段用来存储规则表达式,用aviator表达式引擎执行JSON是否满足**表达式**,返回boolean值, 2、规则的最简单格式为:`key 比较运算符 value` - `key` 是要匹配的JSON中的全路径key,不支持特殊字符,支持`.` - `比较运算符` 可以是 `==`、`!=`、`<` 、`>` 等参考:https://www.yuque.com/boyan-avfmj/aviatorscript/lvabnw - `value` 是数字或者字符串,包含特殊字符用`''`包裹 3、规则的例子: - JSON格式的数据源: ```json { "common_data":{ "appPackage":"ltd.qisi.sotasupportapp", "appVersion":"3.03.01.000", "collectedTime":1625240289781, "behaviorId":"50026003", "qisiuiVersion":"0.2.02", "uid":"1924427992", "regionCode":"659001", "eventName":"mock", "vin":"MOCK1TELWMOMZRQAWO", "hardwareVersion":"3.03.01.000", "carseries":"E115", "pdsn":"47556519116431", "displayId":"0" }, "gather_data":{ "key1":"value5", "key2":"69", "key3":"0" } } ``` - 告警规则 `common_data.appPackage == 'ltd.qisi.sotasupportapp'` 表示`common_data.appPackage`字段等于'`td.qisi.sotasupportapp` #### 7、输出业务告警数据格式设计 1、车机端告警统计格式 ```sql CREATE TABLE `flink`.`flink-alert-data` ( `app_package` varchar(255) comment 'app包名' ,`collected_time` bigint(16) comment '数据' ,`behavior_id` varchar(255) comment '' ,`qisiui_version` varchar(255) comment '' ,`uid` varchar(255) comment '用户id' ,`region_code` varchar(255) comment '' ,`os_version` varchar(255) comment '' ,`event_name` varchar(255) comment '' ,`vin` varchar(255) comment 'vin码' ,`hardware_version` varchar(255) comment '' ,`carseries` varchar(255) comment '' ,`pdsn` varchar(255) comment '' ,`display_id` varchar(255) comment '屏幕id[ 0:主控屏;1:副驾屏;2:左后排屏;3:右后排屏;-1:未知]' ,`rule_name` varchar(255) comment '' ,`rule_id` varchar(255) comment '' ,`rule_exp` varchar(255) comment '' ) ENGINE=InnoDB DEFAULT CHARSET=utf8; ``` 2、云端告警统计格式 ```sql CREATE TABLE `flink`.`flink-cloud-alert-data` ( `microservice` varchar(255) comment '' ,`reqPath` varchar(255) comment '' ,`clicnetIP` varchar(255) comment '' ,`resultCode` varchar(255) comment '' ,`createDate` varchar(255) comment '' ,`ctime` varchar(255) comment '' ,`rule_name` varchar(255) comment '' ,`rule_id` varchar(255) comment '' ,`rule_exp` varchar(255) comment '' ) ENGINE=InnoDB DEFAULT CHARSET=utf8; ``` #### 8、AlertManager告警模块设计 对接AlertManager的模块为`ISendService`的实现类,通过调用`void send(AlertManagerData data)`方法把数据发送的出去,其其实类会调用AlertManager的post请求发送json数据,请求的基本格式为: ```shell curl -XPOST http://localhost:9093/api/v1/alerts -d ' [ { "labels": { "alertname": "DiskRunningFull", "dev": "sda1", "instance": "中文测试", "route": "WEBHOOK" }, "annotations": { "info": "The disk sda1 is running full", "summary": "please check the instance example1" }, "Source": { "link": "http://www.baidu.com" } } ] ' ``` ## 三、提交 yarn 命令 flink任务的执行流程为: 1、mvn打包; ```shell $ mvn clean package ``` 2、把打包好的jar包上传到hdfs,路径为 /flink/jobs/; ```shell # 删除久的jar $ hadoop fs -rm /flink/jobs/flink-AlertManager-v1.1.jar # 上传jar包到hdfs $ hadoop fs -put ./flink-AlertManager-v1.1.jar /flink/jobs/ ``` 3、提交yarn-applicationn ```shell # 提交yarn-applicationn $ flink run-application -t yarn-applicationn -Dyarn.application.name="Flink Alert System" hdfs:///flink/jobs/flink-AlertManager-v0.1.jar MysqlAndAlertManagerSink --app.type CLIENT_EVENT ``` #### 必填参数: - 执行的class类 , MysqlAndAlertManagerSink - 应用类型 --app.type AppType的枚举值,忽略大小写 #### 执行任务可选的参数: ###### 1、Mysql 可选参数 ```shell --mysql.conf.file alert/mysql.properties --mysql.host sit.dbaas.private --mysql.port 13500 --mysql.db flink --mysql.table flink.flink-alert-rule --mysql.username tsp --mysql.passwd TspMysql2020! ``` ###### 2、AlertManager 可选参数 ```shell --am.route --am.host 10.6.215.39 --am.port 9093 ``` ###### 3、Kafka 可选参数 ```shell --kafka.sink.conf.file alert/producer.properties --kafka.source.conf.file alert/kafka.properties --kafka.sink.topic sentry-sink-topic-test ```