# Real-time Data Warehouse **Repository Path**: chen_shuai_jun/real-time-data-warehouse ## Basic Information - **Project Name**: Real-time Data Warehouse - **Description**: 本项目是使用flink作为实时计算引擎,使用维度建模理论,构建的电商实时数仓。主要进行用户域、流量域、交易域、互动域和工具域五个领域的数据分析,并最后生成实时的报表。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2023-08-04 - **Last Updated**: 2023-08-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Real-time Data Warehouse ## 项目描述 数据仓库是一个为数据分析而设计的企业级数据管理系统。数据仓库可集中、整合多个信息源的大量数据,借助数据仓库的分析能力,企业可从数据中获得宝贵的信息进而改进决策. 该项目主要包括有数据的采集,数仓的搭建、工作流调度和可视化展示。数据的采集主要有两部分组成,分别是日志数据的采集和业务数据的采集。数仓主要是有5层,分别是ods、 dwd、dim、dws、ads。工作流的调度采用的DolphinScheduler进行每日的调度。可视化采用的是superset进行展示。 ## 软件架构 ![输入图片说明](https://foruda.gitee.com/images/1691154447082661306/82e36431_8932359.png "屏幕截图") 使用到的技术:maxwell1.29、flume1.9、kafka2.12、flink1.13、redis6.0.8、Hbase2.0.5、clickhouse21.7、zookeeper3.5.7、mysql5.7.16、phoenix5.0 ## 项目具体介绍 ### 项目的价值 对于项目本身而言,项目计算的指标可以指导公司下一步的重点方向,比如计算用户的跳出率,可以分析出我们在app首页上推荐的内容是否能够吸引到用户,如果用户跳出率比较的 高,就说明我们在首页推荐的内容并不符合用户的口味,再比如计算支付成功的指标,可以分析出用户从下单到支付的比例,如果下单率比较的高支付的订单却比较的少,就可以分析 出,我们关于支付页面的设计是不是不太友好,才使得用户下单之后没有进行支付。 ### 项目的流程 项目整体分成四个部分:数据的采集 --> 维度建模 --> 指标体系建设 --> 报表可视化展示 #### 数据采集: 数据采集模块主要采集两部分的数据:(1)用户行为数据(前端埋点产生)(2)业务数据(来自于业务数据库) 数据采集通道是将数据从 日志服务器 \ -----采集-----> kafka 业务服务器 / 数据的采集使用到了下面的技术:flume、maxwell flume主要用于从日志服务器采集日志数据到kafak maxwell主要用于从业务服务器采集业务数据到kafka #### 维度建模: 这一部分主要按照维度建模理论,构建ODS层、DWD层、DIM层 (1)ODS层主要的目的是做数据的备份,保持数据的原貌不做修改、采用压缩减少空间的占用、采用分区防止全表扫描 (2)DWD层主要做两件事:数据清洗、脱敏和构建事实表 数据清洗: 主要清洗掉那些不符合规范的数据,比如json格式有误, 数据脱敏: 主要是对手机、证件号码进行md5加密 事实表: 按照维度建模理论,事实表分成事务型事实表、周期性快照事实表、累计型快照事实表。 对于实时数仓而言我们通常是追求数据的时效性,所以实时数仓中我们只用到了事务型事实表。 事务型事实表的构建流程是: 选择业务过程:选择我们需要感兴趣的业务过程,支付、下单 声明粒度:一行数据所表达的信息,比如一次下单、一天下单、一个月的下单,通常我们是保持最细的粒度,这样可以保证我们后续能够做更多 的统计需求 确认维度:确认该事实表所需要关联的维度数据 确认事实:其实就是度量值,比如次数、件数、金额... 最后将数据存储在kafka中 (3)DIM层主要的目的是构建维度表,对于实时数仓而言我们没有进行维度整合,因为一旦参与维度整合的某个表的字段发生的更改,我们需要重新查询并且更改维度 表中的数据。这样在后续的处理中较为的麻烦:比如离线数仓中的商品维度表,它是由:商品表、spu表、一级分类、二级分类、三级分类、商品平台属性表、商品 交易属性表、trademark表通过join得到,这就意味着这八张表中的任何一张表发生变化,这个商品维度表都需要重新join,所以这对于实时场景是不适用的。我们 将维度表存储在HBase中,为了方面后续维表的扩展,我们在mysql中存储了一张配置表,通过flinkCDC监控这张配置表的变化,从而实现维度表的动态扩展。 ##### 指标体系建设: 这里我们的ADS层主要分成了六个域主要进行下面指标的统计: 流量域: 各渠道流量统计:主要统计的是各个渠道、独立访客数、会话数、平均浏览页面数、平均会话时长、跳出率 流量分时统计:统计每小时的独立访客数和页面浏览数 新老访客流量统计:统计当日的新老访客数 关键词统计:统计搜索关键词 用户域: 用户变更统计:七日回流 新增用户统计:当日新增用户数 用户行为分析:漏斗分析---> 浏览首页-->详情页-->加购-->下单-->支付 人数的变化 商品域: 各品牌交易统计 各品类交易统计 交易域: 交易综合统计:下单人数、退单人数 各省份交易统计 优惠券域: 当日优惠券比贴率 活动域: 当日活动补贴率 为了减少join的次数,提高数据的复用率,我们这里将:相同业务过程、统计粒度、统计周期的需求抽离出来构建DWS层 比如为了服务ADS层的流量域,我们会在DWS层构建:流量域版本-渠道-地区-用户类别粒度页面浏览汇总表 会将DWD层的: 页面浏览事务型事实表:计算页面浏览时长、页面浏览会话数、页面浏览数 用户跳出事务型事实表:用户跳出 独立访客事务型事实表:独立访客 将这三张表进行join,然后将数据存储在ClickHouse中 可视化展示 通过Springboot开发数据接口,从clickhouse中查询数据,然后将数据封装成sugar所需要的格式,对接sugar进行周期性的访问。 ### 项目细节的叙述: 数据采集部分的细节: maxwell采集业务数据的细节: (1)为什么使用maxwell。 maxwell可以实现历史数据的同步,同时可以实现断点续传。maxwell的数据格式为json类型,数据比较的轻量,并且方便解析。 (2)maxwell的原理 maxwell利用mysql的主从复制,通过监控binlog来实现数据的同步。binlog有三个级别:row、statement、mixed row: 记录的是数据的真实变更 statement:记录的是引起数据变更的sql语句 mixed:对于不涉及到随机值的数据变更记录的是sql,反之记录的是变更的数据 我们采用的row主要的原因是: 我们下一级是kafka如果记录的是sql,kafka并不能根据SQL重现是数据的变更 (3)如何保证数据的一致性 这里涉及到数据一致性的问题:因为对于历史数据,我们使用的是maxwell的bootstrap功能进行的数据的同步,如果在同步历史数据的 时候,数据发生了变更,binlog会记录到变更的数据,这样kafka中则会出现两份数据,因为maxwell采集过来的数据携带的类型是 bootstrap-insert而binlog得到的数据变更的类
型是insert、delete、update所以如果是同一条数据我们会保留监控binlog得到的数据。 flume采集日志数据的细节 (1)flume的组成 flume由三部分组成:source、channel、sink source:我们采用的是taildir source,主要利用它的多目录和断点续传。 channel: channel分成三种,memory channel 、file channel 、kafka channel memory channel:基于内存 效率高 但是不安全,发生故障的时候会丢失100个event file channel: 基于磁盘 安全 但是效率低 发生故障的时候不会丢失数据 kafka channel 基于磁盘 安全 效率高于(memory channel + kafka sink) sink: sink我们使用hdfs将数据按照分区存储到hdfs中 (2) flume的三个器 拦截器 拦截器的定义:(1)实现interceptor接口,重写其中的四个方法:初始化、单event处理,多event的处理、关闭。构建内部类实现Builder接口 然后打包上传到flume的lib目录下,在配置文件中指定全类名$Builder 选择器 replicating:将数据发往下游的所有的channel load balancing:采用roundRobin的方式将数据发往下游的通道 multiplexing:将数据发往下游指定的channel 监控器 Ganglia主要是监控两个事务put、take put:source --> channel,如果我们发现尝试提交的次数远远的大于提交成功的次数,这就说明发生了事务的回滚,最有可< 能的原因就是资源是不够了,因为flume默认的内存的大小是48M,我们可以调整内存的大小为4-6G take: channel --> sink;出现大量提交失败的原因可能就是资源不够。 (3) 日志数据采集这一块解决的问题 1、校验数据是否完整 我们在flume中设置了ETL拦截器,通过fastJson来判断json格式是否完整,这样防止后续不完整的数据在链路中传播 2、hdfs小文件问题(离线,我再描述实时所以不会出现这个问题,但是这里也聊一下) hdfs sink向hdfs写入数据的时候如果我们不加以控制,会产生很多的小文件,我们解决的办法是,在最后一层flume加上三个参数 时间为30min 条数 == 0表示的是不限制 大小128M 维度建模 ODS层:ODS层的数据存储在kafka中,主要分成两个topic,topic_db和topic_log topic_db存储的是maxwell从业务数据库中采集过来的业务数据 topic_log存储的是flume从日志服务器中采集过来的日志数据 DIM层:DIM层的数据存储在Hbase中 DIM层的构建,为了后期能够动态扩展维度表,我们在mysql数据库中设计了一张配置表,通过flinkCDC来监控这张配置表的变化。 ******flinkCDC监控配置表的代码******** 1、我们会先对mysql的这张表开启binlog 2、在flink中写如下的代码: MySqlSource mySqlSource = MySqlSource.builder() .hostname("xxxx") .port(3306) .databaseList("databaseName") .tableList("tableName") .username("root") .password("xxxxxx") .deserializer(new JsonDebeziumDeserializationSchema()) // 解码的时候将对应的数据转换成json格式,方便我们后续的操作 .startupOptions(StartupOptions.initial()) .build(); 关于startupOptions这个参数的配置,下面做独立的说明: 这个配置一共是有以下几个选项: initial:这种读取方式,是在刚启动的时候会对数据库做全量的快照,同时也会进行binlog的读取-->这个是项目中的配置 earliest:不全量的读取表中的数据,只从最早的binlog开始读。 latest:只从binlog的末尾进行读取。 specificOffset:首次启动的时候不对表做全量的快照,直接从指定的位置读取binlog timestamp:首次启动的时候不对表做全量的快照,直接从指定的时间戳读取binlog ************** 配置表中含有如下的字段: 原始表名:mysql数据库中的表的名字 目标表名:Phoenix中表的名字 Phoenix中表的字段 主键 扩展字段 当flinkCDC读取到配置表的变化的时候,会将该数据转换成广播流,然后与主流进行连接。对于连接流我们采用process方法进行处理,对于广播 数据会根据配置表中的表名和字段在Phoenix中创建表,然后将数据广播出去。主流根据广播流中的指定的表名,将指定的数据插入到Phoenix对应的表中。 这里涉及到rowkey的设计原则: (1)长度原则:rowKey的长度最大是64KB,但是生产上我们设计rowkey的时候尽量不要超过10B,因为rowkey太长是会严重的影响效率 (2)散列原则:我们要尽量的使rowkey散列到多个regionServer上,这样可以防止数据热点问题 (3)唯一原则:rowKey必须是唯一的。 我们在项目中是如何设计的: 我们没有进行设计,对于mysql的key --> Phoenix key --> Hbase的rowkey 我们开启了Phoenix的盐表功能,通过将key加盐打散实现散列。 new_row_key = ((byte) (hash(original_key) % BUCKETS_NUMBER) + original_key ***********主流和广播流连接的时候可能会出现的问题*********** 如果主流滞后于广播流,那么就会产生主流数据丢失的问题。 解决的办法是,我们首次启动的时候,在open方法中,可以让主流等待广播流一段时间,等待广播流同步完成之后,在让主流开始进行处理 ******************************************************** DWD层 首先明确数据域 用户域 (1)注册 流量域 (1)利用侧输出流将数据拆分成:曝光、动作、页面、启动、错误 (2)新老用户校验 (3)用户跳出(使用的是flinkCEP来进行处理) (4)独立访客 交易域 (1)加购:从kafka的业务数据主题中,拿到加购的数据,然后使用Look Up join关联字典表,进行维度的退化。 这里即然讲到了维表关联,不妨说一下常见的维表join方案: (1) 预加载: 缺点延迟大、占用空间大 在open方法中通过定时任务,加载数据到集合中,这样会占用存储空间,并且会带来一定的延迟 (2) 热存储加载:缺点,数据存储到外部系统会产生一定的延迟 其实就是现查外部表,在本项目中统计用户-spu粒度的下单,就用到了这种维表join方案,因为 维度数据是存储在HBase中 ,为了加速查询,我们使用了:异步IO + 旁路缓存 异步IO:是由flink提供的,我们使用的时候AsyncDataStream.unorderewait(主流,处理数据的类) 处理数据的类 extent RichAsyncFunction{ open(){ } asyncInvoke(){ } timeOut(){ } } 旁路缓存主要是将数据缓存到Redis中。 (3) 流式join 将之前到来的所以的数据都缓存起来,这样主流的数据进行join的时候是非常的快的。 缺点:消耗大量的资源 (4) look up join join一些字典表,其实也是现查外部系统,但是它可以缓存一部分的数据,然后设置TTL。 (2)下单:筛选出下单的数据 (3)支付: 筛选出支付的数据与订单明细数据进行join,使用的是regular join中的inner join,主要的原因是,主要支付成功就一定有与之对应的订单明细数据。 既然说到了regular,我们不妨一聊,regular join包含三种join:inner join、left join、 right join left join 和 right join会产生回撤数据,因为我们是将join完的数据存储在kafka中,所以我们使用的连接器是upsert kafka connect 其次regular join缓存的数据是不会过期的,所以我们在这个业务中设置的过期时间是15min+5s,15min是业务上的时间(下单-->支付在业务上可以相差15min),5s是乱序时间。 (4)退单:筛选出退单的数据关联订单数据,使用的是inner join (5)退款:筛选出退款的数据、筛选出退单的数据进行inner join 工具域 (1)优惠券领取:读取优惠券使用表中插入的数据 (2)优惠券下单:读取优惠券使用表中更新的数据,且状态码从领取转换成了使用 (3)优惠券支付:读取优惠券使用表中更新的数据,且使用时间不为null 互动域 (1)点赞 (2)收藏 (3)评论 DWS层 DWS层的数据存在ClickHouse中。 主要是对ADS层相同业务过程、统计粒度、统计周期的数据做轻量级的聚合,增加数据的利用率。减少ADS层的join. DWS层主要计算了如下的指标: 流量域: (1)搜索关键词粒度页面浏览统计 使用的到技术是自定义UDTF函数,实现TableFunction接口,重写其中的eval方法,然后使用IK分词器对搜索关键字进行分词。然后注册临时函数使用。 (2)版本-渠道-地区-访客类别粒度 读取独立访客、用户跳出、页面访问三张表的数据,然后将数据转换成相同的数据结构,对三条流进行union,分组、开窗、写入到clickhouse中 (3)页面浏览统计-->统计当日首页-商品详情页的独立访客 交易域: (1)加购:每日独立加购人数 (2)下单:当日下单独立用户数、各省份下单、用户spu粒度下单 (3)支付:统计支付成功独立用户数 (4)退单:品牌-品类-用户粒度(什么粒度就按什么分组,比如这个需求分组就是按照(userID,category3ID,trademarkID) 用户域: (1)登录:当日独立用户、七日回流用户 (2)注册:当日注册用户数 DWS层的数据都是开10s钟的滚动窗口,然后将数据存储到ClickHouse中。 数据一致性: 为了解决数据的一致性,我们使用的ClickHouse的表引擎是ReplacingMergeTree,但是ReplacingMergeTree只能保证数据的最终一致性。 我们解决数据一致性的方法是: 因为建表的时候order by中指定的字段是(窗口的开始时间,窗口的结束时间)。 在查询数据的时候,我们使用group by进行分组,使用order by进行排序,然后取窗口的最大值。 也可以通过加Final来进行处理。 ## 项目难点 1、提交资源的优化 并行度的设置: 因为我们采用了分层(将复杂的问题拆分成多个简单的问题,所以每一步都没有复杂的计算,所以直接采用粗略的设置),所以并行度=kafka的分区个数。 内存设置: JM的内存我们给的是2G,因为JM不做任务的计算 TM的内存我们给4G。 slot数设置: 为了减少资源的浪费,我们指定一个TM中slot的个数,等于分区并行度这样才不会造成slot的浪费。 cpu: yarn默认的default资源计算器,为一个contain分配1CPU,当我们发现CPU的利率非常高的时候,说明计算资源是不够用的,我们可以将其更改为Dominant计算策略,来增大一个contain中cpu的数量。 2、反压 我们在进行各省订单统计的时候,因为各省份数据不均的导致了数据倾斜,进而导致了反压。 反压的原理,某个结点接收数据的速率大于,结点能够处理数据的能力,然后自动向上游传递压力。 之所以会向上游传递压力是因为采用的是credit机制,上游告诉上游,他还能接收多少数据,上游就像下游发送多少数据。 1.5之后是Credit机制 1.5之前是TCP机制 出现的现象: kafka数据积压,因为这种压力一直是会向上进行传播,所以到了source端,迟迟不拉去数据,导致kafka中数据积压,kafka中数据的积压是可以通过eagle来进行监控的,有一个积压的指标log checkp越来越慢==》超时--》失败==》job挂掉,主要的原因是数据的阻塞导致barrier无法向下传递 可能会产生OOM webUI可以观察到链路图出现灰色、黑色 定位 上游都是反压的,第一个不反压的节点就是出现反压的瓶颈节点 如果需要精确到是哪一个算子出现了反压,可以通过断开operator chain。 一般出现反压的原因有: 数据倾斜 资源设置不合理 与外部系统交互 barrier精准一次性比较的慢 3、大状态 dwd层访客明细:状态存储一天 dwd层新老用户校验,因为要缓存大量的用户信息,且这些信息用不过期,会导致checkpoint越来越慢,最后导致超时失败 dws层用户登录,要缓存用户最后一次登录的日期,做七日回流的计算 一开始我们用的是hashMap状态后端,发现checkpoint越来越慢,之后更改成了rockdb状态后端,因为rockdb状态后端,还可以增量检查点,rockdb我们使用默认的配置内存+磁盘 4、数据倾斜 统计各个省份订单的时候出现了数据倾斜,dws层的关键词统计出现数据倾斜 数据倾斜的现象,我们可以通过webUI查看到keyby之后的某个结点的某个subtask接收的数据量是其它结点subtask接收数据量的20倍之多 可能的原因: 数据源中的数据分布本身就是不均匀的。 解决数据倾斜的方法: (1) 数据源数据倾斜 在source端对数据进行重分区,rescale、reparation、shuffle、global、rebalance.... (2) 单表分组聚合出现数据倾斜 攒批 localglobal(类似于mr的提前预聚合) 实现攒批和localglobal DataStream API需要自己实现,其实也是很简单,通过一个hashMap攒一批的数据,然后计算完成之后发送到下游,清空当前缓存中的数据, Flink SQL:开启minbatch和localglobal (3)keyby之后的窗口聚合操作出现数据倾斜 采用两阶段聚合: 第一阶段在keyby之前进行攒批聚合。 如果还是数据倾斜,可以开窗实现攒批,然后进行两阶段处理:(1)拼上随机值进行聚合 (2)去掉随机值 + 窗口结束时间进行聚合 5、SQL优化 几个参数: TTL minibatch localglobal split-distinct: LocalGlobal 优化针对普通聚合(例如 SUM、COUNT、MAX、MIN 和 AVG)有较好的效果,对于 DISTINCT 的聚合(如 COUNT DISTINCT)收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点 核心的原理是:两层聚合,HASH_CODE(distinct_key) % BUCKET_NUM 打散聚合之后,再进行汇总聚合。 多维精确去重 filter语法 6、为每个算子指定UID,这样当业务有所变动的时候,还可以恢复之前的状态。