# dbmtsdk
**Repository Path**: isyscore_admin/dbmtsdk
## Basic Information
- **Project Name**: dbmtsdk
- **Description**: 数据迁移引擎 DBMT 的接入 SDK
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-11-02
- **Last Updated**: 2024-06-04
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 指令集数据迁移 SDK 使用说明
本 SDK 被设计来用于高效的纯数据迁移,支持在 Oracle、SQLServer、MySQL、SQLite、达梦、以及其他数据库之间进行迁移。
本 SDK 支持跨平台,支持多种编程语言,可以接实际需求,在现有的技术栈条件下进行接入。
- - -
### 环境准备
若是在 JVM 条件下使用本 SDK,不需要进行额外的环境准备。
若是在 JVM 以外的条件使用,则需要将 ```对应平台的动态库文件``` 复制到项目所在目录下。复制后即告完成了环境准备
### 配置文件
为了进行数据迁移,开发者可以编写配置文件如下:
```json
{
"source":{
"charset":"UTF8",
"database":"helowin",
"dbType":"Oracle",
"host":"10.211.55.23",
"jdbcUrl":"jdbc:oracle:thin:@//10.211.55.23:1521/helowin",
"odbcDSN":"",
"odbcDriver":"",
"password":"sys",
"port":1521,
"user":"sys"
},
"target":{
"charset":"utf8mb4",
"database":"MigSample",
"dbType":"MySQL",
"host":"10.211.55.23",
"jdbcUrl":"jdbc:mysql://10.211.55.23:3306/MigSample?useUnicode=true&characterEncoding=utf8",
"odbcDSN":"",
"odbcDriver":"",
"password":"root",
"port":3306,
"user":"root"
},
"tableMap":[
{
"sourceTable":{
"idField":"ID",
"name":"TEXTS"
},
"targetTable":{
"idField":"id",
"name":"en_texts"
},
"mapping":[
{
"dest":"id",
"origin":"ID"
},
{
"dest":"name",
"origin":"NAME"
},
{
"dest":"description",
"origin":"DESC"
},
{
"dest":"subdesc",
"origin":"STR1"
}
]
},
{
"sourceTable":{
"idField":"DESC",
"name":"JA_TEXTS"
},
"targetTable":{
"idField":"id",
"name":"jp_texts"
},
"mapping":[
{
"dest":"id",
"origin":"ID"
},
{
"dest":"name",
"origin":"NAME"
},
{
"dest":"description",
"origin":"DESC"
},
{
"dest":"subdesc",
"origin":"STR1"
}
]
}
]
}
```
对于每一个 json block,解释如下:
**source**
源数据库,将要从此数据库将数据迁出。
**target**
目标数据库,将要接收从源数据库迁出的数据。
| 字段 | 含义 | 备注 |
| :-- | :-- | :-- |
| dbType | 数据库类型 | JVM 下可取值为 Oracle, MySQL, SQLServer, SQLite, DaMeng
其他环境下可取值为 Oracle, MySQL, SQLServer, SQLite, Other |
| jdbcUrl | 用于 JDBC 连接的字符串 | 适用于 JVM SDK |
| host | 主机名称(可以是IP或域名) | 适用于 JVM 以外的 SDK |
| port | 端口号 | 适用于 JVM 以外的 SDK |
| user | 用户名 | |
| password | 密码 | |
| database | 要连接的数据库名 | 适用于 JVM 以外的 SDK |
| charset | 操作数据时所用的字符集 | |
| odbcDriver | 用于 ODBC 连接的驱动名称 | 适用于 JVM 以外的 SDK |
| odbcDSN | 用于 ODBC 连接的 DSN | 适用于 JVM 以外的 SDK |
在 JVM 环境下,仅能使用 JDBC 进行连接,可以忽略 odbc 相关的参数。在 JVM 以外的环境下,若是 dbType 被选择为 Other,则表示该数据库使用 ODBC 进行连接,odbc 相关的参数此时生效。
**tableMap**
tableMap 指出了迁移的映射关系,其数据结构如下:
| 字段 | 含义 | 备注 |
| :-- | :-- | :-- |
| sourceTable | 源表 | |
| sourceTable.name | 源表的表名 | |
| sourceTable.idField | 源表的主键列名 | 用于差量迁移 |
| targetTable | 目标表 | |
| targetTable.name | 目标表的表名 | |
| targetTable.idField | 目标表的主键列名 | 用于差量迁移 |
| mapping | 字段映射关系 | |
| mapping[idx].origin | 源表内的列名 | |
| mapping[idx].dest | 对应的目标表内的列名 | |
tableMap 以数组形式表示,对于每一个要迁移的表,均占用该数组内一个 block。每个 block 均需要指出,源表,目标表,以及迁移时的字段对应关系。
同样的,开发者也可以通过代码来构建这样的配置文件,下面即是以 Kotlin 进行的声明式配置,若是使用 Java 或其他语言的 SDK,需要使用命令式的代码来进行配置,此处暂时不表。
```kotlin
val dbCfg = dbmt {
source {
dbType = DBMTTypes.Oracle
jdbcUrl = "jdbc:oracle:thin:@//10.211.55.23:1521/helowin"
user = "sys"
password = "sys"
}
target {
dbType = DBMTTypes.MySQL
jdbcUrl = "jdbc:mysql://10.211.55.23:3306/MigSample?useUnicode=true&characterEncoding=utf8"
user = "root"
password = "root"
}
tableMap {
sourceTable {
name = "TEXTS"
idField = "ID"
}
targetTable {
name = "en_texts"
idField = "id"
}
mapping("ID", "id")
mapping("NAME", "name")
mapping("DESC", "description")
mapping("STR1", "subdesc")
}
tableMap {
sourceTable {
name = "JA_TEXTS"
idField = "ID"
}
targetTable {
name = "jp_texts"
idField = "id"
}
mapping("ID", "id")
mapping("NAME", "name")
mapping("DESC", "description")
mapping("STR1", "subdesc")
}
}
```
**若是阅读时觉得以上描述难以看懂,或对于手写配置文件没有信心,可以参考我们提供的```可视化配置工具```,它可以生成合法的配置文件。**
完成对配置文件的构建后,即可以进行数据迁移能力的接入。
- - -
### 数据迁移
数据库迁移的实际过程将按照编程语言分为以下几个部分:
**1) Java/Kotlin**
需要引入数据迁移 SDK,按使用的依赖管理工具,引入方法分别为:
**Maven**:
```xml
com.github.isyscore
iscdbmtsdk
1.0.0
```
**Gradle**:
```
implementation 'com.github.isyscore:iscdbmtsdk:1.0.0'
```
迁移时使用代码如下:
```kotlin
val m = dbmt("./config.json") ?: return // 构建配置文件
DBMTSDK.migrate(
m, // 配置文件
fromIds: listOf(1200, 5781), // 数据的起始ID,按配置内 mapping 对应的 source 表为基准排序
transactionBatchSize = 1000 // 每一批数据的迁移条目数
) { srcTable, targetTable, progress, total -> // 迁移过程回调函数,四个参数分别为 源表,目标表,当前迁移条目数,总条目数
println("from: $srcTable, to: $targetTable, progress: $progress, total: $total")
}
```
以上代码即可完成数据迁移,若是需要全量迁移,则 ```fromIds``` 填写为 null,也可以对 ```listOf``` 内的某个参数填写为 null,表示指定的表需要全量迁移。
```transactionBatchSize``` 指出了按批次迁移时,每一批数据的条目数,这个数据需要开发者自行根据内存情况和网络情况进行调整,一般来说,执行代码侧的机器配置越高,网络状况越好,```transactionBatchSize``` 的值就应该设置得越大,以更好的利用资源来提升性能。若反之则应当将 ```transactionBatchSize``` 调小,以防止发生 OOM 等情况。
对于迁移中发生的异常,可以在迁移完毕后调用 ```migrateErrors``` 接口来获取。代码如下:
```kotlin
val err = DBMTSDK.migrateErrors()
err.forEach { println(it) }
```
此时将列出在迁移过程中所发生的异常。
对于 ```Java``` 使用的情况,将 ```DBMTSDK``` 替换为 ```DBMTSDKCompat``` 即可,它是一个针对 Java 的包装层,以抹去 Java 与 Kotlin 之间的语法差异,使得 Java 可以以基本类同的方法调用 Kotlin 的函数。
**2) C/C++**
针对 C/C++ 的接入方式如下:
```c
#include
#include
#include "ISCDBMTSDK.h"
void migCallback(const char* srcTable, const char* targetTable, int progress, int total) {
printf("src = %s, tar = %s, p = %d, t = %d\n", srcTable, targetTable, progress, total);
}
void migErrorCallback(const int code, const char* time, const char* module, const char* message) {
printf("code: %d, time: %s, module: %s, message: %s\n", code, time, module, message);
}
int main() {
char* cfg = "./config.json";
int ids[2] = {1200, 5781};
cmigrate(cfg, ids, 2, 1000, migCallback);
int errCount = cmigrateError(migErrorCallback);
printf("error count = %d\n", errCount);
return 0;
}
```
**编译方式**:
```
$ gcc host.c -L. -liscdbmtsdk
```
**执行方式**:
```
$ LD_LIBRARY_PATH=../../ld:. ./host
```
**3) Go**
针对 Go 的接入方式如下,需要先编写 ```cgo bridge```:
```go
package main
/*
void migCallback_cgo(const char* srcTable, const char* targetTable, int progress, int total) {
void migCallback(const char*, const char*, int, int);
migCallback(srcTable, targetTable, progress, total);
}
void migErrorCallback_cgo(int code, const char* time, const char* module, const char* message) {
void migErrorCallback(int code, const char* time, const char* module, const char* message);
migErrorCallback(code, time, module, message);
}
*/
import "C"
```
上面看起来都是注释,但是这是 ```cgo``` 的一个特性,这里的注释代码就是 C 代码,是可以被编译执行的。
然后再直接在 go 内引入 C SDK 即可:
```go
package main
/*
#cgo CFLAGS: -I./
#cgo LDFLAGS: -L. -liscdbmtsdk
#include "ISCDBMTSDK.h"
void migCallback_cgo(const char* srcTable, const char* targetTable, int progress, int total);
void migErrorCallback_cgo(int code, const char* time, const char* module, const char* message);
*/
import "C"
import (
"fmt"
"unsafe"
)
//export migCallback
func migCallback(srcTable *C.char, targetTable *C.char, progress int, totel int) {
fmt.Printf("src = %s, tar = %s, p = %d, t = %d\n", C.GoString(srcTable), C.GoString(targetTable), progress, totel)
}
//export migErrorCallback
func migErrorCallback(code int, time *C.char, module *C.char, message *C.char) {
fmt.Printf("code = %d, time = %s, module = %s, message = %s\n", code, C.GoString(time), C.GoString(module), C.GoString(message))
}
func main() {
cs := C.CString("./config.json")
C.cmigrate(cs, nil, 0, 1000, (C.CMigrateCallback)(unsafe.Pointer(C.migCallback_cgo)))
errCount := int(C.cmigrateError((C.CMigrateErrorCallback)(unsafe.Pointer(C.migErrorCallback_cgo))))
fmt.Printf("errCount = %d\n", errCount)
}
```
注意这里的注释,是编译嵌入的 C 代码的脚本,它将被 go 解析并执行,因此不能删除这些注释。下面的 ```//export``` 也是 ```cgo``` 约定俗成的写法,用于将 go 函数与 cgo bridge 内的函数描述进行关联和调用。不要将这些代码当成注释处理。
**编译方式**
```
go build host.go DBMTBridge.go
```
**执行方式**
```
LD_LIBRARY_PATH=../../ld:. ./host
```
**4) Python**
针对 Python 的接入方式如下:
```python
from ctypes import *
CMigrateCallbackType = CFUNCTYPE(
# return void
None,
# char* srcTable
c_char_p,
# char* targetTable
c_char_p,
# int progress
c_int,
# int total
c_int)
CMigrateErrorCallbackType = CFUNCTYPE(
# return void
None,
# code
c_int,
# time
c_char_p,
# module
c_char_p,
# message
c_char_p
)
def mig_callback(src_table, target_table, progress, total):
print(f'src = {src_table.decode()}, tar = {target_table.decode()}, p = {progress}, t = {total}')
def mig_err_callback(code, time, module, message):
print(f'code = {code}, time = {time.decode()}, module = {module.decode()}, message = {message.decode()}')
if __name__ == '__main__':
libFile = './libiscdbmtsdk.dylib'
cfgFile = './config.json'.encode()
C = cdll.LoadLibrary(libFile)
C.cmigrate(cfgFile, None, 0, 1000, CMigrateCallbackType(mig_callback))
errCount = C.cmigrateError(CMigrateErrorCallbackType(mig_err_callback))
print(f'errCount = {errCount}')
```
**执行方式**
```
$ LD_LIBRARY_PATH=../../ld:. python3 main.py
```
- - -
### 执行器
当完成了配置文件的构建,并且不打算接入 SDK 的场景下,可以使用执行器,直接进行数据迁移,执行器分为 JVM 版本和原生版本两种,分别满足不同的使用场景。
JVM 版本的使用方法:
```
$ java -jar Executor.jar
```
原生版本的使用方法:
```
$ ./executor
```
```config path``` 为配置文件的路径,该路径不能包含空格和特殊字符;
```ID(s)``` 表示差量迁移时的起始主键序号值,以半角逗号分隔,若不需要此项,则可以填入 null;
```transaction batch size``` 为一个事务批次提交的数据量,若不需要此项,可以填入 null,此时默认的数据量为 1000、
样例命令:
```
$ ./executor config.json null null // 全量迁移,每个事务1000条数据
$ ./executor config.json 10000,20000 5000 // 两个表的起始ID分别为10000和20000,每个事务5000条数据
$ ./executor config.json 100,null,200 2000 // 三个表分别为以100为起始点迁移,全量迁移,以200为起始点迁移,每个事务2000条数据
```
- - -
### 性能指标
测试在 Macbook Pro 上进行,数据库均在 localhost,主机内存 64G
| 数据量 | 字段数 | 批量大小 | 耗时 | 速度 |
| :-- | :-- | :-- | :-- | :-- |
| 25000 | 4 | 500 | 8695ms | 2875条/s |
| 25000 | 4 | 1000 | 9006ms | 2776条/s |
| 25000 | 4 | 5000 | 9191ms | 2720条/s |
| 25000 | 4 | 10000 | 8369ms | 2987条/s |
| 25000 | 4 | 20000 | 8470ms | 2952条/s |
| 25000 | 5 | 500 | 9786ms | 2555条/s |
| 25000 | 5 | 1000 | 9843ms | 2540条/s |
| 25000 | 5 | 5000 | 8721ms | 2867条/s |
| 25000 | 5 | 10000 | 9377ms | 2666条/s |
| 25000 | 5 | 20000 | 9956ms | 2511条/s |
| 25000 | 7 | 500 | 9861ms | 2535条/s |
| 25000 | 7 | 1000 | 8850ms | 2825条/s |
| 25000 | 7 | 5000 | 8759ms | 2854条/s |
| 25000 | 7 | 10000 | 9665ms | 2587条/s |
| 25000 | 7 | 20000 | 10009ms | 2498条/s |
| 25000 | 10 | 500 | 9431ms | 2651条/s |
| 25000 | 10 | 1000 | 9079ms | 2754条/s |
| 25000 | 10 | 5000 | 8908ms | 2806条/s |
| 25000 | 10 | 10000 | 9628ms | 2597条/s |
| 25000 | 10 | 20000 | 10246ms | 2440条/s |
从数据可知,当批量大小在 5000 时,迁移速度最优,可达2800条/秒,在研发过程中,这个数字是有参考意义的。