# flink-cdc-sqlServer-mysql

**Repository Path**: li23/flink-cdc-sql-server-mysql

## Basic Information

- **Project Name**: flink-cdc-sqlServer-mysql
- **Description**: Synchronizes data with Flink CDC. Source: SQL Server. Sink: MySQL.
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 5
- **Forks**: 1
- **Created**: 2023-05-15
- **Last Updated**: 2025-06-25

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# flink-cdc-sqlServer-mysql

# 1. Overview

Uses `flink-connector-sqlserver-cdc 2.3.0` to synchronize data from `SQL Server` to `MySQL` in real time. **(Code below.)**

# 2. Environment

## 2.1. `SQL Server`

Requires `SQL Server` version 14 or later, i.e. `SQL Server 2017` and up.

## 2.2. `MySQL`

I used `MySQL 8.0.33`; any `MySQL` version from 5.x through 8.x should work.

## 2.3. `flink-connector-sqlserver-cdc`

`Flink CDC` supports `SQL Server` only from version 2.2 onward, so syncing from `SQL Server` to `MySQL` requires `Flink CDC 2.2` or later; this project uses the 2.3.0 connector. Note that the connector is also strict about which `SQL Server` versions it accepts.

# 3. Implementation

## 3.1. `SQL Server` database prerequisites

- `CDC` is enabled on the `SQL Server` database.
- The `SQL Server` Agent is running.
- You are a `db_owner` of the database.

## 3.2. `SQL Server` database script

``` sql
-- Create the database
create database zy_erp_cdc;

-- Run the following on the database that needs CDC
if exists(select 1 from sys.databases where name='zy_erp_cdc' and is_cdc_enabled=0)
begin
    exec sys.sp_cdc_enable_db
end

-- Check the database's CDC status; a result of 1 means CDC was enabled successfully.
select is_cdc_enabled from sys.databases where name='zy_erp_cdc';

-- It is best to create a dedicated filegroup first, e.g. one named CDC1

-- Create the table
create table zy_erp_cdc.dbo.student(
    id int primary key,
    name varchar(50),
    age int,
    mark int
);

-- Enable CDC on the table
USE zy_erp_cdc
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',       -- Schema of the source table.
    @source_name = 'student',     -- Name of the table to capture.
    @role_name = NULL,            -- Role granted SELECT on the captured columns; NULL limits full access to the captured data to members of sysadmin or db_owner.
    @filegroup_name = 'CDC1',     -- Filegroup for the change table. It must already exist, and ideally should not be the filegroup of the source table.
    @supports_net_changes = 0
GO

-- Test statements
select * from zy_erp_cdc.dbo.student;
insert into dbo.student values(1,'小黑',18,1);
update dbo.student set name = '小白' where id = 1;
delete from dbo.student where id=1;
```
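The same checks can be run from code. Below is a minimal JDBC sketch of the `is_cdc_enabled` query above; it assumes Microsoft's `mssql-jdbc` driver is on the classpath, which is not among this project's dependencies:

``` java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CdcCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust host, port and credentials to your environment.
        String url = "jdbc:sqlserver://localhost:1433;databaseName=zy_erp_cdc;encrypt=false";
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "select is_cdc_enabled from sys.databases where name='zy_erp_cdc'")) {
            while (rs.next()) {
                // 1 means CDC is enabled on the database
                System.out.println("is_cdc_enabled = " + rs.getInt(1));
            }
        }
    }
}
```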
## 3.3. `MySQL` database script

``` sql
create database zy_ods;

create table zy_ods.student
(
    id   int         not null primary key,
    name varchar(50) null,
    age  int         null,
    mark int         null
);
```

## 3.4. Data format captured by `Flink CDC`

### 3.4.1. `Insert`

``` json
{"before":null,"after":{"id":1,"name":"小黑","age":18,"mark":1},"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209454617,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000968:001b","commit_lsn":"00000028:00000968:001c","event_serial_no":1},"op":"c","ts_ms":1683180664795,"transaction":null}
```

### 3.4.2. `Update`

``` json
{"before":{"id":1,"name":"小黑","age":18,"mark":1},"after":{"id":1,"name":"小白","age":18,"mark":1},"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209589897,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000d00:0002","commit_lsn":"00000028:00000d00:0003","event_serial_no":2},"op":"u","ts_ms":1683180799648,"transaction":null}
```

### 3.4.3. `Delete`

``` json
{"before":{"id":1,"name":"小白","age":18,"mark":1},"after":null,"source":{"version":"1.6.4.Final","connector":"sqlserver","name":"sqlserver_transaction_log_source","ts_ms":1683209644903,"snapshot":"false","db":"zy_erp_cdc","sequence":null,"schema":"dbo","table":"student","change_lsn":"00000028:00000d68:0002","commit_lsn":"00000028:00000d68:0005","event_serial_no":1},"op":"d","ts_ms":1683180855132,"transaction":null}
```
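Each event above is a Debezium envelope: `before`/`after` hold the row images, `op` is `c`/`u`/`d` (plus `r` for snapshot reads), and `source.table` names the origin table. A minimal sketch of pulling those fields out with fastjson, the library this project uses later:

``` java
import com.alibaba.fastjson.JSONObject;

public class EnvelopeDemo {
    public static void main(String[] args) {
        // The Insert example from section 3.4.1, shortened to the fields we read
        String event = "{\"before\":null,\"after\":{\"id\":1,\"name\":\"小黑\",\"age\":18,\"mark\":1},"
                + "\"source\":{\"table\":\"student\"},\"op\":\"c\"}";
        JSONObject envelope = JSONObject.parseObject(event);
        String op = envelope.getString("op");                           // c / u / d / r
        String table = envelope.getJSONObject("source").getString("table");
        JSONObject image = "d".equals(op)
                ? envelope.getJSONObject("before")                      // deletes carry only "before"
                : envelope.getJSONObject("after");                      // c/u/r carry "after"
        System.out.println(op + " on " + table + ": " + image);
    }
}
```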
# 4. Code implementation

## 4.1. Code structure

![image.png](https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f05b8d4231974dd2a02bf33364c0465a~tplv-k3u1fbpfcp-watermark.image?)

## 4.2. `pom.xml`

``` xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>lhw.com</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- property name inferred from the _2.11 artifact suffix -->
        <scala.version>2.11.0</scala.version>
        <flink.version>1.14.4</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.29</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
    </dependencies>
</project>
```

## 4.3. `Java` code

``` java
package com.lhw.bean;

import lombok.Data;

import java.util.Map;

/**
 * @ClassName CommonBean
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Common bean for the Debezium change-event envelope (proof of concept)
 * @Date 2023/5/5 10:43
 */
@Data
public class CommonBean {
    private Map<String, String> before;
    private Map<String, String> after;
    private Map<String, String> source;
    private String op;
    private String ts_ms;
    private String transaction;
}
```

``` java
package com.lhw.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;

/**
 * @ClassName CollectionConfig
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Loads the configuration file from the classpath
 * @Date 2023/5/5 9:58
 */
public class CollectionConfig {

    private static final Logger LOG = LoggerFactory.getLogger(CollectionConfig.class);

    public final static Properties config = new Properties();

    static {
        InputStream profile = CollectionConfig.class.getClassLoader().getResourceAsStream("dev/config.properties");
        try {
            config.load(profile);
        } catch (IOException e) {
            LOG.info("load profile error!");
        }
        for (Map.Entry<Object, Object> kv : config.entrySet()) {
            LOG.info(kv.getKey() + "=" + kv.getValue());
        }
    }
}
```

``` java
package com.lhw.inter;

import com.lhw.bean.CommonBean;
import org.apache.flink.streaming.api.datastream.DataStream;

/**
 * @ClassName ProcessDataInterface
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Processing interface implemented by each table-level task
 * @Date 2023/5/5 11:19
 */
public interface ProcessDataInterface {

    void process(DataStream<CommonBean> commonData);
}
```

``` java
package com.lhw.job;

import com.lhw.bean.CommonBean;
import com.lhw.config.CollectionConfig;
import com.lhw.map.CommonBeanMap;
import com.lhw.task.StudentTask;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.log4j.Logger;

/**
 * @ClassName CollectionStream
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Main class: builds the CDC source and wires the processing flow
 * @Date 2023/5/4 13:09
 */
public class CollectionStream {

    private static Logger logger = Logger.getLogger(CollectionStream.class);

    public static void main(String[] args) {
        DebeziumSourceFunction<String> sourceFunction = SqlServerSource.<String>builder()
                .hostname(CollectionConfig.config.getProperty("sqlserver.hostname"))
                .port(Integer.parseInt(CollectionConfig.config.getProperty("sqlserver.port")))
                .database(CollectionConfig.config.getProperty("sqlserver.database"))   // monitored SQL Server database
                .tableList(CollectionConfig.config.getProperty("sqlserver.tableList")) // monitored table(s)
                .username(CollectionConfig.config.getProperty("sqlserver.username"))
                .password(CollectionConfig.config.getProperty("sqlserver.password"))
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental change stream from SQL Server; parallelism 1 keeps message ordering
        DataStreamSource<String> streamSource = env.addSource(sourceFunction).setParallelism(1);

        // Convert the raw JSON into the common bean
        DataStream<CommonBean> commonBeanStream = streamSource.map(new CommonBeanMap());

        // Keep only events from the student table
        SingleOutputStreamOperator<CommonBean> filterDataStream = commonBeanStream.filter(
                new FilterFunction<CommonBean>() {
                    public boolean filter(CommonBean commonBean) throws Exception {
                        return commonBean.getSource().get("table").equals("student");
                    }
                }
        );

        // Business processing for the student table
        new StudentTask().process(filterDataStream);

        // Trigger execution
        try {
            env.execute();
        } catch (Exception e) {
            logger.error("=== job execution failed ===");
            logger.error(e);
        }
    }
}
```
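One thing worth noting: `CollectionStream` runs without checkpointing, so after a failure the job cannot restore the connector's reading position (LSN offset) from state. A minimal sketch of enabling it; this is an optional tweak of mine, not part of the original code:

``` java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedEnv {

    // Sketch only: returns an environment with checkpointing enabled so the
    // CDC source can restore its offset from state after a restart.
    public static StreamExecutionEnvironment create() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 s (interval in milliseconds)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        return env;
    }
}
```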
``` java
package com.lhw.map;

import com.alibaba.fastjson.JSON;
import com.lhw.bean.CommonBean;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * @ClassName CommonBeanMap
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Converts the JSON string captured from SQL Server into the common bean
 * @Date 2023/5/5 10:48
 */
public class CommonBeanMap implements MapFunction<String, CommonBean> {

    public CommonBean map(String s) throws Exception {
        return JSON.parseObject(s, CommonBean.class);
    }
}
```

``` java
package com.lhw.map;

import com.lhw.bean.CommonBean;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.types.Row;

/**
 * @ClassName StudentMap
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Converts the common bean into a Row matching the target table
 * @Date 2023/5/5 11:47
 */
public class StudentMap extends RichMapFunction<CommonBean, Row> {

    public Row map(CommonBean commonBean) throws Exception {
        Row row;
        if (commonBean.getOp().equals("c") || commonBean.getOp().equals("u") || commonBean.getOp().equals("r")) {
            // insert / update / snapshot read: all four columns come from "after"
            row = new Row(4);
            row.setField(0, Integer.parseInt(commonBean.getAfter().get("id")));
            row.setField(1, commonBean.getAfter().get("name"));
            row.setField(2, Integer.parseInt(commonBean.getAfter().get("age")));
            row.setField(3, Integer.parseInt(commonBean.getAfter().get("mark")));
        } else {
            // delete: only the primary key from "before" is needed
            row = new Row(1);
            row.setField(0, Integer.parseInt(commonBean.getBefore().get("id")));
        }
        return row;
    }
}
```

``` java
package com.lhw.sink;

import com.lhw.util.DbUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

import java.sql.Connection;
import java.sql.PreparedStatement;

/**
 * @ClassName SinkMysql
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Generic MySQL sink: binds each Row field to the prepared statement
 * @Date 2023/5/5 9:52
 */
public class SinkMysql extends RichSinkFunction<Row> {

    private final String sql;
    private Connection conn = null;
    private PreparedStatement ps = null;

    public SinkMysql(String sql) {
        this.sql = sql;
    }

    // Open the connection and prepare the statement once per task
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DbUtil.getConnByJdbc();
        ps = conn.prepareStatement(sql);
    }

    // Bind every field of the Row by position, then execute
    @Override
    public void invoke(Row value, Context context) throws Exception {
        for (int i = 0; i < value.getArity(); i++) {
            ps.setObject(i + 1, value.getField(i));
        }
        ps.executeUpdate();
    }

    // Release JDBC resources
    @Override
    public void close() throws Exception {
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
```
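Since `flink-connector-jdbc_2.11` is already declared in the pom, Flink's built-in `JdbcSink` could serve as an alternative to the hand-rolled `SinkMysql`, adding batching and retries. A sketch only, under that assumption; the original project wires `SinkMysql` as shown above:

``` java
import com.lhw.config.CollectionConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;

public class JdbcSinkFactory {

    // Builds a batched, retrying upsert sink for the four-column student rows.
    public static SinkFunction<Row> upsertSink(String table) {
        String sql = String.format("replace into %s values(?,?,?,?)", table);
        return JdbcSink.<Row>sink(
                sql,
                (ps, row) -> {                        // bind each Row field by position
                    for (int i = 0; i < row.getArity(); i++) {
                        ps.setObject(i + 1, row.getField(i));
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)           // flush after 100 rows ...
                        .withBatchIntervalMs(1000)    // ... or after 1 second
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(CollectionConfig.config.getProperty("mysql.url"))
                        .withDriverName(CollectionConfig.config.getProperty("mysql.driver"))
                        .withUsername(CollectionConfig.config.getProperty("mysql.username"))
                        .withPassword(CollectionConfig.config.getProperty("mysql.password"))
                        .build());
    }
}
```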
``` java
package com.lhw.task;

import com.lhw.bean.CommonBean;
import com.lhw.config.CollectionConfig;
import com.lhw.inter.ProcessDataInterface;
import com.lhw.map.StudentMap;
import com.lhw.sink.SinkMysql;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @ClassName StudentTask
 * @Author lihongwei
 * @Version 1.0.0
 * @Description Student-table task: splits the stream and writes to MySQL
 * @Date 2023/5/5 11:14
 */
public class StudentTask implements ProcessDataInterface {

    public void process(DataStream<CommonBean> commonData) {
        // Side-output tag for delete events
        final OutputTag<CommonBean> deleteOpt = new OutputTag<CommonBean>("deleteOpt", TypeInformation.of(CommonBean.class));

        // Split the stream: main output carries inserts/updates, side output carries deletes
        SingleOutputStreamOperator<CommonBean> processData = commonData.process(
                new ProcessFunction<CommonBean, CommonBean>() {
                    @Override
                    public void processElement(CommonBean commonBean, Context context, Collector<CommonBean> collector) throws Exception {
                        if (commonBean.getOp().equals("c") || commonBean.getOp().equals("u") || commonBean.getOp().equals("r")) {
                            // insert, update or snapshot read
                            collector.collect(commonBean);
                        } else {
                            // delete
                            context.output(deleteOpt, commonBean);
                        }
                    }
                }
        );

        String upsertSql = "replace into %s values(?,?,?,?)";
        String deleteSql = "delete from %s where id=?";

        // insert / update
        processData.map(new StudentMap())
                .addSink(new SinkMysql(String.format(upsertSql, CollectionConfig.config.getProperty("mysql.student.sql.table"))));
        // delete
        processData.getSideOutput(deleteOpt).map(new StudentMap())
                .addSink(new SinkMysql(String.format(deleteSql, CollectionConfig.config.getProperty("mysql.student.sql.table"))));
    }
}
```
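Adding another synchronized table follows the same pattern: filter on `source.table` in `CollectionStream`, then implement `ProcessDataInterface` with a matching map function and SQL pair. A hypothetical skeleton for an assumed `teacher` table, which is not part of this project:

``` java
package com.lhw.task;

import com.lhw.bean.CommonBean;
import com.lhw.inter.ProcessDataInterface;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical second task. In CollectionStream it would be wired as:
//   new TeacherTask().process(commonBeanStream.filter(b -> "teacher".equals(b.getSource().get("table"))));
public class TeacherTask implements ProcessDataInterface {

    public void process(DataStream<CommonBean> commonData) {
        // Mirror StudentTask here: split c/u/r events from d events with a side
        // output, convert CommonBean to Row with a TeacherMap, and add SinkMysql
        // sinks built from table-specific upsert and delete SQL.
    }
}
```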
``` java
package com.lhw.util;

import com.lhw.config.CollectionConfig;

import java.sql.*;
import java.util.*;

/**
 * @ClassName DbUtil
 * @Author lihongwei
 * @Version 1.0.0
 * @Description JDBC utility class
 * @Date 2023/5/5 9:55
 */
public class DbUtil {

    /** Queries rows keyed by the concatenated values of the given key columns. */
    public static Map<String, Map<String, Object>> query(String key, String sql) {
        Map<String, Map<String, Object>> table = new HashMap<String, Map<String, Object>>();
        String[] keys = key.split(",");
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet rs = null;
        try {
            connection = getConnByJdbc();
            preparedStatement = connection.prepareStatement(sql);
            rs = preparedStatement.executeQuery();
            ResultSetMetaData rsm = rs.getMetaData();
            int colNum = rsm.getColumnCount();
            while (rs.next()) {
                Map<String, Object> row = new HashMap<String, Object>();
                for (int i = 1; i <= colNum; i++) {
                    row.put(rsm.getColumnName(i).toLowerCase(), rs.getObject(i));
                }
                String keyVal = "";
                for (String val : keys) {
                    keyVal += String.valueOf(row.get(val.toLowerCase())).trim();
                }
                if (keyVal.equalsIgnoreCase("")) {
                    System.out.println("keyval is empty");
                    continue;
                }
                table.put(keyVal, row);
            }
        } catch (Exception e) {
            System.out.println("Query Database error!\n" + e);
        } finally {
            close(rs, preparedStatement, connection);
        }
        return table;
    }

    /** Executes a DML statement. */
    public static void update(String sql) {
        Connection connection = null;
        Statement statement = null;
        try {
            connection = getConnByJdbc();
            statement = connection.createStatement();
            statement.executeUpdate(sql);
        } catch (Exception e) {
            System.out.println("Update Database error!\n" + e);
        } finally {
            close(statement, connection);
        }
    }

    /** Queries rows grouped into lists by the value of the given key column. */
    public static Map<String, List<Map<String, Object>>> queryForGroup(String key, String sql) {
        Map<String, List<Map<String, Object>>> res = new HashMap<String, List<Map<String, Object>>>();
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet rs = null;
        try {
            connection = getConnByJdbc();
            preparedStatement = connection.prepareStatement(sql);
            rs = preparedStatement.executeQuery();
            ResultSetMetaData rsm = rs.getMetaData();
            int colNum = rsm.getColumnCount();
            while (rs.next()) {
                String keyVal = rs.getString(key);
                Map<String, Object> cols = new HashMap<String, Object>();
                for (int i = 1; i <= colNum; i++) {
                    cols.put(rsm.getColumnName(i).toLowerCase(), rs.getObject(i));
                }
                if (res.containsKey(keyVal)) {
                    res.get(keyVal).add(cols);
                } else {
                    if (keyVal.equalsIgnoreCase("")) {
                        System.out.println("keyval is empty");
                        continue;
                    }
                    List<Map<String, Object>> newRows = new ArrayList<Map<String, Object>>();
                    newRows.add(cols);
                    res.put(keyVal, newRows);
                }
            }
        } catch (Exception e) {
            System.out.println("Query Database error!\n" + e);
        } finally {
            close(rs, preparedStatement, connection);
        }
        return res;
    }

    /** Queries a single row as a column-name/value map. */
    public static Map<String, String> queryKv(String sql) {
        Map<String, String> map = new HashMap<String, String>();
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet rs = null;
        try {
            connection = getConnByJdbc();
            preparedStatement = connection.prepareStatement(sql);
            rs = preparedStatement.executeQuery();
            ResultSetMetaData rsm = rs.getMetaData();
            int colNum = rsm.getColumnCount();
            while (rs.next()) {
                for (int i = 1; i <= colNum; i++) {
                    map.put(rsm.getColumnName(i).toLowerCase(), rs.getString(i));
                }
            }
        } catch (Exception e) {
            System.out.println("Query Database error!\n" + e);
        } finally {
            close(rs, preparedStatement, connection);
        }
        return map;
    }

    /** Queries all rows, each as a column-name/value map. */
    public static List<Map<String, String>> queryKvList(String sql) {
        List<Map<String, String>> list = new ArrayList<Map<String, String>>();
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet rs = null;
        try {
            connection = getConnByJdbc();
            preparedStatement = connection.prepareStatement(sql);
            rs = preparedStatement.executeQuery();
            ResultSetMetaData rsm = rs.getMetaData();
            int colNum = rsm.getColumnCount();
            while (rs.next()) {
                Map<String, String> map = new HashMap<String, String>();
                for (int i = 1; i <= colNum; i++) {
                    map.put(rsm.getColumnName(i).toLowerCase(), rs.getString(i));
                }
                list.add(map); // one map per row
            }
        } catch (Exception e) {
            System.out.println("Query Database error!\n" + e);
        } finally {
            close(rs, preparedStatement, connection);
        }
        return list;
    }

    /** Opens a MySQL connection using the settings from config.properties. */
    public static Connection getConnByJdbc() {
        Connection conn = null;
        try {
            Class.forName(CollectionConfig.config.getProperty("mysql.driver"));
            conn = DriverManager.getConnection(
                    CollectionConfig.config.getProperty("mysql.url"),
                    CollectionConfig.config.getProperty("mysql.username"),
                    CollectionConfig.config.getProperty("mysql.password"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    // Closes JDBC resources in order, ignoring individual failures
    private static void close(AutoCloseable... resources) {
        for (AutoCloseable r : resources) {
            if (r != null) {
                try {
                    r.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```
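For reference, a short sketch of how the `DbUtil` helpers are called; the SQL and key column here are illustrative only:

``` java
import com.lhw.util.DbUtil;

import java.util.List;
import java.util.Map;

public class DbUtilExample {
    public static void main(String[] args) {
        // Rows keyed by id, e.g. "1" -> {id=1, name=..., age=..., mark=...}
        Map<String, Map<String, Object>> byId = DbUtil.query("id", "select * from student");

        // All rows as column-name/value string maps
        List<Map<String, String>> rows = DbUtil.queryKvList("select id, name from student");
        System.out.println(byId.size() + " keyed rows, " + rows.size() + " listed rows");

        // Plain DML
        DbUtil.update("delete from student where id = -1");
    }
}
```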
Finally, the configuration file, which `CollectionConfig` loads from `src/main/resources/dev/config.properties`:

``` properties
# MySQL settings
mysql.driver=com.mysql.cj.jdbc.Driver
mysql.url=jdbc:mysql://192.168.2.71:3306/database1?useUnicode=true&characterEncoding=utf-8&allowEncodingChanges=true
mysql.username=user
mysql.password=password
# Target MySQL table name
mysql.student.sql.table=student

# SQL Server settings
sqlserver.hostname=192.168.2.71
sqlserver.port=1433
sqlserver.database=database2
sqlserver.tableList=dbo.student
sqlserver.username=user
sqlserver.password=password
```