# dbmtsdk **Repository Path**: isyscore_admin/dbmtsdk ## Basic Information - **Project Name**: dbmtsdk - **Description**: 数据迁移引擎 DBMT 的接入 SDK - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-11-02 - **Last Updated**: 2024-06-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 指令集数据迁移 SDK 使用说明 本 SDK 被设计来用于高效的纯数据迁移,支持在 Oracle、SQLServer、MySQL、SQLite、达梦、以及其他数据库之间进行迁移。 本 SDK 支持跨平台,支持多种编程语言,可以接实际需求,在现有的技术栈条件下进行接入。 - - - ### 环境准备 若是在 JVM 条件下使用本 SDK,不需要进行额外的环境准备。 若是在 JVM 以外的条件使用,则需要将 ```对应平台的动态库文件``` 复制到项目所在目录下。复制后即告完成了环境准备 ### 配置文件 为了进行数据迁移,开发者可以编写配置文件如下: ```json { "source":{ "charset":"UTF8", "database":"helowin", "dbType":"Oracle", "host":"10.211.55.23", "jdbcUrl":"jdbc:oracle:thin:@//10.211.55.23:1521/helowin", "odbcDSN":"", "odbcDriver":"", "password":"sys", "port":1521, "user":"sys" }, "target":{ "charset":"utf8mb4", "database":"MigSample", "dbType":"MySQL", "host":"10.211.55.23", "jdbcUrl":"jdbc:mysql://10.211.55.23:3306/MigSample?useUnicode=true&characterEncoding=utf8", "odbcDSN":"", "odbcDriver":"", "password":"root", "port":3306, "user":"root" }, "tableMap":[ { "sourceTable":{ "idField":"ID", "name":"TEXTS" }, "targetTable":{ "idField":"id", "name":"en_texts" }, "mapping":[ { "dest":"id", "origin":"ID" }, { "dest":"name", "origin":"NAME" }, { "dest":"description", "origin":"DESC" }, { "dest":"subdesc", "origin":"STR1" } ] }, { "sourceTable":{ "idField":"DESC", "name":"JA_TEXTS" }, "targetTable":{ "idField":"id", "name":"jp_texts" }, "mapping":[ { "dest":"id", "origin":"ID" }, { "dest":"name", "origin":"NAME" }, { "dest":"description", "origin":"DESC" }, { "dest":"subdesc", "origin":"STR1" } ] } ] } ``` 对于每一个 json block,解释如下: **source** 源数据库,将要从此数据库将数据迁出。 **target** 目标数据库,将要接收从源数据库迁出的数据。 | 字段 | 含义 | 备注 | | :-- | :-- | :-- | | dbType | 数据库类型 | JVM 下可取值为 Oracle, MySQL, SQLServer, SQLite, DaMeng
其他环境下可取值为 Oracle, MySQL, SQLServer, SQLite, Other | | jdbcUrl | 用于 JDBC 连接的字符串 | 适用于 JVM SDK | | host | 主机名称(可以是IP或域名) | 适用于 JVM 以外的 SDK | | port | 端口号 | 适用于 JVM 以外的 SDK | | user | 用户名 | | | password | 密码 | | | database | 要连接的数据库名 | 适用于 JVM 以外的 SDK | | charset | 操作数据时所用的字符集 | | | odbcDriver | 用于 ODBC 连接的驱动名称 | 适用于 JVM 以外的 SDK | | odbcDSN | 用于 ODBC 连接的 DSN | 适用于 JVM 以外的 SDK | 在 JVM 环境下,仅能使用 JDBC 进行连接,可以忽略 odbc 相关的参数。在 JVM 以外的环境下,若是 dbType 被选择为 Other,则表示该数据库使用 ODBC 进行连接,odbc 相关的参数此时生效。 **tableMap** tableMap 指出了迁移的映射关系,其数据结构如下: | 字段 | 含义 | 备注 | | :-- | :-- | :-- | | sourceTable | 源表 | | | sourceTable.name | 源表的表名 | | | sourceTable.idField | 源表的主键列名 | 用于差量迁移 | | targetTable | 目标表 | | | targetTable.name | 目标表的表名 | | | targetTable.idField | 目标表的主键列名 | 用于差量迁移 | | mapping | 字段映射关系 | | | mapping[idx].origin | 源表内的列名 | | | mapping[idx].dest | 对应的目标表内的列名 | | tableMap 以数组形式表示,对于每一个要迁移的表,均占用该数组内一个 block。每个 block 均需要指出,源表,目标表,以及迁移时的字段对应关系。 同样的,开发者也可以通过代码来构建这样的配置文件,下面即是以 Kotlin 进行的声明式配置,若是使用 Java 或其他语言的 SDK,需要使用命令式的代码来进行配置,此处暂时不表。 ```kotlin val dbCfg = dbmt { source { dbType = DBMTTypes.Oracle jdbcUrl = "jdbc:oracle:thin:@//10.211.55.23:1521/helowin" user = "sys" password = "sys" } target { dbType = DBMTTypes.MySQL jdbcUrl = "jdbc:mysql://10.211.55.23:3306/MigSample?useUnicode=true&characterEncoding=utf8" user = "root" password = "root" } tableMap { sourceTable { name = "TEXTS" idField = "ID" } targetTable { name = "en_texts" idField = "id" } mapping("ID", "id") mapping("NAME", "name") mapping("DESC", "description") mapping("STR1", "subdesc") } tableMap { sourceTable { name = "JA_TEXTS" idField = "ID" } targetTable { name = "jp_texts" idField = "id" } mapping("ID", "id") mapping("NAME", "name") mapping("DESC", "description") mapping("STR1", "subdesc") } } ``` **若是阅读时觉得以上描述难以看懂,或对于手写配置文件没有信心,可以参考我们提供的```可视化配置工具```,它可以生成合法的配置文件。** 完成对配置文件的构建后,即可以进行数据迁移能力的接入。 - - - ### 数据迁移 数据库迁移的实际过程将按照编程语言分为以下几个部分: **1) Java/Kotlin** 需要引入数据迁移 SDK,按使用的依赖管理工具,引入方法分别为: **Maven**: ```xml com.github.isyscore iscdbmtsdk 1.0.0 ``` **Gradle**: ``` implementation 'com.github.isyscore:iscdbmtsdk:1.0.0' ``` 迁移时使用代码如下: ```kotlin val m = dbmt("./config.json") ?: return // 构建配置文件 DBMTSDK.migrate( m, // 配置文件 fromIds: listOf(1200, 5781), // 数据的起始ID,按配置内 mapping 对应的 source 表为基准排序 transactionBatchSize = 1000 // 每一批数据的迁移条目数 ) { srcTable, targetTable, progress, total -> // 迁移过程回调函数,四个参数分别为 源表,目标表,当前迁移条目数,总条目数 println("from: $srcTable, to: $targetTable, progress: $progress, total: $total") } ``` 以上代码即可完成数据迁移,若是需要全量迁移,则 ```fromIds``` 填写为 null,也可以对 ```listOf``` 内的某个参数填写为 null,表示指定的表需要全量迁移。 ```transactionBatchSize``` 指出了按批次迁移时,每一批数据的条目数,这个数据需要开发者自行根据内存情况和网络情况进行调整,一般来说,执行代码侧的机器配置越高,网络状况越好,```transactionBatchSize``` 的值就应该设置得越大,以更好的利用资源来提升性能。若反之则应当将 ```transactionBatchSize``` 调小,以防止发生 OOM 等情况。 对于迁移中发生的异常,可以在迁移完毕后调用 ```migrateErrors``` 接口来获取。代码如下: ```kotlin val err = DBMTSDK.migrateErrors() err.forEach { println(it) } ``` 此时将列出在迁移过程中所发生的异常。 对于 ```Java``` 使用的情况,将 ```DBMTSDK``` 替换为 ```DBMTSDKCompat``` 即可,它是一个针对 Java 的包装层,以抹去 Java 与 Kotlin 之间的语法差异,使得 Java 可以以基本类同的方法调用 Kotlin 的函数。 **2) C/C++** 针对 C/C++ 的接入方式如下: ```c #include #include #include "ISCDBMTSDK.h" void migCallback(const char* srcTable, const char* targetTable, int progress, int total) { printf("src = %s, tar = %s, p = %d, t = %d\n", srcTable, targetTable, progress, total); } void migErrorCallback(const int code, const char* time, const char* module, const char* message) { printf("code: %d, time: %s, module: %s, message: %s\n", code, time, module, message); } int main() { char* cfg = "./config.json"; int ids[2] = {1200, 5781}; cmigrate(cfg, ids, 2, 1000, migCallback); int errCount = cmigrateError(migErrorCallback); printf("error count = %d\n", errCount); return 0; } ``` **编译方式**: ``` $ gcc host.c -L. -liscdbmtsdk ``` **执行方式**: ``` $ LD_LIBRARY_PATH=../../ld:. ./host ``` **3) Go** 针对 Go 的接入方式如下,需要先编写 ```cgo bridge```: ```go package main /* void migCallback_cgo(const char* srcTable, const char* targetTable, int progress, int total) { void migCallback(const char*, const char*, int, int); migCallback(srcTable, targetTable, progress, total); } void migErrorCallback_cgo(int code, const char* time, const char* module, const char* message) { void migErrorCallback(int code, const char* time, const char* module, const char* message); migErrorCallback(code, time, module, message); } */ import "C" ``` 上面看起来都是注释,但是这是 ```cgo``` 的一个特性,这里的注释代码就是 C 代码,是可以被编译执行的。 然后再直接在 go 内引入 C SDK 即可: ```go package main /* #cgo CFLAGS: -I./ #cgo LDFLAGS: -L. -liscdbmtsdk #include "ISCDBMTSDK.h" void migCallback_cgo(const char* srcTable, const char* targetTable, int progress, int total); void migErrorCallback_cgo(int code, const char* time, const char* module, const char* message); */ import "C" import ( "fmt" "unsafe" ) //export migCallback func migCallback(srcTable *C.char, targetTable *C.char, progress int, totel int) { fmt.Printf("src = %s, tar = %s, p = %d, t = %d\n", C.GoString(srcTable), C.GoString(targetTable), progress, totel) } //export migErrorCallback func migErrorCallback(code int, time *C.char, module *C.char, message *C.char) { fmt.Printf("code = %d, time = %s, module = %s, message = %s\n", code, C.GoString(time), C.GoString(module), C.GoString(message)) } func main() { cs := C.CString("./config.json") C.cmigrate(cs, nil, 0, 1000, (C.CMigrateCallback)(unsafe.Pointer(C.migCallback_cgo))) errCount := int(C.cmigrateError((C.CMigrateErrorCallback)(unsafe.Pointer(C.migErrorCallback_cgo)))) fmt.Printf("errCount = %d\n", errCount) } ``` 注意这里的注释,是编译嵌入的 C 代码的脚本,它将被 go 解析并执行,因此不能删除这些注释。下面的 ```//export``` 也是 ```cgo``` 约定俗成的写法,用于将 go 函数与 cgo bridge 内的函数描述进行关联和调用。不要将这些代码当成注释处理。 **编译方式** ``` go build host.go DBMTBridge.go ``` **执行方式** ``` LD_LIBRARY_PATH=../../ld:. ./host ``` **4) Python** 针对 Python 的接入方式如下: ```python from ctypes import * CMigrateCallbackType = CFUNCTYPE( # return void None, # char* srcTable c_char_p, # char* targetTable c_char_p, # int progress c_int, # int total c_int) CMigrateErrorCallbackType = CFUNCTYPE( # return void None, # code c_int, # time c_char_p, # module c_char_p, # message c_char_p ) def mig_callback(src_table, target_table, progress, total): print(f'src = {src_table.decode()}, tar = {target_table.decode()}, p = {progress}, t = {total}') def mig_err_callback(code, time, module, message): print(f'code = {code}, time = {time.decode()}, module = {module.decode()}, message = {message.decode()}') if __name__ == '__main__': libFile = './libiscdbmtsdk.dylib' cfgFile = './config.json'.encode() C = cdll.LoadLibrary(libFile) C.cmigrate(cfgFile, None, 0, 1000, CMigrateCallbackType(mig_callback)) errCount = C.cmigrateError(CMigrateErrorCallbackType(mig_err_callback)) print(f'errCount = {errCount}') ``` **执行方式** ``` $ LD_LIBRARY_PATH=../../ld:. python3 main.py ``` - - - ### 执行器 当完成了配置文件的构建,并且不打算接入 SDK 的场景下,可以使用执行器,直接进行数据迁移,执行器分为 JVM 版本和原生版本两种,分别满足不同的使用场景。 JVM 版本的使用方法: ``` $ java -jar Executor.jar ``` 原生版本的使用方法: ``` $ ./executor ``` ```config path``` 为配置文件的路径,该路径不能包含空格和特殊字符; ```ID(s)``` 表示差量迁移时的起始主键序号值,以半角逗号分隔,若不需要此项,则可以填入 null; ```transaction batch size``` 为一个事务批次提交的数据量,若不需要此项,可以填入 null,此时默认的数据量为 1000、 样例命令: ``` $ ./executor config.json null null // 全量迁移,每个事务1000条数据 $ ./executor config.json 10000,20000 5000 // 两个表的起始ID分别为10000和20000,每个事务5000条数据 $ ./executor config.json 100,null,200 2000 // 三个表分别为以100为起始点迁移,全量迁移,以200为起始点迁移,每个事务2000条数据 ``` - - - ### 性能指标 测试在 Macbook Pro 上进行,数据库均在 localhost,主机内存 64G | 数据量 | 字段数 | 批量大小 | 耗时 | 速度 | | :-- | :-- | :-- | :-- | :-- | | 25000 | 4 | 500 | 8695ms | 2875条/s | | 25000 | 4 | 1000 | 9006ms | 2776条/s | | 25000 | 4 | 5000 | 9191ms | 2720条/s | | 25000 | 4 | 10000 | 8369ms | 2987条/s | | 25000 | 4 | 20000 | 8470ms | 2952条/s | | 25000 | 5 | 500 | 9786ms | 2555条/s | | 25000 | 5 | 1000 | 9843ms | 2540条/s | | 25000 | 5 | 5000 | 8721ms | 2867条/s | | 25000 | 5 | 10000 | 9377ms | 2666条/s | | 25000 | 5 | 20000 | 9956ms | 2511条/s | | 25000 | 7 | 500 | 9861ms | 2535条/s | | 25000 | 7 | 1000 | 8850ms | 2825条/s | | 25000 | 7 | 5000 | 8759ms | 2854条/s | | 25000 | 7 | 10000 | 9665ms | 2587条/s | | 25000 | 7 | 20000 | 10009ms | 2498条/s | | 25000 | 10 | 500 | 9431ms | 2651条/s | | 25000 | 10 | 1000 | 9079ms | 2754条/s | | 25000 | 10 | 5000 | 8908ms | 2806条/s | | 25000 | 10 | 10000 | 9628ms | 2597条/s | | 25000 | 10 | 20000 | 10246ms | 2440条/s | 从数据可知,当批量大小在 5000 时,迁移速度最优,可达2800条/秒,在研发过程中,这个数字是有参考意义的。