# mysql2doris-by-flinkcdc **Repository Path**: wsrarea/mysql2doris-by-flinkcdc ## Basic Information - **Project Name**: mysql2doris-by-flinkcdc - **Description**: MySQL数据实时同步到doris,通过flinkcdc实现。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 10 - **Forks**: 9 - **Created**: 2022-11-27 - **Last Updated**: 2025-04-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ### # mysql2doris-by-flinkcdc **1、概述** 开源平台CloudCanal满足大多数场景下的数据同步任务,其中就包括了MySQL同步到doris。但是有些特殊的场景就无法满足了,比如存在需要自定义decimal长度的字段、MySQL表存在联合主键的情况等。 首先,推荐使用CloudCanal,因为简单、好维护。傻瓜式操作即可。 对于CloudCanal无法满足的场景,我们采用flink cdc 同步到doris的方案。但是针对cdc本身的问题,比如:cdc接入decimal类型、date类型、datetime类型的字段时,会存在异常输出的问题,以及时差问题等。此时,我们则需要通过实现JsonDebeziumDeserializationSchema方法去处理。 数据写入doris有多种方式。比如:通过http封装curl命令方式进行写入、StreamLoad方式写入、利用doris官网封装好的flink-doris-connector进行写入。 第一种方式最麻烦,需要自己进行代码封装,但也最灵活,实际上用的不多,除非需要自定义数据格式写入。 第二方式需要自己编写代码,且写入的数据格式必须数据JSONArray类型。实际上这种写入方式就是在第一种方式的基础上进行封装的,doris官网也有相关的代码。感兴趣可以去看看,实际上用的也不多。 第三种方式,是doris官网基于第二种方式进行了封装,使用起来最为方便,无需像第二种方式中那样,需要指定字段及数据写入的格式调整。 因此,我们选择第三种方式进行数据写入。即:利用doris官网封装好的flink-doris-connector进行写入。 针对同步flink cdc 同步数据到doris的情况,我们需要自己创建doris表,这里我们将增加一个自动创表的功能。 **2、CloudCanal** (1)登录CloudCanal ![输入图片说明](https://foruda.gitee.com/images/1669534273329266573/2b7f5760_5043527.png "屏幕截图") (2)创建任务并跟着指示操作即可 ![输入图片说明](https://foruda.gitee.com/images/1669534299800012837/c498b4ed_5043527.png "屏幕截图") ![输入图片说明](https://foruda.gitee.com/images/1669534306148203263/1ab388b4_5043527.png "屏幕截图") **3、自动创表** 思路: (1)获取MySQL中表的信息,主要是ddl以及字段个数。 (2)将MySQL中表的字段类型、字段长度与doris建表语句做适配。 (3)连接doris,并拼接好的doris建表语句进行建表。 **4、flinkcdcdemo** 见代码 ![输入图片说明](image.png) **5、数据核对** 思路 (1)通过JDBC查询MySQL与doris中表的数据量,然后作差。如果不是0,则发出告警。 (2)告警程序:通过钉钉或者企业微信的接口,将信息反馈给当前项目的开发者。