# flink-cdc-log-connectors **Repository Path**: thinkdata/flink-cdc-log-connectors ## Basic Information - **Project Name**: flink-cdc-log-connectors - **Description**: 使用flink-cdc-connectors改造而成,支持op属性 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2023-03-13 - **Last Updated**: 2023-03-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # CDC Log Connectors for Apache Flink®
flink-cdc-log-connectors是基于flink-cdc-connectors改造的一组Apache Flink®源连接器。flink-cdc-log-connectors集成了Debezium作为捕获数据变更的引擎,因此它可以充分利用Debezium的能力。使用它能够获取到flink-cdc-connectors所不支持的`op`属性,并能够与flink-cdc-connectors并列使用。 本自述文件旨在简要介绍用于Apache Flink®的CDC Log连接器的核心功能。它的使用方法除了连接器不同以外,其它跟flink-cdc-connectors完全一样。有关flink-cdc-connectors的完整详细的文档,参见[flink-cdc-connectors文档](https://ververica.github.io/flink-cdc-connectors/master/)。 ## 特性 1. 支持读取数据库快照,并继续读取事务日志,即使出现故障,也可以达到**有且只有一次处理**的效果。 2. 用于DataStream API的CDC Log连接器,用户可以在单个作业中使用多个数据库和表的变更数据,而无需部署Debezium和Kafka。 3. 用于Table/SQL API的CDC Log连接器,用户可以使用SQL和DDL创建CDC源以监视单个表上的变更。 ## Table/SQL API的用法 我们需要几个步骤使用提供的连接器来设置Flink集群 1. 安装1.12+和Java 8+版本的Flink集群。 2. 从[下载页面](https://mvnrepository.com/artifact/cn.tenmg)下载SQL连接器的jar(或自行构建)。 3. 将下载的jar放在`FLINK_HOME/lib/`下。 4. 重新启动Flink集群。 该示例显示了如何在[Flink SQL Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/sqlClient.html)中创建MySQL CDC Log源并对其执行查询。 ```sql -- creates a mysql cdc table source CREATE TABLE mysql_binlog ( id INT NOT NULL, name STRING, description STRING, weight DECIMAL(10,3) ) WITH ( 'connector' = 'mysql-cdc-log', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'inventory', 'table-name' = 'products' ); -- 从MySQL读取快照和Binlog数据,进行一些转换,并在客户端上显示 SELECT id, UPPER(name), description, weight FROM mysql_binlog; ``` ## DataStream API的用法 包括以下Maven依赖项(可通过Maven 中央仓库获得)。 ```