# MysqlToPhoenixFlink **Repository Path**: suntyu/MysqlToPhoenix ## Basic Information - **Project Name**: MysqlToPhoenixFlink - **Description**: canal采集binlog入kafka,flink实时处理数据并sink到phoenix中 - **Primary Language**: Java - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 10 - **Forks**: 9 - **Created**: 2020-05-08 - **Last Updated**: 2024-10-31 ## Categories & Tags **Categories**: dbmanager **Tags**: None ## README ## MySQL实时同步数据到Phoenix --- Canal——>Kafka——>Flink——>Phoenix --- ### 打包 `mvn clean package -DskipTests` ### 程序运行 `./bin/flink run -d ./runJar/mysqlToPhoenix.jar --job ./jobSetting.json` ### job配置 ``` { "sink": { "mode": "phoenix", "phoenixTable": "phoenixTableName", "phoenixDriver": "org.apache.phoenix.jdbc.PhoenixDriver", "phoenixUrl": "jdbc:phoenix:xxx,xxx,xxx:2181:/hbase" }, "parallelism": 3, "taskName": "mysqlToPhoenixTask", "source": { "mode": "kafka", "partitionParallel": 3, "mysqlTable": "mysqlTableName", "server": "172.16.1.90:9092", "mysqlPk": "id", "offsetFlag":"latest" "partitionOffsets": [ 1111111, 2222222, 3333333 ], "topic": "topicName" } } ``` > 其中partitionOffsets有的话,按照partition0-n设置offset; > 没有的话,检查offsetFlag是否存在,存在按照offsetFlag消费; > 否则自动从最新的数据开始消费 ******************* ## 需要关注的问题 - 大量历史数据在MySQL库查询慢,迁移Phoenix中查询,性能提升显著 - Phoenix中的数据类型与MySQL不完全一致,需要转存合适类型,比如bit在Phoenix中使用tinyint存储 - Phoenix对接了PrestoSql,对于Phoenix中的时间戳没有支持,改为了bigint存储 - 同步过程中,处理数据为null问题 - PhoenixSink中,处理表数据类型与Flink-stream中具体数据列的匹配及转化问题 ## 功能 - Canal数据格式FlatMessage,Mysql的元数据类型,Phoenix元数据类型三者的数据及类型自动映射,自动填充NULL数据 - 只处理DML,以后增加DDL - 任意表的同步通用方案,