# flink-etl

**Repository Path**: cch-bigdata/flink-etl

## Basic Information

- **Project Name**: flink-etl
- **Description**: Real-time data warehouse project assignment for the e-commerce domain
- **Primary Language**: Scala
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 2
- **Created**: 2021-11-29
- **Last Updated**: 2022-09-05

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# Real-Time Data Warehouse Assignment

## Business Data Tables

### lagou_area

```sql
CREATE TABLE `flink_demo`.`lagou_area` (
  `id` int(11) NOT NULL COMMENT '区域主键',
  `name` varchar(40) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '区域名称',
  `pid` int(11) NULL DEFAULT NULL COMMENT '区域上级标识',
  `sname` varchar(40) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '地名简称',
  `level` int(11) NULL DEFAULT NULL COMMENT '区域等级',
  `citycode` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '区域编码',
  `yzcode` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '邮政编码',
  `mername` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '组合名称',
  `Lng` float NULL DEFAULT NULL,
  `Lat` float NULL DEFAULT NULL,
  `pinyin` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
```

### lagou_trade_orders

```sql
CREATE TABLE `flink_demo`.`lagou_trade_orders` (
  `orderId` bigint(11) NOT NULL DEFAULT 0 COMMENT '订单id',
  `orderNo` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '订单编号',
  `userId` bigint(11) NOT NULL COMMENT '用户id',
  `status` tinyint(4) NOT NULL DEFAULT -2 COMMENT '订单状态 -3:用户拒收 -2:未付款的订单 -1:用户取消 0:待发货 1:配送中 2:用户确认收货',
  `productMoney` decimal(11, 2) NOT NULL COMMENT '商品金额',
  `totalMoney` decimal(11, 2) NOT NULL COMMENT '订单金额(包括运费)',
  `payMethod` tinyint(4) NOT NULL DEFAULT 0 COMMENT '支付方式,0:未知;1:支付宝,2:微信;3、现金;4、其他',
  `isPay` tinyint(4) NOT NULL DEFAULT 0 COMMENT '是否支付 0:未支付 1:已支付',
  `areaId` int(11) NOT NULL COMMENT '区域最低一级',
  `tradeSrc` tinyint(4) NOT NULL DEFAULT 0 COMMENT '订单来源 0:商城 1:微信 2:手机版 3:安卓App 4:苹果App',
  `tradeType` int(11) NULL DEFAULT 0 COMMENT '订单类型',
  `isRefund` tinyint(4) NOT NULL DEFAULT 0 COMMENT '是否退款 0:否 1:是',
  `dataFlag` tinyint(4) NOT NULL DEFAULT 1 COMMENT '订单有效标志 -1:删除 1:有效',
  `createTime` varchar(25) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '下单时间',
  `payTime` varchar(25) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '支付时间',
  `modifiedTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '订单更新时间',
  PRIMARY KEY (`orderId`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
```
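The area table is modeled later in this README as the `AreaInfo` case class; the orders table has no model here, so the following sketch shows what one could look like on the Flink side. The class name `TradeOrder`, the `tinyint` to `Byte` mapping, and keeping the varchar time columns as `String` are assumptions for illustration, not part of the project code.

```scala
package com.cch.bigdata.flink.model

// Hypothetical row model mirroring the lagou_trade_orders DDL above (field names follow the table columns).
case class TradeOrder(
  orderId: Long,
  orderNo: String,
  userId: Long,
  status: Byte,            // -3 rejected ... 2 confirmed, see the COMMENT in the DDL
  productMoney: BigDecimal,
  totalMoney: BigDecimal,  // order amount including shipping
  payMethod: Byte,
  isPay: Byte,
  areaId: Int,
  tradeSrc: Byte,
  tradeType: Int,
  isRefund: Byte,
  dataFlag: Byte,
  createTime: String,      // stored as varchar in MySQL, kept as String here
  payTime: String,
  modifiedTime: String
)
```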
## System Architecture of the Real-Time Data Warehouse Project

![Classic offline data warehouse architecture](asset/offline_framework.png)

![Real-time data warehouse architecture (Lambda)](asset/lamda_framework.png)

![Real-time data warehouse architecture (Kappa)](asset/kappa_framework.png)

![Real-time data warehouse architecture (OLAP)](asset/olap_framework.png)

![Architecture comparison](asset/compare.png)

## Using canal

Official documentation:

- https://github.com/alibaba/canal
- https://github.com/alibaba/canal/wiki/Docker-QuickStart

### Installing canal

#### Installing canal-admin

Download: https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz

1. Extract the archive:

   ```sh
   drwxr-xr-x   6 agapple  staff   204B  8 31 15:37 bin
   drwxr-xr-x   8 agapple  staff   272B  8 31 15:37 conf
   drwxr-xr-x  90 agapple  staff   3.0K  8 31 15:37 lib
   drwxr-xr-x   2 agapple  staff    68B  8 31 15:26 logs
   ```

2. Edit the configuration file:

   ```shell
   vi conf/application.yml
   ```

   ```yaml
   server:
     port: 8089
   spring:
     jackson:
       date-format: yyyy-MM-dd HH:mm:ss
       time-zone: GMT+8

   spring.datasource:
     address: 127.0.0.1:3306
     database: canal_manager
     username: canal
     password: canal
     driver-class-name: com.mysql.jdbc.Driver
     url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
     hikari:
       maximum-pool-size: 30
       minimum-idle: 1

   canal:
     adminUser: admin
     adminPasswd: admin
   ```

3. Create the canal_manager database and initialize its tables:

   ```shell
   mysql -h127.1 -uroot -p
   # import the initialization SQL
   > source conf/canal_manager.sql
   ```

4. Start canal-admin:

   ```shell
   sh bin/startup.sh
   ```

5. Open the UI at http://localhost:8089/

   ![ui](asset/canal-manager-ui-1.png)

   Username: admin, password: 123456

   ![ui](asset/canal-manager-ui-2.png)

#### Installing canal-server

Download: https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

1. Extract the archive:

   ![canal-server](asset/canal-server-1.png)

2. Edit the configuration by overwriting canal.properties with the contents of canal_local.properties:

   ```properties
   # register ip
   canal.register.ip =

   # canal admin config
   canal.admin.manager = 127.0.0.1:8089
   canal.admin.port = 11110
   canal.admin.user = admin
   canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
   # admin auto register
   canal.admin.register.auto = true
   canal.admin.register.cluster =
   ```

   Then simply start the server, or skip the overwrite and pass the local configuration as a startup argument: `sh bin/startup.sh local`.

3. Start canal-server:

   ```shell
   sh bin/startup.sh
   # or
   sh bin/startup.sh local conf/canal_local.properties
   ```

4. Check the canal-admin UI:

   ![ui](asset/canal-manager-ui-3.png)

### Configuring canal to Capture MySQL

1. Open the Server management menu, select the canal-server to configure in the server list on the right, and choose the configuration option under Actions.

   ![ui](asset/canal_server_manage.png)

2. Use the "load template" button at the top, then append the following at the end of the configuration:

   ```properties
   # ...
   # options: tcp (default), kafka, RocketMQ
   canal.serverMode = kafka
   # ...
   # kafka/rocketmq cluster, e.g. 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
   canal.mq.servers = 127.0.0.1:9092
   canal.mq.retries = 0
   # in flagMessage mode this can be increased, but do not exceed the MQ message size limit
   canal.mq.batchSize = 16384
   canal.mq.maxRequestSize = 1048576
   # in flatMessage mode increase this value, 50-200 is recommended
   canal.mq.lingerMs = 1
   canal.mq.bufferMemory = 33554432
   # canal batch size, default 50K; because of Kafka's message size limit keep it under 1M (below 900K)
   canal.mq.canalBatchSize = 50
   # timeout for canal get, in milliseconds; empty means no timeout
   canal.mq.canalGetTimeout = 100
   # whether to send flat json messages
   canal.mq.flatMessage = true
   canal.mq.compressionType = none
   canal.mq.acks = all
   # whether Kafka message delivery uses transactions
   canal.mq.transaction = false
   ```

   ![ui](asset/canal_server_template.png)

3. Configure the instance. Open Instance management in the left menu, select the instance in the list on the right, and choose the configuration option under Actions.

   ![ui](asset/instance_manage.png)

   On that page select the corresponding server, then fill in the following below:

   ```properties
   # change these to match your own database
   #################################################
   ...
   canal.instance.master.address=nb-aliyun:3306
   # username/password for the database
   ...
   canal.instance.dbUsername = canal
   canal.instance.dbPassword = canal
   ...
   # mq config
   canal.mq.topic=topic_lagou_area
   # dynamic topic routing by database or table name
   #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
   canal.mq.partition=0
   # hash partition config
   #canal.mq.partitionsNum=3
   # db.table: unique key; separate multiple tables with commas
   #canal.mq.partitionHash=mytest.person:id,mytest.role:id
   #################################################
   ```

   ![ui](asset/instance_config.png)

4. Check the startup logs under both Server management and Instance management.

5. The startup fails at first because Kafka cannot be reached, so start the ZooKeeper and Kafka services.

   Server startup log (excerpt):

   ```
   2021-11-25 23:49:48.182 [New I/O server worker #3-12] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
   2021-11-25 23:49:48.182 [New I/O server worker #3-12] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.1.12(192.168.1.12):11111]
   2021-11-25 23:49:48.184 [New I/O server worker #3-12] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
   2021-11-25 23:49:48.659 [canal-instance-scan-0] INFO  com.alibaba.otter.canal.deployer.CanalController - auto notify start my_instance_001 successful.
   ```

   Instance startup log (excerpt):

   ```
   2021-11-25 23:49:48.657 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-my_instance_001
   2021-11-25 23:49:48.658 [canal-instance-scan-0] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
   2021-11-25 23:49:48.658 [canal-instance-scan-0] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
   2021-11-25 23:49:48.658 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
   2021-11-25 23:49:48.790 [destination = my_instance_001 , address = nb-aliyun/101.200.179.150:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
   2021-11-25 23:49:48.790 [destination = my_instance_001 , address = nb-aliyun/101.200.179.150:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"nb-aliyun","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000004","position":10316352,"serverId":223344,"timestamp":1637854419000}}
   2021-11-25 23:49:48.811 [destination = my_instance_001 , address = nb-aliyun/101.200.179.150:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=10316352,serverId=223344,gtid=,timestamp=1637854419000] cost : 21ms , the next step is binlog dump
   ```

6. List the Kafka topics:

   ```shell
   kafka-topics.sh --zookeeper localhost:2181 --list
   ```

   ```shell
   caochuanhong@cch-macbook-pro ~ % kafka-topics.sh --zookeeper localhost:2181 --list
   __consumer_offsets
   __transaction_state
   example
   example_001
   caochuanhong@cch-macbook-pro ~ %
   ```

   The topic configured in canal has been created.

7. Start a Kafka console consumer:

   ```shell
   kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic example_001
   ```
8. The consumer prints the captured change events. Examples:

   - DDL

     ```json
     {
       "data": null,
       "database": "flink_demo",
       "es": 1637856009000,
       "id": 2,
       "isDdl": true,
       "mysqlType": null,
       "old": null,
       "pkNames": null,
       "sql": "DROP TABLE IF EXISTS `lagou_area` /* generated by server */",
       "sqlType": null,
       "table": "lagou_area",
       "ts": 1637856006846,
       "type": "ERASE"
     }
     ```

     ```json
     {
       "data": null,
       "database": "flink_demo",
       "es": 1637856009000,
       "id": 2,
       "isDdl": true,
       "mysqlType": null,
       "old": null,
       "pkNames": null,
       "sql": "CREATE TABLE `lagou_area` (\n `id` int(11) NOT NULL COMMENT '区域主键',\n `name` varchar(40) DEFAULT NULL COMMENT '区域名称',\n `pid` int(11) DEFAULT NULL COMMENT '区域上级标识',\n `sname` varchar(40) DEFAULT NULL COMMENT '地名简称',\n `level` int(11) DEFAULT NULL COMMENT '区域等级',\n `citycode` varchar(20) DEFAULT NULL COMMENT '区域编码',\n `yzcode` varchar(20) DEFAULT NULL COMMENT '邮政编码',\n `mername` varchar(100) DEFAULT NULL COMMENT '组合名称',\n `Lng` float DEFAULT NULL,\n `Lat` float DEFAULT NULL,\n `pinyin` varchar(100) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8",
       "sqlType": null,
       "table": "lagou_area",
       "ts": 1637856006846,
       "type": "CREATE"
     }
     ```

   - INSERT

     ```json
     {
       "data": [{
         "id": "100000", "name": "中国", "pid": "0", "sname": "中国", "level": "0",
         "citycode": "0", "yzcode": "0", "mername": "中国",
         "Lng": "116.368", "Lat": "39.9151", "pinyin": "China"
       }],
       "database": "flink_demo",
       "es": 1637856187000,
       "id": 3,
       "isDdl": false,
       "mysqlType": {
         "id": "int(11)", "name": "varchar(40)", "pid": "int(11)", "sname": "varchar(40)",
         "level": "int(11)", "citycode": "varchar(20)", "yzcode": "varchar(20)",
         "mername": "varchar(100)", "Lng": "float", "Lat": "float", "pinyin": "varchar(100)"
       },
       "old": null,
       "pkNames": ["id"],
       "sql": "",
       "sqlType": {
         "id": 4, "name": 12, "pid": 4, "sname": 12, "level": 4, "citycode": 12,
         "yzcode": 12, "mername": 12, "Lng": 7, "Lat": 7, "pinyin": 12
       },
       "table": "lagou_area",
       "ts": 1637856185722,
       "type": "INSERT"
     }
     ```

   - UPDATE

     ```json
     {
       "data": [{
         "id": "100000", "name": "大中国", "pid": "0", "sname": "中国", "level": "0",
         "citycode": "0", "yzcode": "0", "mername": "中国",
         "Lng": "116.368", "Lat": "39.9151", "pinyin": "big China"
       }],
       "database": "flink_demo",
       "es": 1637856246000,
       "id": 4,
       "isDdl": false,
       "mysqlType": {
         "id": "int(11)", "name": "varchar(40)", "pid": "int(11)", "sname": "varchar(40)",
         "level": "int(11)", "citycode": "varchar(20)", "yzcode": "varchar(20)",
         "mername": "varchar(100)", "Lng": "float", "Lat": "float", "pinyin": "varchar(100)"
       },
       "old": [{
         "name": "中国", "pinyin": "China"
       }],
       "pkNames": ["id"],
       "sql": "",
       "sqlType": {
         "id": 4, "name": 12, "pid": 4, "sname": 12, "level": 4, "citycode": 12,
         "yzcode": 12, "mername": 12, "Lng": 7, "Lat": 7, "pinyin": 12
       },
       "table": "lagou_area",
       "ts": 1637856244673,
       "type": "UPDATE"
     }
     ```

   - DELETE

     ```json
     {
       "data": [{
         "id": "100000", "name": "大中国", "pid": "0", "sname": "中国", "level": "0",
         "citycode": "0", "yzcode": "0", "mername": "中国",
         "Lng": "116.368", "Lat": "39.9151", "pinyin": "China"
       }],
       "database": "flink_demo",
       "es": 1637856280000,
       "id": 5,
       "isDdl": false,
       "mysqlType": {
         "id": "int(11)", "name": "varchar(40)", "pid": "int(11)", "sname": "varchar(40)",
         "level": "int(11)", "citycode": "varchar(20)", "yzcode": "varchar(20)",
         "mername": "varchar(100)", "Lng": "float", "Lat": "float", "pinyin": "varchar(100)"
       },
       "old": null,
       "pkNames": ["id"],
       "sql": "",
       "sqlType": {
         "id": 4, "name": 12, "pid": 4, "sname": 12, "level": 4, "citycode": 12,
         "yzcode": 12, "mername": 12, "Lng": 7, "Lat": 7, "pinyin": 12
       },
       "table": "lagou_area",
       "ts": 1637856278715,
       "type": "DELETE"
     }
     ```

9. Verification complete: the canal data-capture part works as expected.
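The flat messages above share a fixed envelope (`database`, `table`, `type`, `isDdl`, `data`, `old`, `pkNames`, `es`, `ts`). A small typed wrapper can make downstream code easier to follow; the sketch below is illustrative only (the class and helper names are not part of the project) and assumes fastjson is on the classpath, as it already is for the sync job shown later.

```scala
package com.cch.bigdata.flink.model

import com.alibaba.fastjson.{JSON, JSONObject}

// Hypothetical typed view of a canal flatMessage, mirroring the JSON fields shown above.
case class CanalFlatMessage(
  database: String,
  table: String,
  opType: String,        // "INSERT" | "UPDATE" | "DELETE", or DDL types such as "CREATE" / "ERASE"
  isDdl: Boolean,
  rows: Seq[JSONObject], // the "data" array: one JSON object per affected row
  old: Seq[JSONObject],  // for UPDATE: previous values of the changed columns
  executeTime: Long,     // "es": when the statement executed on MySQL
  dumpTime: Long         // "ts": when canal produced the message
)

object CanalFlatMessage {
  // Parse one Kafka record value (a flatMessage JSON string) into the typed envelope.
  def parse(json: String): CanalFlatMessage = {
    val obj = JSON.parseObject(json)

    def toSeq(key: String): Seq[JSONObject] = {
      val arr = obj.getJSONArray(key)
      if (arr == null) Seq.empty
      else (0 until arr.size()).map(arr.getJSONObject(_))
    }

    CanalFlatMessage(
      database = obj.getString("database"),
      table = obj.getString("table"),
      opType = obj.getString("type"),
      isDdl = obj.getBooleanValue("isDdl"),
      rows = toSeq("data"),
      old = toSeq("old"),
      executeTime = obj.getLongValue("es"),
      dumpTime = obj.getLongValue("ts")
    )
  }
}
```

With a wrapper like this, the operation-type whitelist in the sync job could be expressed as a single pattern match on `opType` instead of repeated `getString` calls.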
## Syncing MySQL Business Data to HBase with Flink

- MySQL dimension table: `lagou_area`
- HBase table: `lagou_area`

1. Install HBase with Docker.

   ![ui](asset/hbase-main.png)

2. Create the corresponding dimension table `lagou_area`:

   ```shell
   create 'lagou_area', 'area'
   ```

   This creates a table named lagou_area with a single column family named area:

   ```shell
   hbase(main):001:0> create 'lagou_area', 'area'
   0 row(s) in 1.4030 seconds

   => Hbase::Table - lagou_area
   hbase(main):002:0> list
   TABLE
   lagou_area
   1 row(s) in 0.0200 seconds

   => ["lagou_area"]
   hbase(main):003:0>
   ```

3. Write a program that consumes the Kafka data and performs the ETL processing.

4. The main program:

   ```scala
   package com.cch.bigdata.flink.ods

   import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
   import com.cch.bigdata.flink.model.AreaEvent
   import org.apache.flink.api.common.serialization.SimpleStringSchema
   import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
   import org.apache.flink.api.scala._

   import java.util
   import java.util.Properties

   // Sample job that syncs the lagou_area table
   object SyncDataDemo {

     val databaseName: String = "flink_demo"
     val tableName: String = "lagou_area"
     val opTypeList = new util.ArrayList[String]()

     def main(args: Array[String]): Unit = {
       opTypeList.add("INSERT")
       opTypeList.add("UPDATE")
       opTypeList.add("DELETE")

       // get the execution environment
       val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

       // simulate the Kafka data so the database does not have to be touched for every test run
       val kafkaDataStream: DataStream[String] = environment.readTextFile("data/insert.txt")

       /**
        * // get the Kafka consumer
        * val flinkKafkaConsumer: FlinkKafkaConsumer[String] = getKafkaSource()
        * flinkKafkaConsumer.setStartFromLatest()
        *
        * val kafkaDataStream: DataStream[String] = environment.addSource(flinkKafkaConsumer)
        */

       // parse the continuous stream of events
       val mappedDataStream: DataStream[util.ArrayList[AreaEvent]] = kafkaDataStream.map(x => {
         // convert the event payload to a JSON object
         val eventData: JSONObject = JSON.parseObject(x)
         // operation type of this event
         val opType: String = eventData.getString("type")
         // result collection for this event
         val datas = new util.ArrayList[AreaEvent]()

         // only whitelisted operation types are processed
         if (opTypeList.contains(opType)) {
           // events matching the whitelist enter the processing logic
           val database: String = eventData.getString("database")
           // table name
           val table: String = eventData.getString("table")
           // event details
           val data: JSONArray = eventData.getJSONArray("data")
           // iterate over the detail array; an UPDATE or DELETE may carry many rows,
           // e.g. delete from table1 where status=2, or update table1 set status=1 where status=2
           data.forEach(x => {
             val model: AreaEvent = AreaEvent(database, table, opType, x.toString)
             datas.add(model)
           })
         }
         datas
       })

       mappedDataStream.addSink(new HbaseSink)
       environment.execute()
     }

     def getKafkaSource(): FlinkKafkaConsumer[String] = {
       val props = new Properties()
       props.setProperty("bootstrap.servers", "localhost:9092")
       props.setProperty("group.id", "consumer-group-001")
       props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       props.setProperty("auto.offset.reset", "latest")
       new FlinkKafkaConsumer[String]("topic_lagou_area", new SimpleStringSchema(), props)
     }
   }
   ```
5. The sink class that writes to HBase:

   ```scala
   package com.cch.bigdata.flink.ods

   import com.alibaba.fastjson.{JSON, JSONObject}
   import com.cch.bigdata.flink.model.{AreaEvent, AreaInfo}
   import org.apache.flink.configuration
   import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Delete, Put, Table}
   import org.apache.hadoop.hbase.util.Bytes
   import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

   import java.{lang, util}

   class HbaseSink extends RichSinkFunction[util.ArrayList[AreaEvent]] {

     val columnFamily: String = "area"
     var tableName = "lagou_area"
     var connection: Connection = _
     var hbaseTable: Table = _

     override def open(parameters: configuration.Configuration): Unit = {
       connection = getConnection()
       hbaseTable = connection.getTable(TableName.valueOf(tableName))
     }

     def processInsert(event: AreaInfo): Unit = {
       val put = new Put(event.id.toString.getBytes)
       put.addColumn(columnFamily.getBytes(), "name".getBytes(), event.name.getBytes())
       put.addColumn(columnFamily.getBytes(), "pid".getBytes(), event.pid.toString.getBytes())
       put.addColumn(columnFamily.getBytes(), "sname".getBytes(), event.sname.getBytes())
       put.addColumn(columnFamily.getBytes(), "level".getBytes(), event.level.toString.getBytes())
       put.addColumn(columnFamily.getBytes(), "citycode".getBytes(), event.citycode.getBytes())
       put.addColumn(columnFamily.getBytes(), "yzcode".getBytes(), event.yzcode.getBytes())
       put.addColumn(columnFamily.getBytes(), "mername".getBytes(), event.mername.getBytes())
       put.addColumn(columnFamily.getBytes(), "lng".getBytes(), event.Lng.toString.getBytes())
       put.addColumn(columnFamily.getBytes(), "lat".getBytes(), event.Lat.toString.getBytes())
       put.addColumn(columnFamily.getBytes(), "pinyin".getBytes(), event.pinyin.getBytes())
       hbaseTable.put(put)
       println("数据插入成功")
     }

     def processUpdate(event: AreaInfo): Unit = {
       processInsert(event) // HBase records the change as a new cell version
     }

     def processDelete(event: AreaInfo): Unit = {
       val delete = new Delete(event.id.toString.getBytes())
       hbaseTable.delete(delete)
       println("删除成功")
     }

     def processDefault(event: AreaInfo): Unit = {
     }

     override def invoke(value: util.ArrayList[AreaEvent], context: SinkFunction.Context[_]): Unit = {
       value.forEach(event => {
         val opType: String = event.opType
         val jsonData: String = event.opData
         val data: JSONObject = JSON.parseObject(jsonData)

         val id: Integer = data.getInteger("id")
         val name: String = data.getString("name")
         val pid: Integer = data.getInteger("pid")
         val sname: String = data.getString("sname")
         val level: Integer = data.getInteger("level")
         val cityCode: String = data.getString("citycode")
         val yzcode: String = data.getString("yzcode")
         val mername: String = data.getString("mername")
         val Lng: lang.Float = data.getFloat("Lng")
         val Lat: lang.Float = data.getFloat("Lat")
         val pinyin: String = data.getString("pinyin")

         val areaInfo: AreaInfo = AreaInfo(id, name, pid, sname, level, cityCode, yzcode, mername, Lng, Lat, pinyin)

         opType match {
           case "INSERT" => processInsert(areaInfo)
           case "UPDATE" => processUpdate(areaInfo)
           case "DELETE" => processDelete(areaInfo)
           case _        => processDefault(areaInfo)
         }
       })
     }

     override def close(): Unit = {
       if (hbaseTable != null) {
         hbaseTable.close()
       }
       if (connection != null) {
         connection.close()
       }
     }

     def getConnection(): Connection = {
       val conf: Configuration = HBaseConfiguration.create()
       conf.set("hbase.zookeeper.quorum", "localhost")
       conf.set("hbase.zookeeper.property.clientPort", "2181")
       conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
       conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
       val connection = ConnectionFactory.createConnection(conf)
       connection
     }
   }
   ```

6. The corresponding case classes:

   ```scala
   package com.cch.bigdata.flink.model

   case class AreaEvent(dataBaseName: String, tableName: String, opType: String, opData: String) extends Serializable
   ```

   ```scala
   package com.cch.bigdata.flink.model

   case class AreaInfo(
     id: Int,
     name: String,
     pid: Int,
     sname: String,
     level: Int,
     citycode: String,
     yzcode: String,
     mername: String,
     Lng: Float,
     Lat: Float,
     pinyin: String
   )
   ```

7. Result after an INSERT:

   ```shell
   caochuanhong@cch-macbook-pro ~ % hbase shell
   2021-11-27 22:02:16,087 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
   HBase Shell; enter 'help' for list of supported commands.
   Type "exit" to leave the HBase Shell
   Version 1.3.1, r930b9a55528fe45d8edce7af42fef2d35e77677a, Thu Apr  6 19:36:54 PDT 2017

   hbase(main):001:0> scan 'lagou_area'
   ROW        COLUMN+CELL
    100000    column=area:citycode, timestamp=1638021730606, value=0
    100000    column=area:lat, timestamp=1638021730606, value=39.9151
    100000    column=area:level, timestamp=1638021730606, value=0
    100000    column=area:lng, timestamp=1638021730606, value=116.368
    100000    column=area:mername, timestamp=1638021730606, value=\xE4\xB8\xAD\xE5\x9B\xBD
    100000    column=area:name, timestamp=1638021730606, value=\xE4\xB8\xAD\xE5\x9B\xBD
    100000    column=area:pid, timestamp=1638021730606, value=0
    100000    column=area:pinyin, timestamp=1638021730606, value=China
    100000    column=area:sname, timestamp=1638021730606, value=\xE4\xB8\xAD\xE5\x9B\xBD
    100000    column=area:yzcode, timestamp=1638021730606, value=0
   1 row(s) in 0.1980 seconds

   hbase(main):002:0>
   ```

8. Result after an UPDATE:

   ```shell
   hbase(main):002:0> scan 'lagou_area'
   ROW        COLUMN+CELL
    100000    column=area:citycode, timestamp=1638021771114, value=0
    100000    column=area:lat, timestamp=1638021771114, value=39.9151
    100000    column=area:level, timestamp=1638021771114, value=0
    100000    column=area:lng, timestamp=1638021771114, value=116.368
    100000    column=area:mername, timestamp=1638021771114, value=\xE4\xB8\xAD\xE5\x9B\xBD
    100000    column=area:name, timestamp=1638021771114, value=\xE5\xA4\xA7\xE4\xB8\xAD\xE5\x9B\xBD
    100000    column=area:pid, timestamp=1638021771114, value=0
    100000    column=area:pinyin, timestamp=1638021771114, value=big China
    100000    column=area:sname, timestamp=1638021771114, value=\xE4\xB8\xAD\xE5\x9B\xBD
    100000    column=area:yzcode, timestamp=1638021771114, value=0
   1 row(s) in 0.0230 seconds

   hbase(main):003:0>
   ```

9. Result after a DELETE:

   ```shell
   hbase(main):003:0> scan 'lagou_area'
   ROW        COLUMN+CELL
   0 row(s) in 0.0150 seconds

   hbase(main):004:0>
   ```

See the source code for the remaining details.
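Besides the hbase shell, the sync result can also be checked programmatically. The sketch below is not part of the repository; it is a minimal verification helper that assumes the same HBase client dependency and localhost ZooKeeper settings used by `HbaseSink`, and the object name `VerifyHbaseSync` is made up.

```scala
package com.cch.bigdata.flink.ods

import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}

import scala.collection.JavaConverters._

// Hypothetical helper: read back one row from lagou_area to verify the sync.
object VerifyHbaseSync {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    try {
      val table = connection.getTable(TableName.valueOf("lagou_area"))
      // the row key is the MySQL primary key written by HbaseSink, e.g. 100000 ("中国")
      val result = table.get(new Get(Bytes.toBytes("100000")))
      if (result.isEmpty) {
        println("row 100000 not found (expected after a DELETE)")
      } else {
        result.listCells().asScala.foreach { cell =>
          val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          println(s"area:$qualifier = $value")
        }
      }
      table.close()
    } finally {
      connection.close()
    }
  }
}
```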
## Data Monitoring

### Installing Prometheus (container)

Official documentation: https://prometheus.io/docs/prometheus/latest/installation/

```shell
docker run -d --name=prometheus \
  --restart=always \
  -p 9090:9090 \
  -v /etc/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml \
  -v prometheus_data:/prometheus \
  -v /etc/localtime:/etc/localtime:ro \
  prom/prometheus
```

```yaml
# my global config
global:
  scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: "prometheus"

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
      - targets: ["localhost:9090"]
```

![ui](asset/prometheus.png)

### Installing Grafana (container)

Official documentation: https://grafana.com/docs/grafana/latest/installation/docker/

```shell
docker run -d --name=grafana \
  --restart always \
  -p 3000:3000 \
  -v /etc/localtime:/etc/localtime \
  -v grafana-storage:/var/lib/grafana \
  grafana/grafana
```

![ui](asset/grafana.png)

### Installing Pushgateway (container)

```shell
docker run -d -p 9091:9091 --name pushgateway prom/pushgateway
```

![ui](asset/pushgateway.png)
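With the Pushgateway running on port 9091, job-level metrics (for example, how many canal events the sync job has written to HBase) can be pushed to it and then scraped by Prometheus. The sketch below is illustrative rather than project code: it assumes the Prometheus Java client (`io.prometheus:simpleclient` and `io.prometheus:simpleclient_pushgateway`) is added as a dependency, and the metric and job names are invented.

```scala
package com.cch.bigdata.flink.monitor

import io.prometheus.client.exporter.PushGateway
import io.prometheus.client.{CollectorRegistry, Gauge}

// Hypothetical example: push a job-level metric to the local Pushgateway (localhost:9091).
object PushMetricsDemo {
  def main(args: Array[String]): Unit = {
    val registry = new CollectorRegistry()

    // number of canal events written to HBase in this run (illustrative metric name)
    val syncedEvents: Gauge = Gauge.build()
      .name("flink_etl_synced_events_total")
      .help("Number of canal events written to HBase by the sync job.")
      .register(registry)

    syncedEvents.set(42) // in a real job this value would come from the sink's own bookkeeping

    // push everything in the registry under the job name "flink_etl_sync"
    val pushGateway = new PushGateway("localhost:9091")
    pushGateway.pushAdd(registry, "flink_etl_sync")
  }
}
```

A Flink job could alternatively report its built-in metrics to the same Pushgateway through the flink-metrics-prometheus reporter, so pushing by hand is mainly useful for custom, job-specific counters like the one above.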