# 格创实时处理引擎平台 **Repository Path**: longshiyu/getech-streaming-engine ## Basic Information - **Project Name**: 格创实时处理引擎平台 - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 27 - **Created**: 2022-04-29 - **Last Updated**: 2022-04-29 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 介绍 Data-development-streaming-flink-job 项目是基于Flink流处理的实时数据同步组件。该项目由TCL集团孵化的深圳格创东智科技有限公司的数据中台组研发。该组件上手简单,同步速率高,任务运行稳定,已经为TCL集团各产业服务了五年之久,助力于提升工业互联网的大数据计算能力。经过不断的更新迭代,项目已经支持多种数据源,现在开源出来给各位同学们使用。 # 数据源 ## | 数据源 | Streaming | Sql | 读 | 写 | |:---------- | --------- | --- | --- | --- | | Kafka | 支持 | 支持 | 支持 | 支持 | | Kudu | 支持 | 支持 | | 支持 | | Hive | 支持 | 支持 | | 支持 | | Mysql | 支持 | 支持 | | 支持 | | Oracle | 支持 | 支持 | | 支持 | | PostgreSQL | 支持 | 支持 | | 支持 | | MongoDB | 支持 | 支持 | | 支持 | | ClickHouse | 支持 | 支持 | | 支持 | # 快速开始 * 项目打包 > mvn clean scala:compile compile install -DskipTests 完成后将包上传到环境中 # 配置说明 - SQL * kafka to kafak `参数与DAG含义保持一致, sql语句用;隔开` ```json { "flinkJobName": "596_menuId_281", "flinkSqls": "\n\n\n\n\n\n\ninsert into shishi_nei_0401_text\nselect\n*\nfrom\ntest_topic_waibu", "flinkSinks": "CREATE TABLE shishi_nei_0401_text( \nuserid string,action string,duration string,dt string\n) WITH ( \n'connector.type' = 'kafka','connector.version' = 'universal','format.field-delimiter' = '\t','format.type' = 'csv','connector.topic' = 'test118','connector.properties.bootstrap.servers' = 'cdh-hadoop-2:9092')", "resource": [], "flinkSources": "CREATE TABLE test_topic_waibu( \nuserid string,action string,duration string,dt string\n) WITH ( \n'connector.type' = 'kafka','connector.version' = 'universal','format.type' = 'json','connector.topic' = 'test_csv','connector.properties.bootstrap.servers' = 'cdh-hadoop-2:9092','connector.startup-mode' = 'earliest-offset')" } ``` * kafka to hive`参数与DAG含义保持一致, sql语句用;隔开` ```json { "flinkJobName": "593_menuId_280", "flinkSqls": "\n\n\n\n\n\n\nreal0401_json\nselect\n*\nfrom\nshishi_nei_0401_json", "resource": [ { "password": "hdfs", "isPartitionTable": false, "isDynamicPartition": false, "fields": [ { "class_type": "STRING", "isPartition": false, "field_name": "userid" }, { "class_type": "STRING", "isPartition": false, "field_name": "dt" }, { "class_type": "STRING", "isPartition": false, "field_name": "duration" }, { "class_type": "STRING", "isPartition": false, "field_name": "action" } ], "outputFormat": "json", "db": "test", "sinkTableName": "real0401_json_sink_2346", "url": "jdbc:hive2://bigdata-test-1:10000/test", "typename": "hive", "username": "hdfs", "tableName": "real0401_json" } ] } ``` - Streaming - kafka to hive ```json { -- 消费速率 "consumerByteRate": 0, -- 任务名称 "flinkJobName": "521_menuId_261", -- hive配置数据 "resource": { -- 是否分区表 "isPartitionTable": false, -- url[未使用] "url": "jdbc:hive2://bigdata-test-5:10000/test", -- 表名 "tableName": "test_120_parquet", -- 写入模式: append,cover "appendType": "cover", -- jdbc连接hive的密码[未使用] "password": "hdfs", -- 需要转换的data format字段 "collectionConfig": { -- format "collectionTimeFormat": "${yyyy-MM-dd HH:mm:ss}", -- 字段名 "collectionTimeName": "action" }, -- 是否开启动态分区 "isDynamicPartition": false, -- 字段 "fields": [ { -- 字段类型 "class_type": "STRING", -- 是否为分区字段 "isPartition": false, -- 字段名 "field_name": "userid" }, { "class_type": "STRING", "isPartition": false, "field_name": "action" }, { "class_type": "STRING", "isPartition": false, "field_name": "duration" }, { "class_type": "STRING", "isPartition": false, "field_name": "dt" } ], -- 输出的文件类型 "outputFormat": "parquet", -- 库名 "db": "test", -- 库名 "sinkTableName": "real0401_parquet_sink_2348", -- 数据类型: hive, kudu "typename": "hive", -- jdbc连接hive的用户名[未使用] "username": "hdfs" }, -- kafka配置 "kafkaResource": { -- 启动位点 "startUp": "EARLIEST", -- 分割字符类型,具体参考: cn.getech.data.development.enums.DelimitFormat "delimitFormat": 1, -- 数据类型 "format": "json", -- topic "topic": "test_topic", -- 参数 "fields": [ { -- value "class_type": "bigdata-test-4:9092,bigdata-test-5:9092,bigdata-test-6:9092", -- key "field_name": "bootstrap.servers" }, { -- value "class_type": "xiaorongzhuangId", -- key "field_name": "group.id" }, { -- value "class_type": "partition:0,offset:10000", -- key "field_name": "specific-offsets" } ], -- 数据类型: kafka "typename": "kafka" } } ``` - kudu `除resource外其余参数与hive含义保持一致, resource部分与dag kudu的配置相同` # 页面预览 下面展示我们数据中台-数据同步模块部分页面 ![输入图片说明](image/2.png) ![输入图片说明](image/3.png) ![输入图片说明](image/4.png)