# spring-boot-starter-canal **Repository Path**: projectzone/spring-boot-starter-canal ## Basic Information - **Project Name**: spring-boot-starter-canal - **Description**: 随着 spring boot 框架的逐渐流行,越来越多的 spring boot 组件随之诞生,我们需要做一个数据迁移和数据同步的业务,我就想着将其封装成 springboot 组件,这是写这个组件的初衷。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-08-24 - **Last Updated**: 2022-10-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # canal 集成为 springboot 组件 *** ## 想法 ###### 随着 spring boot 框架的逐渐流行,越来越多的 spring boot 组件随之诞生,今天带大家一起来实现 canal 集成为 spring boot 的组件的详细过程。 ###### 我们大佬需要我们做一个数据迁移和数据同步的业务,我就想着将其封装成 springboot 组件,这是写这个组件的初衷。 ###### 也许还有小伙伴没接触过 canal ,想了解的伙伴请前往[官网文档](https://github.com/alibaba/canal) *** ## 思路 - 配置构思和书写 - 连接规则制定 - 书写 canal 客户端 - 监听 canal 客户端从 canal 服务端推送的消息,并处理。 - 处理消息机制,这里使用了注解方式 - 测试类去测试所写的组件 *** ## 实现 - 创建配置文件 1. [CanalProperties.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/config/CanalProperties.java):读取 spring boot 配置文件信息 2. [CanalClientConfiguration.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/config/CanalClientConfiguration.java):加载 canal 配置,并启动客户端 - 注解使其成为 spring boot 组件 [EnableCanalClient.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/EnableCanalClient.java):该注解作用是启用 canal - canal 客户端书写:[SimpleCanalClient.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/client/core/SimpleCanalClient.java) 1. 初始化监听器(注解方式:[CanalEventListener.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/CanalEventListener.java)),这里通过一个工具类,BeanUtil,通过反射注入 bean (包含通过注解方式的数据同步). 2. 开启 canal 客户端,若是开启多个客户端,会开启多个进程。 3. 初始化一个线程池,使得线程复用,减小频繁创建线程带来的内存开销。 4. 通过线程池开启 canal 客户端,每一个客户端都是一个线程。 - canal 客户端处理消息过程:[AbstractBasicMessageTransponder.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/client/abstracts/AbstractBasicMessageTransponder.java);[DefaultMessageTransponder.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/client/transfer/DefaultMessageTransponder.java) 1. 获取消息,判断消息可用性 2. 可用的消息处理机制 3. 消息消费完成确认 4. 处理消息发生异常,等待设定的心跳时间进行重试,当重试机制次数超过指定的次数,停止 canal 客户端,结束线程。 - canal 处理消息操作,主要通过反射和代理模式实现:[ListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/ListenPoint.java) 1. 创建表操作 注解方式:[CreateTableListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/table/CreateTableListenPoint.java) 2. 删除表操作 注解方式:[DropTableListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/table/DropTableListenPoint.java) 3. 修改表信息 注解方式:[AlertTableListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/table/AlertTableListenPoint.java) 4. 重新命名表 注解方式:[RenameTableListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/table/RenameTableListenPoint.java) 5. 创建索引 注解方式:[CreateIndexListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/table/CreateIndexListenPoint.java) 6. 删除索引 注解方式:[DropIndexListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/table/DropIndexListenPoint.java) 7. 新增数据 注解方式:[InsertListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/content/InsertListenPoint.java) 8. 更新数据 注解方式:[UpdateListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/content/UpdateListenPoint.java) 9. 删除数据 注解方式:[DeleteListenPoint.java](https://gitee.com/projectzone/spring-boot-starter-canal/blob/master/starter-canal/src/main/java/cn/mxtao/starter/canal/annotation/content/DeleteListenPoint.java) *** ## 使用 ###### 组件写好了,那么怎么来使用呢?打个比方,锄头做好了,如何日当午才是关键,哈哈.... ###### 首先我们来看看通过 @ConfigurationProperties 注解的 CanalProperties 这个类,能配置的内容在 Instance 这个内部静态类里面,目前支持如下配置: ```markdown #canal 服务器地址,默认是本地的环回地址 canal.client.instances.${example}.host=127.0.0.1 #canal 服务设置的端口,默认 11111 canal.client.instances.${example}.port=11111 #集群 设置的用户名 canal.client.instances.${example}.userName=root canal.client.instances.${example}.user-name=root #集群 设置的密码 canal.client.instances.${example}.password=123456 #批量从 canal 服务器获取数据的最多数目 canal.client.instances.${example}.batchSize=1000 canal.client.instances.${example}.batch-size=1000 #是否有过滤规则 canal.client.instances.${example}.filter=.*\\..* #当错误发生时,重试次树 canal.client.instances.${example}.retryCount=20 canal.client.instances.${example}.retry-count=20 #信息捕获心跳时间 canal.client.instances.${example}.acquireInterval=1000 canal.client.instances.${example}.acquire-interval=1000 ``` ####### 假若你所有的环境都搞定了,包括 mysql 开启 binlog 日志,canal 伪装从数据库连接到 mysql 等,然后配置信息都正确,那就开始正文了 - 通过注解的方式处理数据:[MyAnnoEventListener.java](hhttps://gitee.com/projectzone/spring-boot-starter-canal/blob/master/canal-test/src/main/java/cn/mxtao/canal/canaltest/lister/MyAnnoEventListener.java) ###### 启动服务,操作 db,观察数据 ```markdown ======================注解方式(修改表信息操作)========================== use dao; /* ApplicationName=IntelliJ IDEA 2019.1.2 */ ALTER TABLE user ADD age int DEFAULT 18 NOT NULL COMMENT '年龄' ====================================================== ======================注解方式(新增数据操作)========================== use dao; INSERT INTO user(id,name,age) VALUES('85','wxz','107'); ====================================================== ``` ## 感言 ###### 初识 canal ,不足之处,还望多多指正和批评,在此感谢。