# dbSub **Repository Path**: Rezar/dbSub ## Basic Information - **Project Name**: dbSub - **Description**: 集群版mysql binlog 订阅 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 0 - **Created**: 2020-05-18 - **Last Updated**: 2023-04-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # dbSub #### 介绍 基于akkaCluster的分布式 binlog 订阅服务,客户端自动向在线binlog服务注册自身,集群服务按照数据库实例区分,客户端订阅多个数据库实例 的请求可能会均衡到多个binlog服务上,集群保证单台binlog服务端宕机不影响客户端继续订阅事件,系统会自动切换客户端连接的binlog服务端以持续 提供事件同步. #### 软件架构 1: AkkaCluster
2: SpringBoot
3: LevelDB 持久事件
#### 安装教程 mvn clean install -DskipTests=true java -jar dbSub-0.0.1-SNAPSHOT.jar -bc=initBinlog.xml -cc=binlogAkkaServer.conf Usage: [options] * -binlogConfig, -bc 启动的binlog配置文件(.xml) * -clusterConfig, -cc 启动的akka集群的配置文件(.conf) * -workFolder, -wf 服务存储文件的目录(默认为当前目录--[.]代表当前目录) Default: . * -maxStore, -ms 存储的单表的记录数上限,超过该数会清理掉历史数据 Default: 100000 [警告]停机服务请先使用 kill pid 停止服务(切勿直接使用kill -9 pid等),hook回调会保证服务正常停止并清理资源 如在使用kill pid之后[10,]秒后进程仍存活,可再使用kill -9 pid强制退出. 1) initBinlog.xml ```
``` 1: 订阅同一数据库实例的服务机器必须保证clientId不同
2: fromBinlogFile和fromBinlogOffset用于解决潜在问题1,用于服务B远落后与其他服务机器时,直接 指定binlog订阅开始的文件和下标再提供服务
3: filterTimeChange是过滤掉无关字段,即只是这些字段的数据发生变化,不下发binlog事件
4: acceptEvent:用于指定当前实例上订阅的关注的下发事件,ChangeType的数值(多服务端需保证配置一致)
2) binlogAkkaServer.conf ``` akka { loglevel = info actor { provider = cluster serializers { jackson-json = "akka.serialization.jackson.JacksonJsonSerializer" jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer" proto = "akka.remote.serialization.ProtobufSerializer" syncEvent = "com.Rezar.dbSub.base.event.SyncEventDataSerializer" } serialization-bindings { "com.Rezar.dbSub.base.event.SyncEvent" = syncEvent "com.Rezar.dbSub.utils.CborSerializable" = jackson-cbor } } remote { artery { canonical.hostname = "127.0.0.1" canonical.port = 42767 } } cluster { allow-weakly-up-members=off seed-nodes = [ "akka://binlogSyncSystem@127.0.0.1:42767"] } } ``` 1: canonical.hostname 实际使用中请替换为内网或公网IP
2: 如果种子机器两台以上或者同一机器多端口,请先保证种子节点中排第一个服务优先启动,否则集群无法建立
#### 客户端使用说明 1) 客户端处理变更事件的业务类实现ChangeDataHandler 接口 e.g. ``` @EventHandlerAnnot(dbInstance = "ad_ins", db = "dsp", tableName = "ext_audit", changeType={ChangeType.UPDATE}, filter = "") public class IdeaExtAuditChangeDataHandler implements ChangeDataHandler { @Getter private String curOffset; @Override public boolean onEvent(TestExtAudit oldData, TestExtAudit newData, String seqId, long timestamp, ChangeType changeType) { log.info("eventMsgId:{} changeType:{}", seqId, changeType); curOffset = seqId; return false; } } ``` 注意:dbInstance中不能包含(:)符号 其中:
1: dbInstance对应initBinlog.xml中的实例名
2: db/table 数据库实例上的库表名称
3: filter:支持数据过滤 如:filter= "where new.zmAuditStatus=0" 即该业务处理器只处理更新事件中满足 变更后的数据中 zmAuditStatus = 0 的变更事件
2) 客户端配置akkaCluster文件(默认为binlogAkkaClient.conf) ``` akka { loglevel = info actor { provider = cluster serializers { jackson-json = "akka.serialization.jackson.JacksonJsonSerializer" jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer" proto = "akka.remote.serialization.ProtobufSerializer" syncEvent = "com.Rezar.dbSub.base.event.SyncEventDataSerializer" } serialization-bindings { "com.Rezar.dbSub.base.event.SyncEvent" = syncEvent "com.Rezar.dbSub.utils.CborSerializable" = jackson-cbor } } remote { artery { canonical.port = 0 } } cluster { allow-weakly-up-members=off seed-nodes = [ "akka://binlogSyncSystem@127.0.0.1:42767" ] } } } ``` 3) 客户端启动
``` 1: spring : 将ClientInitWithSpring类注册到容器中 1) akkaConfig用于指定akka集群的配置 2) serverName对应需要订阅binlog事件的服务名称用于日志区分 3) forceToAll用于指定全部数据表的offset模式 ``` ``` 2: 非spring : 构建ClientInit并registerChangeDataHandler(xxxx)或registerSubTableInfo(xxx) 客户端订阅事件可以指定三种offset模式
1: CONTINUE 从服务端最新的seqId开始同步,即从连接到服务端后的位置开始同步事件,不管历史数据 2: LAST_POS 从客户端最后处理的seqId的下一个事件开始同步,服务端会接收客户端ack消息记录下客户端最后处理成功的数据 3: seqId 从指定seqId的下一条开始同步事件 系统默认为LAST_POS,如果需要指定模式,可对单表订阅配置一个SubTableInfo对象,offset="CONTINUE/seqId" ``` #### 系统说明 1) seqId
标识事件的序列ID,格式为[binlog文件数字后缀_binlog文件位移_批量index]
服务端从指定binlog文件和位移启动时,分别对应seqId的(binlog文件数字后缀)和(binlog文件位移)
``` ps:执行 show binary logs; 会显示 log_name 列,其中的文件名为类似:mysql-bin.000005这样,[binlog文件数字后缀]就是后面的5数字后缀, 根据seqId反推出对应的binlog文件,需要补充前缀:mysql-bin.00000,最终得到 mysql-bin.000005 ``` 2) 客户端会在baseDir/clientInfo目录下面记录单表处理完成的seqId,用于按照LAST_POS重连服务端 3) 数据清理
服务端会在单表数据超过一定数量(maxStore)之后清理超出数据,maxStore可在启动的时候指定-maxStore设置,需要注意的是: 如果offset模式为LAST_POS,存在该表数据被清理后无法再次从指定的位置同步事件,尽量保证不要让客户端与服务端断连过久. 4) 客户端代码打包 项目使用springboot,正常打包后是服务端可运行的jar包,如果需要打客户端使用的普通jar包,使用命令行 mvn clean package -DskipTests=true -D spring-boot.repackage.skip=true #### 潜在问题 1: [已解决]集群机器启动时间相差过大,会导致之前由服务A提供服务的客户端在服务A宕机重连服务B之后,带上了 一个服务B不存在的lastReadSeqId. 即服务B在服务A运行蛮久时间之后再加入集群中,服务B的binlog文件订阅的下标落后于服务A,由服务A 运行产生的seqId下发给客户端之后,客户端又在服务A宕机后重连服务B带上给服务B,但这个lastSeqId 在服务B上并未缓存对应的事件,导致事件丢失,如果客户端的重连策略是(CONTINUE),则无影响,若为FROM_OFFSET 或者LAST_CONTINUE,则会出现事件丢失的情况 解决:通过服务启动服务时指定binlog文件和下标来同步历史事件,以和其他服务趋于同步.