# flink-cdc-sqlServer-mysql
**Repository Path**: li23/flink-cdc-sql-server-mysql
## Basic Information
- **Project Name**: flink-cdc-sqlServer-mysql
- **Description**: Synchronize data with Flink CDC. Source: SQL Server. Sink: MySQL.
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 5
- **Forks**: 1
- **Created**: 2023-05-15
- **Last Updated**: 2025-06-25
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# flink-cdc-sqlServer-mysql
# 1. Overview
Uses `flink-connector-sqlserver-cdc 2.3.0` to synchronize data from `SQL Server` to `MySQL` in real time. **(Code below.)**
# 2. Environment
## 2.1. `SQL Server`
Requires `SQL Server` version 14 or later, i.e. `SQL Server 2017` and above.
## 2.2. `MySQL`
I use `MySQL` `8.0.33`; any `MySQL` from version `5` through `8` should work.
## 2.3. `flink-connector-sqlserver-cdc`
`Flink CDC` supports `SQL Server` only from version `2.2` onward, so syncing from `SQL Server` to `MySQL` requires `Flink CDC 2.2` or later; this project uses `2.3.0`. Note that the connector also has strict requirements on the `SQL Server` version.
# 3. Implementation
## 3.1. `SQL Server` Database Setup
- CDC is enabled on the `SQL Server` database.
- The `SQL Server` Agent is running.
- You are a `db_owner` of the database.
## 3.2. `SQL Server` Database Script
``` sql
-- Create the database
create database zy_erp_cdc;
-- Enable CDC on the database
if exists(select 1 from sys.databases where name='zy_erp_cdc' and is_cdc_enabled=0)
begin
exec sys.sp_cdc_enable_db
end
-- Check the CDC status of the database; a result of 1 means CDC is enabled.
select is_cdc_enabled from sys.databases where name='zy_erp_cdc';
-- It is best to create a dedicated filegroup for the change tables first, e.g. CDC1
-- Create the table
create table zy_erp_cdc.dbo.student(
id int primary key,
name varchar(50),
age int,
mark int
);
-- Enable CDC on the table
USE zy_erp_cdc
GO
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo', -- Schema of the source table.
@source_name = 'student', -- Name of the table to capture.
@role_name = NULL, -- Role gating SELECT access to the captured columns; NULL restricts full access to members of sysadmin or db_owner.
@filegroup_name = 'CDC1', -- Filegroup for the change table; it must already exist, and should differ from the source table's filegroup.
@supports_net_changes = 0
GO
-- Test statements
select * from zy_erp_cdc.dbo.student;
insert into dbo.student values(1,'小黑',18,1);
update dbo.student set name = '小白' where id = 1;
delete from dbo.student where id=1;
```
## 3.3. `MySQL` Database Script
``` sql
create database zy_ods;
create table zy_ods.student
(
id int not null
primary key,
name varchar(50) null,
age int null,
mark int null
);
```
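A sink that replays these changes into `MySQL` should be idempotent, since the same change event may be delivered more than once. One common approach is `INSERT ... ON DUPLICATE KEY UPDATE`. The helper below is an illustrative sketch only (the class and method names are not part of this project's code); it builds such a statement for the `zy_ods.student` table defined above:

```java
import java.util.List;
import java.util.stream.Collectors;

public class UpsertSqlBuilder {
    // Builds an idempotent MySQL upsert: inserts the row, or overwrites the
    // existing row when the primary key already exists.
    public static String upsertSql(String table, List<String> columns) {
        String cols = String.join(", ", columns);
        String placeholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        String updates = columns.stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }
}
```

Calling `upsertSql("zy_ods.student", List.of("id", "name", "age", "mark"))` yields a prepared-statement template whose parameters can be filled from the `after` image of each change event.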
## 3.4. Data Format Captured by `Flink CDC`
### 3.4.1.`Insert`
``` json
{"before":null,"after":{"id":1,"name":"小黑","age":18,"mark":1},"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209454617,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000968:001b","commit_lsn":"00000028:00000968:001c","event_serial_no":1},"op":"c","ts_ms":1683180664795,"transaction":null}
```
### 3.4.2.`Update`
``` json
{"before":{"id":1,"name":"小黑","age":18,"mark":1},"after":{"id":1,"name":"小白","age":18,"mark":1},"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209589897,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000d00:0002","commit_lsn":"00000028:00000d00:0003","event_serial_no":2},"op":"u","ts_ms":1683180799648,"transaction":null}
```
### 3.4.3.`Delete`
``` json
{"before":{"id":1,"name":"小白","age":18,"mark":1},"after":null,"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209644903,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000d68:0002","commit_lsn":"00000028:00000d68:0005","event_serial_no":1},"op":"d","ts_ms":1683180855132,"transaction":null}
```
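The `op` field in the records above follows the Debezium convention: `c` = create, `u` = update, `d` = delete, and `r` = snapshot read. A sink can route on it roughly as below; this is a sketch, not this project's actual code:

```java
import java.util.Map;

public class OpRouter {
    /**
     * Returns the row image the sink should write: the "after" image for
     * inserts, snapshot reads, and updates, or null for a delete (where only
     * the "before" image exists and the row should be removed).
     */
    public static Map<String, Object> targetRow(String op, Map<String, Object> before,
                                                Map<String, Object> after) {
        switch (op) {
            case "c": // insert
            case "r": // snapshot read: treat like an insert
            case "u": // update: "after" carries the new state
                return after;
            case "d": // delete
                return null;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }
}
```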
# 4. Code Implementation
## 4.1. Code Structure

## 4.2. `pom.xml`
``` xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>lhw.com</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.0</scala.version>
        <flink.version>1.14.4</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.29</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>
</project>
```
## 4.3. `Java` Code
``` java
package com.lhw.bean;
import lombok.Data;
import java.util.Map;
/**
* @ClassName CommonBean
* @Author lihongwei
* @Version 1.0.0
* @Description Proof-of-concept common bean for CDC change records
* @Date 2023/5/5 10:43
*/
@Data
public class CommonBean {
private Map<String, Object> before;
private Map<String, Object> after;
private Map<String, Object> source;
private String op;
private String ts_ms;
private String transaction;
}
```
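The `ts_ms` fields in the sample records are epoch milliseconds. Since `CommonBean` keeps `ts_ms` as a `String`, converting it to a readable timestamp takes one extra parse; the helper below is illustrative only, not part of the project code:

```java
import java.time.Instant;

public class TsMsDemo {
    // Converts a Debezium-style ts_ms value (epoch milliseconds, kept as a
    // String as in CommonBean) into a java.time.Instant.
    public static Instant toInstant(String tsMs) {
        return Instant.ofEpochMilli(Long.parseLong(tsMs));
    }
}
```

For example, the `ts_ms` of `1683209454617` from the insert sample corresponds to an instant on 2023-05-04 (UTC).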
``` java
package com.lhw.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
/**
* @ClassName CollectionConfig
* @Author lihongwei
* @Version 1.0.0
* @Description Loads the configuration file
* @Date 2023/5/5 9:58
*/
public class CollectionConfig {
private static final Logger LOG = LoggerFactory.getLogger(CollectionConfig.class);
public final static Properties config = new Properties();
static {
InputStream profile = CollectionConfig.class.getClassLoader().getResourceAsStream("dev/config.properties");
try {
config.load(profile);
} catch (IOException e) {
LOG.error("failed to load config file!", e);
}
for (Map.Entry