# Real-time Data Warehouse
**Repository Path**: chen_shuai_jun/real-time-data-warehouse
## Basic Information
- **Project Name**: Real-time Data Warehouse
- **Description**: 本项目是使用flink作为实时计算引擎,使用维度建模理论,构建的电商实时数仓。主要进行用户域、流量域、交易域、互动域和工具域五个领域的数据分析,并最后生成实时的报表。
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2023-08-04
- **Last Updated**: 2023-08-05
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Real-time Data Warehouse
## 项目描述
数据仓库是一个为数据分析而设计的企业级数据管理系统。数据仓库可集中、整合多个信息源的大量数据,借助数据仓库的分析能力,企业可从数据中获得宝贵的信息进而改进决策.
该项目主要包括有数据的采集,数仓的搭建、工作流调度和可视化展示。数据的采集主要有两部分组成,分别是日志数据的采集和业务数据的采集。数仓主要是有5层,分别是ods、
dwd、dim、dws、ads。工作流的调度采用的DolphinScheduler进行每日的调度。可视化采用的是superset进行展示。
## 软件架构

使用到的技术:maxwell1.29、flume1.9、kafka2.12、flink1.13、redis6.0.8、Hbase2.0.5、clickhouse21.7、zookeeper3.5.7、mysql5.7.16、phoenix5.0
## 项目具体介绍
### 项目的价值
对于项目本身而言,项目计算的指标可以指导公司下一步的重点方向,比如计算用户的跳出率,可以分析出我们在app首页上推荐的内容是否能够吸引到用户,如果用户跳出率比较的
高,就说明我们在首页推荐的内容并不符合用户的口味,再比如计算支付成功的指标,可以分析出用户从下单到支付的比例,如果下单率比较的高支付的订单却比较的少,就可以分析
出,我们关于支付页面的设计是不是不太友好,才使得用户下单之后没有进行支付。
### 项目的流程
项目整体分成四个部分:数据的采集 --> 维度建模 --> 指标体系建设 --> 报表可视化展示
#### 数据采集:
数据采集模块主要采集两部分的数据:(1)用户行为数据(前端埋点产生)(2)业务数据(来自于业务数据库)
数据采集通道是将数据从
日志服务器 \
-----采集-----> kafka
业务服务器 /
数据的采集使用到了下面的技术:flume、maxwell
flume主要用于从日志服务器采集日志数据到kafak
maxwell主要用于从业务服务器采集业务数据到kafka
#### 维度建模:
这一部分主要按照维度建模理论,构建ODS层、DWD层、DIM层
(1)ODS层主要的目的是做数据的备份,保持数据的原貌不做修改、采用压缩减少空间的占用、采用分区防止全表扫描
(2)DWD层主要做两件事:数据清洗、脱敏和构建事实表
数据清洗:
主要清洗掉那些不符合规范的数据,比如json格式有误,
数据脱敏:
主要是对手机、证件号码进行md5加密
事实表:
按照维度建模理论,事实表分成事务型事实表、周期性快照事实表、累计型快照事实表。
对于实时数仓而言我们通常是追求数据的时效性,所以实时数仓中我们只用到了事务型事实表。
事务型事实表的构建流程是:
选择业务过程:选择我们需要感兴趣的业务过程,支付、下单
声明粒度:一行数据所表达的信息,比如一次下单、一天下单、一个月的下单,通常我们是保持最细的粒度,这样可以保证我们后续能够做更多 的统计需求
确认维度:确认该事实表所需要关联的维度数据
确认事实:其实就是度量值,比如次数、件数、金额...
最后将数据存储在kafka中
(3)DIM层主要的目的是构建维度表,对于实时数仓而言我们没有进行维度整合,因为一旦参与维度整合的某个表的字段发生的更改,我们需要重新查询并且更改维度
表中的数据。这样在后续的处理中较为的麻烦:比如离线数仓中的商品维度表,它是由:商品表、spu表、一级分类、二级分类、三级分类、商品平台属性表、商品
交易属性表、trademark表通过join得到,这就意味着这八张表中的任何一张表发生变化,这个商品维度表都需要重新join,所以这对于实时场景是不适用的。我们
将维度表存储在HBase中,为了方面后续维表的扩展,我们在mysql中存储了一张配置表,通过flinkCDC监控这张配置表的变化,从而实现维度表的动态扩展。
##### 指标体系建设:
这里我们的ADS层主要分成了六个域主要进行下面指标的统计:
流量域:
各渠道流量统计:主要统计的是各个渠道、独立访客数、会话数、平均浏览页面数、平均会话时长、跳出率
流量分时统计:统计每小时的独立访客数和页面浏览数
新老访客流量统计:统计当日的新老访客数
关键词统计:统计搜索关键词
用户域:
用户变更统计:七日回流
新增用户统计:当日新增用户数
用户行为分析:漏斗分析---> 浏览首页-->详情页-->加购-->下单-->支付 人数的变化
商品域:
各品牌交易统计
各品类交易统计
交易域:
交易综合统计:下单人数、退单人数
各省份交易统计
优惠券域:
当日优惠券比贴率
活动域:
当日活动补贴率
为了减少join的次数,提高数据的复用率,我们这里将:相同业务过程、统计粒度、统计周期的需求抽离出来构建DWS层
比如为了服务ADS层的流量域,我们会在DWS层构建:流量域版本-渠道-地区-用户类别粒度页面浏览汇总表
会将DWD层的:
页面浏览事务型事实表:计算页面浏览时长、页面浏览会话数、页面浏览数
用户跳出事务型事实表:用户跳出
独立访客事务型事实表:独立访客
将这三张表进行join,然后将数据存储在ClickHouse中
可视化展示
通过Springboot开发数据接口,从clickhouse中查询数据,然后将数据封装成sugar所需要的格式,对接sugar进行周期性的访问。
### 项目细节的叙述:
数据采集部分的细节:
maxwell采集业务数据的细节:
(1)为什么使用maxwell。
maxwell可以实现历史数据的同步,同时可以实现断点续传。maxwell的数据格式为json类型,数据比较的轻量,并且方便解析。
(2)maxwell的原理
maxwell利用mysql的主从复制,通过监控binlog来实现数据的同步。binlog有三个级别:row、statement、mixed
row: 记录的是数据的真实变更
statement:记录的是引起数据变更的sql语句
mixed:对于不涉及到随机值的数据变更记录的是sql,反之记录的是变更的数据
我们采用的row主要的原因是: 我们下一级是kafka如果记录的是sql,kafka并不能根据SQL重现是数据的变更
(3)如何保证数据的一致性
这里涉及到数据一致性的问题:因为对于历史数据,我们使用的是maxwell的bootstrap功能进行的数据的同步,如果在同步历史数据的
时候,数据发生了变更,binlog会记录到变更的数据,这样kafka中则会出现两份数据,因为maxwell采集过来的数据携带的类型是
bootstrap-insert而binlog得到的数据变更的类
型是insert、delete、update所以如果是同一条数据我们会保留监控binlog得到的数据。
flume采集日志数据的细节
(1)flume的组成
flume由三部分组成:source、channel、sink
source:我们采用的是taildir source,主要利用它的多目录和断点续传。
channel:
channel分成三种,memory channel 、file channel 、kafka channel
memory channel:基于内存 效率高 但是不安全,发生故障的时候会丢失100个event
file channel: 基于磁盘 安全 但是效率低 发生故障的时候不会丢失数据
kafka channel 基于磁盘 安全 效率高于(memory channel + kafka sink)
sink:
sink我们使用hdfs将数据按照分区存储到hdfs中
(2) flume的三个器
拦截器
拦截器的定义:(1)实现interceptor接口,重写其中的四个方法:初始化、单event处理,多event的处理、关闭。构建内部类实现Builder接口
然后打包上传到flume的lib目录下,在配置文件中指定全类名$Builder
选择器
replicating:将数据发往下游的所有的channel
load balancing:采用roundRobin的方式将数据发往下游的通道
multiplexing:将数据发往下游指定的channel
监控器
Ganglia主要是监控两个事务put、take
put:source --> channel,如果我们发现尝试提交的次数远远的大于提交成功的次数,这就说明发生了事务的回滚,最有可<
能的原因就是资源是不够了,因为flume默认的内存的大小是48M,我们可以调整内存的大小为4-6G
take: channel --> sink;出现大量提交失败的原因可能就是资源不够。
(3) 日志数据采集这一块解决的问题
1、校验数据是否完整
我们在flume中设置了ETL拦截器,通过fastJson来判断json格式是否完整,这样防止后续不完整的数据在链路中传播
2、hdfs小文件问题(离线,我再描述实时所以不会出现这个问题,但是这里也聊一下)
hdfs sink向hdfs写入数据的时候如果我们不加以控制,会产生很多的小文件,我们解决的办法是,在最后一层flume加上三个参数
时间为30min
条数 == 0表示的是不限制
大小128M
维度建模
ODS层:ODS层的数据存储在kafka中,主要分成两个topic,topic_db和topic_log
topic_db存储的是maxwell从业务数据库中采集过来的业务数据
topic_log存储的是flume从日志服务器中采集过来的日志数据
DIM层:DIM层的数据存储在Hbase中
DIM层的构建,为了后期能够动态扩展维度表,我们在mysql数据库中设计了一张配置表,通过flinkCDC来监控这张配置表的变化。
******flinkCDC监控配置表的代码********
1、我们会先对mysql的这张表开启binlog
2、在flink中写如下的代码:
MySqlSource mySqlSource = MySqlSource.builder()
.hostname("xxxx")
.port(3306)
.databaseList("databaseName")
.tableList("tableName")
.username("root")
.password("xxxxxx")
.deserializer(new JsonDebeziumDeserializationSchema()) // 解码的时候将对应的数据转换成json格式,方便我们后续的操作
.startupOptions(StartupOptions.initial())
.build();
关于startupOptions这个参数的配置,下面做独立的说明:
这个配置一共是有以下几个选项:
initial:这种读取方式,是在刚启动的时候会对数据库做全量的快照,同时也会进行binlog的读取-->这个是项目中的配置
earliest:不全量的读取表中的数据,只从最早的binlog开始读。
latest:只从binlog的末尾进行读取。
specificOffset:首次启动的时候不对表做全量的快照,直接从指定的位置读取binlog
timestamp:首次启动的时候不对表做全量的快照,直接从指定的时间戳读取binlog
**************
配置表中含有如下的字段:
原始表名:mysql数据库中的表的名字
目标表名:Phoenix中表的名字
Phoenix中表的字段
主键
扩展字段
当flinkCDC读取到配置表的变化的时候,会将该数据转换成广播流,然后与主流进行连接。对于连接流我们采用process方法进行处理,对于广播
数据会根据配置表中的表名和字段在Phoenix中创建表,然后将数据广播出去。主流根据广播流中的指定的表名,将指定的数据插入到Phoenix对应的表中。
这里涉及到rowkey的设计原则:
(1)长度原则:rowKey的长度最大是64KB,但是生产上我们设计rowkey的时候尽量不要超过10B,因为rowkey太长是会严重的影响效率
(2)散列原则:我们要尽量的使rowkey散列到多个regionServer上,这样可以防止数据热点问题
(3)唯一原则:rowKey必须是唯一的。
我们在项目中是如何设计的:
我们没有进行设计,对于mysql的key --> Phoenix key --> Hbase的rowkey
我们开启了Phoenix的盐表功能,通过将key加盐打散实现散列。
new_row_key = ((byte) (hash(original_key) % BUCKETS_NUMBER) + original_key
***********主流和广播流连接的时候可能会出现的问题***********
如果主流滞后于广播流,那么就会产生主流数据丢失的问题。
解决的办法是,我们首次启动的时候,在open方法中,可以让主流等待广播流一段时间,等待广播流同步完成之后,在让主流开始进行处理
********************************************************
DWD层
首先明确数据域
用户域
(1)注册
流量域
(1)利用侧输出流将数据拆分成:曝光、动作、页面、启动、错误
(2)新老用户校验
(3)用户跳出(使用的是flinkCEP来进行处理)
(4)独立访客
交易域
(1)加购:从kafka的业务数据主题中,拿到加购的数据,然后使用Look Up join关联字典表,进行维度的退化。
这里即然讲到了维表关联,不妨说一下常见的维表join方案:
(1) 预加载: 缺点延迟大、占用空间大
在open方法中通过定时任务,加载数据到集合中,这样会占用存储空间,并且会带来一定的延迟
(2) 热存储加载:缺点,数据存储到外部系统会产生一定的延迟
其实就是现查外部表,在本项目中统计用户-spu粒度的下单,就用到了这种维表join方案,因为
维度数据是存储在HBase中 ,为了加速查询,我们使用了:异步IO + 旁路缓存
异步IO:是由flink提供的,我们使用的时候AsyncDataStream.unorderewait(主流,处理数据的类)
处理数据的类 extent RichAsyncFunction{
open(){
}
asyncInvoke(){
}
timeOut(){
}
}
旁路缓存主要是将数据缓存到Redis中。
(3) 流式join
将之前到来的所以的数据都缓存起来,这样主流的数据进行join的时候是非常的快的。
缺点:消耗大量的资源
(4) look up join
join一些字典表,其实也是现查外部系统,但是它可以缓存一部分的数据,然后设置TTL。
(2)下单:筛选出下单的数据
(3)支付:
筛选出支付的数据与订单明细数据进行join,使用的是regular join中的inner join,主要的原因是,主要支付成功就一定有与之对应的订单明细数据。
既然说到了regular,我们不妨一聊,regular join包含三种join:inner join、left join、 right join
left join 和 right join会产生回撤数据,因为我们是将join完的数据存储在kafka中,所以我们使用的连接器是upsert kafka connect
其次regular join缓存的数据是不会过期的,所以我们在这个业务中设置的过期时间是15min+5s,15min是业务上的时间(下单-->支付在业务上可以相差15min),5s是乱序时间。
(4)退单:筛选出退单的数据关联订单数据,使用的是inner join
(5)退款:筛选出退款的数据、筛选出退单的数据进行inner join
工具域
(1)优惠券领取:读取优惠券使用表中插入的数据
(2)优惠券下单:读取优惠券使用表中更新的数据,且状态码从领取转换成了使用
(3)优惠券支付:读取优惠券使用表中更新的数据,且使用时间不为null
互动域
(1)点赞
(2)收藏
(3)评论
DWS层
DWS层的数据存在ClickHouse中。
主要是对ADS层相同业务过程、统计粒度、统计周期的数据做轻量级的聚合,增加数据的利用率。减少ADS层的join.
DWS层主要计算了如下的指标:
流量域:
(1)搜索关键词粒度页面浏览统计
使用的到技术是自定义UDTF函数,实现TableFunction接口,重写其中的eval方法,然后使用IK分词器对搜索关键字进行分词。然后注册临时函数使用。
(2)版本-渠道-地区-访客类别粒度
读取独立访客、用户跳出、页面访问三张表的数据,然后将数据转换成相同的数据结构,对三条流进行union,分组、开窗、写入到clickhouse中
(3)页面浏览统计-->统计当日首页-商品详情页的独立访客
交易域:
(1)加购:每日独立加购人数
(2)下单:当日下单独立用户数、各省份下单、用户spu粒度下单
(3)支付:统计支付成功独立用户数
(4)退单:品牌-品类-用户粒度(什么粒度就按什么分组,比如这个需求分组就是按照(userID,category3ID,trademarkID)
用户域:
(1)登录:当日独立用户、七日回流用户
(2)注册:当日注册用户数
DWS层的数据都是开10s钟的滚动窗口,然后将数据存储到ClickHouse中。
数据一致性:
为了解决数据的一致性,我们使用的ClickHouse的表引擎是ReplacingMergeTree,但是ReplacingMergeTree只能保证数据的最终一致性。
我们解决数据一致性的方法是:
因为建表的时候order by中指定的字段是(窗口的开始时间,窗口的结束时间)。
在查询数据的时候,我们使用group by进行分组,使用order by进行排序,然后取窗口的最大值。
也可以通过加Final来进行处理。
## 项目难点
1、提交资源的优化
并行度的设置:
因为我们采用了分层(将复杂的问题拆分成多个简单的问题,所以每一步都没有复杂的计算,所以直接采用粗略的设置),所以并行度=kafka的分区个数。
内存设置:
JM的内存我们给的是2G,因为JM不做任务的计算
TM的内存我们给4G。
slot数设置:
为了减少资源的浪费,我们指定一个TM中slot的个数,等于分区并行度这样才不会造成slot的浪费。
cpu:
yarn默认的default资源计算器,为一个contain分配1CPU,当我们发现CPU的利率非常高的时候,说明计算资源是不够用的,我们可以将其更改为Dominant计算策略,来增大一个contain中cpu的数量。
2、反压
我们在进行各省订单统计的时候,因为各省份数据不均的导致了数据倾斜,进而导致了反压。
反压的原理,某个结点接收数据的速率大于,结点能够处理数据的能力,然后自动向上游传递压力。
之所以会向上游传递压力是因为采用的是credit机制,上游告诉上游,他还能接收多少数据,上游就像下游发送多少数据。
1.5之后是Credit机制
1.5之前是TCP机制
出现的现象:
kafka数据积压,因为这种压力一直是会向上进行传播,所以到了source端,迟迟不拉去数据,导致kafka中数据积压,kafka中数据的积压是可以通过eagle来进行监控的,有一个积压的指标log
checkp越来越慢==》超时--》失败==》job挂掉,主要的原因是数据的阻塞导致barrier无法向下传递
可能会产生OOM
webUI可以观察到链路图出现灰色、黑色
定位
上游都是反压的,第一个不反压的节点就是出现反压的瓶颈节点
如果需要精确到是哪一个算子出现了反压,可以通过断开operator chain。
一般出现反压的原因有:
数据倾斜
资源设置不合理
与外部系统交互
barrier精准一次性比较的慢
3、大状态
dwd层访客明细:状态存储一天
dwd层新老用户校验,因为要缓存大量的用户信息,且这些信息用不过期,会导致checkpoint越来越慢,最后导致超时失败
dws层用户登录,要缓存用户最后一次登录的日期,做七日回流的计算
一开始我们用的是hashMap状态后端,发现checkpoint越来越慢,之后更改成了rockdb状态后端,因为rockdb状态后端,还可以增量检查点,rockdb我们使用默认的配置内存+磁盘
4、数据倾斜
统计各个省份订单的时候出现了数据倾斜,dws层的关键词统计出现数据倾斜
数据倾斜的现象,我们可以通过webUI查看到keyby之后的某个结点的某个subtask接收的数据量是其它结点subtask接收数据量的20倍之多
可能的原因:
数据源中的数据分布本身就是不均匀的。
解决数据倾斜的方法:
(1) 数据源数据倾斜
在source端对数据进行重分区,rescale、reparation、shuffle、global、rebalance....
(2) 单表分组聚合出现数据倾斜
攒批
localglobal(类似于mr的提前预聚合)
实现攒批和localglobal
DataStream API需要自己实现,其实也是很简单,通过一个hashMap攒一批的数据,然后计算完成之后发送到下游,清空当前缓存中的数据,
Flink SQL:开启minbatch和localglobal
(3)keyby之后的窗口聚合操作出现数据倾斜
采用两阶段聚合:
第一阶段在keyby之前进行攒批聚合。
如果还是数据倾斜,可以开窗实现攒批,然后进行两阶段处理:(1)拼上随机值进行聚合 (2)去掉随机值 + 窗口结束时间进行聚合
5、SQL优化
几个参数:
TTL
minibatch
localglobal
split-distinct: LocalGlobal 优化针对普通聚合(例如 SUM、COUNT、MAX、MIN 和 AVG)有较好的效果,对于 DISTINCT 的聚合(如 COUNT DISTINCT)收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点
核心的原理是:两层聚合,HASH_CODE(distinct_key) % BUCKET_NUM 打散聚合之后,再进行汇总聚合。
多维精确去重 filter语法
6、为每个算子指定UID,这样当业务有所变动的时候,还可以恢复之前的状态。