diff --git a/.gitignore b/.gitignore
index fbb1d7706cedbb5aed3072d6014eae3fc52448d5..96f47cedd5bddf1dc9d95217fe3241be1f6719bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,5 +31,15 @@ Dockerfile_functest
pkg/
test/
-tools/
+tools/intarkdb_server/
+tools/local-db/
+tools/local-server/
+tools/data-sync-agent/
+tools/gradle-7.2-all.zip
+tools/pressure_test/
+tools/rust-gstor-multithreading/
+tools/rust-gstor/
+tools/rust-memory-allocato/
+tools/shell_tools/
+tools/wget-log
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 28f0dde57fada416cf72388aa5dd90c880512c72..91297eb3d0cb7d2202ab4027f38aa5c72f4c1376 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -425,7 +425,7 @@ endif ()
# OPTION(TEST "option for test module" OFF)
# message(STATUS "TEST = ${TEST}")
# if (TEST)
- add_subdirectory(test)
+# add_subdirectory(test)
# endif()
install(
diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..cf0ec86042a21c111540cc3976a5ef9e3b0a834a
--- /dev/null
+++ b/tools/CMakeLists.txt
@@ -0,0 +1,3 @@
+add_subdirectory(intarkdb_cli)
+
+add_subdirectory(sqlite3-api-test)
diff --git a/tools/data-sync-agent/.gitignore b/tools/data-sync-agent/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..1f5d0169cbfa0ee143536f5e44d085e87be8b70c
--- /dev/null
+++ b/tools/data-sync-agent/.gitignore
@@ -0,0 +1,60 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+/.mvn/
+/mvnw
+/mvnw.cmd
+
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
diff --git a/tools/data-sync-agent/README.md b/tools/data-sync-agent/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..a70d62dde50d709e66c4fcb7020259fcc807ddd6
--- /dev/null
+++ b/tools/data-sync-agent/README.md
@@ -0,0 +1,31 @@
+# data-sync-agent
+
+此工具用于将数据从IntarkDB同步到openGauss数据库中,支持时序数据表的全量同步和增量同步。
+
+#### 一、工程说明
+
+##### 1、编程语言:Java
+
+##### 2、编译工具:Maven
+
+#### 二、使用说明
+
+##### 1、数据表准备
+云端数据库的数据表,第一列必须是node_id varchar,其余字段与边端IntarkDB数据表完全一致。
+
+##### 2、配置文件修改
+修改src/main/resources文件夹下conf.yaml文件,其中
+- centerDb为云端openGauss数据库的对应配置信息,[ip, port, user, password]为openGauss数据库的ip地址、端口、用户名、密码,
+- localDb为边端intarkDB数据库的对应配置信息,[ip, port, user, password, nodeId]为intarkDB数据库的ip地址、端口、用户名、密码、节点id
+- syncTable为需要同步的数据表的配置信息,[tableName, syncColumn]为数据表名,数据表中的时间索引列
+- loopWait为任务队列处理线程空闲等待时长,单位为毫秒
+
+##### 3、程序运行
+启动云端openGauss数据库
+
+启动边端IntarkDB数据库的local-db程序
+
+使用maven打包data-sync-agent,可以得到cloud-edge-0.0.1-SNAPSHOT.jar,将修改好的配置文件放在同一目录下,输入指令执行
+```
+java -Dconfig_file='conf.yaml' -jar .\cloud-edge-0.0.1-SNAPSHOT.jar
+```
diff --git a/tools/data-sync-agent/pom.xml b/tools/data-sync-agent/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..c76c5f2da8406cd78cb8ef408ca6e24852a06ad0
--- /dev/null
+++ b/tools/data-sync-agent/pom.xml
@@ -0,0 +1,107 @@
+
+
+ 4.0.0
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 3.2.0
+
+
+ com.intarkdb
+ cloud-edge
+ 0.0.1-SNAPSHOT
+ cloud-edge
+ Demo project for Spring Boot
+
+ 17
+ 3.2.0
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.instardb
+ InstarDB-jdbc
+ 1.0
+ system
+ ${project.basedir}/src/main/resources/lib/InstarDB-socket-jdbc-1.0.jar
+
+
+ org.opengauss
+ opengauss-jdbc
+ 5.0.0
+ system
+ ${project.basedir}/src/main/resources/lib/opengauss-jdbc-5.0.0.jar
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.8.0
+ test
+
+
+
+ org.yaml
+ snakeyaml
+ 1.29
+
+
+
+ ch.qos.logback
+ logback-core
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${spring-boot.version}
+ pom
+ import
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 11
+ 11
+ UTF-8
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring-boot.version}
+
+ com.intarkdb.cloudedge.CloudEdgeApplication
+ true
+
+
+
+ repackage
+
+ repackage
+
+
+
+
+
+
+
+
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/CloudEdgeApplication.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/CloudEdgeApplication.java
new file mode 100644
index 0000000000000000000000000000000000000000..16d2caab02363a620829c124f51bbaa44548fdd9
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/CloudEdgeApplication.java
@@ -0,0 +1,45 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge;
+
+import com.intarkdb.cloudedge.config.ConfigProperties;
+import com.intarkdb.cloudedge.config.ConfigReader;
+import com.intarkdb.cloudedge.mission.SimpleMissionUploadManagerImpl;
+import com.intarkdb.cloudedge.mission.UploadMissionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class CloudEdgeApplication {
+
+ private static final Logger log = LoggerFactory.getLogger(CloudEdgeApplication.class);
+
+ public static void main(String[] args) {
+// SpringApplication.run(CloudEdgeApplication.class, args);
+
+ // read config
+ ConfigProperties conf = ConfigReader.readConfigFromYaml();
+
+ UploadMissionManager syncManager = new SimpleMissionUploadManagerImpl();
+ // UploadMissionManager syncManager = new AsyncMissionUploadManagerImpl();
+ syncManager.init(conf);
+ log.info("init finished");
+ syncManager.start();
+ }
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/AsyncCopyManagerHelper.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/AsyncCopyManagerHelper.java
new file mode 100644
index 0000000000000000000000000000000000000000..557a472bc0316fea59346f9835123eabdfea30b8
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/AsyncCopyManagerHelper.java
@@ -0,0 +1,22 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.center;
+
+public class AsyncCopyManagerHelper {
+
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/CopyManagerHelper.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/CopyManagerHelper.java
new file mode 100644
index 0000000000000000000000000000000000000000..3bb6aafab148721f46a0002d4c89db4835b387b3
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/CopyManagerHelper.java
@@ -0,0 +1,119 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.center;
+
+import com.intarkdb.cloudedge.config.DbMsg;
+import org.opengauss.copy.CopyManager;
+import org.opengauss.core.BaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.text.MessageFormat;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * 中心数据库相关的方法
+ */
+
+public class CopyManagerHelper {
+
+ private static final Logger log = LoggerFactory.getLogger(CopyManagerHelper.class);
+
+ private CopyManagerHelper() {
+ }
+
+ private static CopyManagerHelper singleton = new CopyManagerHelper();
+
+ public static CopyManagerHelper getCopyManagerHelper() {
+ return singleton;
+ }
+
+ private static CopyManager copyManager;
+ private static Connection connection;
+ private static ReentrantLock lock = new ReentrantLock();
+
+ public static void connectToCenterDB(DbMsg dbMsg) {
+ try {
+ Class.forName("org.opengauss.Driver");
+ String url = MessageFormat.format("jdbc:opengauss://{0}:{1}/postgres", dbMsg.getIp(), dbMsg.getPort());
+ connection = DriverManager.getConnection(url, dbMsg.getUser(), dbMsg.getPassword());
+ copyManager = new CopyManager((BaseConnection) connection);
+ } catch (ClassNotFoundException | SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void releaseConnection() {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public boolean uploadData(String tableName, String data) {
+ if (data == null || "".equals(data)) {
+ return true;
+ }
+ lock.lock();
+ try {
+ copyIn(tableName, data);
+ return true;
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (Throwable t) {
+ // todo handle duplicate upload error
+ t.printStackTrace();
+ } finally {
+ lock.unlock();
+ }
+ return false;
+ }
+
+ private void copyIn(String tableName, String data) throws SQLException, IOException {
+ long t = System.currentTimeMillis();
+ copyManager.copyIn("COPY " + tableName + " FROM STDIN WITH (FORMAT CSV)", new StringReader(data));
+ log.debug("copy manager cost {} ms", System.currentTimeMillis() - t);
+ }
+
+ public boolean uploadDataNoLock(String tableName, String data) {
+ if (data == null || "".equals(data)) {
+ return true;
+ }
+ try {
+ copyIn(tableName, data);
+ return true;
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (Throwable t) {
+ // todo handle duplicate upload error
+ t.printStackTrace();
+ }
+ return false;
+ }
+
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/MultiCopyManagerHelper.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/MultiCopyManagerHelper.java
new file mode 100644
index 0000000000000000000000000000000000000000..f9cf6f82cafb791807eb2162e8a5d3a1349b2088
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/center/MultiCopyManagerHelper.java
@@ -0,0 +1,106 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.center;
+
+import com.intarkdb.cloudedge.config.DbMsg;
+import com.intarkdb.cloudedge.config.SyncTableInfo;
+import org.opengauss.copy.CopyManager;
+import org.opengauss.core.BaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MultiCopyManagerHelper {
+ private static final Logger log = LoggerFactory.getLogger(MultiCopyManagerHelper.class);
+
+ private static String url;
+ private static String user;
+ private static String password;
+ private static Map connectionMap;
+
+ public static void init(DbMsg dbMsg, List syncTableInfos) {
+ try {
+ Class.forName("org.opengauss.Driver");
+
+ String target = "jdbc:opengauss://center_ip:center_port/postgres"
+ .replaceFirst("center_ip", dbMsg.getIp())
+ .replaceFirst("center_port", dbMsg.getPort());
+ url = target;
+ user = dbMsg.getUser();
+ password = dbMsg.getPassword();
+ connectionMap = new ConcurrentHashMap<>();
+
+ for (SyncTableInfo info : syncTableInfos) {
+ connectionMap.put(info.getTableName(), DriverManager.getConnection(url, user, password));
+ }
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } catch (SQLException throwables) {
+ throwables.printStackTrace();
+ }
+ }
+
+ private final static int DUPLICATE_ERR_CODE = 16153;
+
+ public static boolean uploadData(String tableName, String data) {
+ if (data == null || "".equals(data)) {
+ return true;
+ }
+ long t = System.currentTimeMillis();
+ Connection connection = connectionMap.get(tableName);
+ if (connection == null) {
+ try {
+ connection = DriverManager.getConnection(url, user, password);
+ } catch (SQLException e) {
+ return false;
+ }
+ connectionMap.put(tableName, connection);
+ }
+ try {
+ CopyManager copyManager = new CopyManager((BaseConnection) connection);
+ copyManager.copyIn("COPY " + tableName + " FROM STDIN WITH (FORMAT CSV)", new StringReader(data));
+// log.debug("copy manager cost {} ms", System.currentTimeMillis() - t);
+ return true;
+ } catch (SQLException e) {
+ if ("Database connection failed when starting copy".equals(e.getMessage())) {
+ connectionMap.remove(tableName);
+ try {
+ connection.close();
+ } catch (SQLException e1) {
+ }
+ return false;
+ } else if (e.getErrorCode() == DUPLICATE_ERR_CODE) {
+ return true;
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (Throwable t1) {
+ // todo handle duplicate upload error
+ t1.printStackTrace();
+ }
+ return false;
+ }
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/Constant.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/Constant.java
new file mode 100644
index 0000000000000000000000000000000000000000..99c7040c0277defd08d715736d4f2c9f80b1f134
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/Constant.java
@@ -0,0 +1,66 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.common;
+
+public class Constant {
+
+ /**
+ * 单次读取数量
+ */
+ public static final int BATCH_SIZE = 500;
+
+ /**
+ * 扫描窗口宽度
+ */
+ public static final long SCAN_WINDOW_RANGE = 15 * 1000;
+
+ /**
+ * 乱序插入等待时间,读取时将只读取到当前时间戳 - 本值
+ */
+ public static final long SCAN_WAIT_RANGE = 15 * 1000;
+
+ public static final int FAIL_LIMIT = 10;
+
+ public static final String CREATE_SYNC_TABLE_SQL = "CREATE TABLE SYS_CLOUD_SYNC " +
+ "(id INTEGER, " +
+ "table_name varchar(100), " +
+ "sync_column varchar(100), " +
+ "type int, " +
+ "begin_timestamp long, " +
+ "end_timestamp long, " +
+ "node_id varchar(100), " +
+ "unique_column varchar(100), " +
+ "update_time long, " +
+ "PRIMARY KEY (id), " +
+ "unique (table_name))";
+
+ public static final String DESCRIBE_SYNC_TABLE_SQL = "describe SYS_CLOUD_SYNC";
+
+ public static final String SQL_SELECT_1 = "SELECT * FROM {0} where {1} > ? and {2} <= ? limit {3}";
+ public static final String SQL_SELECT_2 = "SELECT count(*) FROM {0} where {1} = ?";
+ public static final String SQL_SELECT_3 = "SELECT * FROM {0} where {1} = ? offset ? limit ?";
+
+ public static final String CENTER_IP = "center_ip";
+ public static final String CENTER_PORT = "center_port";
+ public static final String CENTER_USER = "center_user";
+ public static final String CENTER_PASSWORD = "center_password";
+
+ public static final String LOCAL_IP = "local_ip";
+ public static final String LOCAL_PORT = "local_port";
+ public static final String LOCAL_USER = "local_user";
+ public static final String LOCAL_PASSWORD = "local_password";
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/SyncException.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/SyncException.java
new file mode 100644
index 0000000000000000000000000000000000000000..00346c29a25e9ddaf263bf7b8ce55c6bd60486e8
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/SyncException.java
@@ -0,0 +1,25 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.common;
+
+public class SyncException extends Exception {
+ public SyncException(String message) {
+ super(message);
+ }
+
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/ThreadUncaughtExceptionHandler.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/ThreadUncaughtExceptionHandler.java
new file mode 100644
index 0000000000000000000000000000000000000000..571e257a315561543015c9e4a473c1635dc3367e
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/common/ThreadUncaughtExceptionHandler.java
@@ -0,0 +1,34 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.common;
+
+import org.slf4j.Logger;
+
+public class ThreadUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+ private Logger log;
+
+ public ThreadUncaughtExceptionHandler(Logger log) {
+ this.log = log;
+ }
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error("thread run exception: {}", e.getMessage());
+ e.printStackTrace();
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigProperties.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigProperties.java
new file mode 100644
index 0000000000000000000000000000000000000000..ef9249cfd7d3ed6ea368a30f14720e6e0e660f1c
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigProperties.java
@@ -0,0 +1,66 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.config;
+
+import java.util.List;
+
+public class ConfigProperties {
+
+ // center database setting
+ private DbMsg centerDb;
+
+ // local database setting
+ private List localDb;
+
+ // sync table msg
+ private List syncTable;
+
+ // loop wait time (ms)
+ private Integer loopWait;
+
+ public DbMsg getCenterDb() {
+ return centerDb;
+ }
+
+ public void setCenterDb(DbMsg centerDb) {
+ this.centerDb = centerDb;
+ }
+
+ public List getLocalDb() {
+ return localDb;
+ }
+
+ public void setLocalDb(List localDb) {
+ this.localDb = localDb;
+ }
+
+ public List getSyncTable() {
+ return syncTable;
+ }
+
+ public void setSyncTable(List syncTable) {
+ this.syncTable = syncTable;
+ }
+
+ public Integer getLoopWait() {
+ return loopWait;
+ }
+
+ public void setLoopWait(Integer loopWait) {
+ this.loopWait = loopWait;
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigReader.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigReader.java
new file mode 100644
index 0000000000000000000000000000000000000000..f9f0f84b147720141cf139f1a3e569adbef69ab3
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigReader.java
@@ -0,0 +1,178 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.config;
+
+import com.intarkdb.cloudedge.common.SyncException;
+import com.intarkdb.cloudedge.tool.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+
+public class ConfigReader {
+
+ private static final Logger log = LoggerFactory.getLogger(ConfigReader.class);
+
+ /**
+ * Deprecated
+ * @param path
+ * @return
+ */
+ @Deprecated
+ public static Map readConfiguration(String path) {
+ if (path == null || "".equals(path)) {
+ String currentPath = System.getProperty("user.dir");
+ path = currentPath + "/conf.properties";
+ }
+ Map result = new HashMap<>();
+ try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
+ String line;
+
+ while ((line = reader.readLine()) != null) {
+ handle(line.trim(), result);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+ /**
+ *
+ * @param line
+ * @param result
+ */
+ @Deprecated
+ private static void handle(String line, Map result) {
+ int commentIndex = line.indexOf("#");
+ if (commentIndex > -1) {
+ line = line.substring(0, commentIndex);
+ }
+ if (line.length() == 0) {
+ return;
+ }
+ String[] kvs = line.split("=");
+ result.put(kvs[0].trim(), kvs[1].trim());
+ }
+
+ /**
+ * read configurations from yaml file
+ * @return
+ */
+ public static ConfigProperties readConfigFromYaml() {
+ ConfigProperties configProperties = new ConfigProperties();
+
+ String configFile = System.getProperty("config_file");
+ if (StringUtil.isBlank(configFile)) {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ URL resourceUrl = classLoader.getResource("conf.yaml");
+ if (resourceUrl != null) {
+ configFile = resourceUrl.getPath();
+ } else {
+ configFile = System.getProperty("user.dir") + "\\src\\main\\resources\\conf.yaml";
+ }
+ }
+ Yaml yaml = new Yaml();
+ try (FileInputStream fileInputStream = new FileInputStream(configFile)) {
+ ConfigProperties config = yaml.loadAs(fileInputStream, ConfigProperties.class);
+
+ checkAndFixConfig(config);
+
+ return config;
+ } catch (IOException | SyncException e) {
+ // config file read failed
+ log.error("property file read failed!");
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ /**
+ * check if there are any required configurations not set
+ * apply the default value if there are available rules
+ * @param config
+ */
+ private static void checkAndFixConfig(ConfigProperties config) throws SyncException {
+ // check local db setting
+ List msgList = config.getLocalDb();
+ for (LocalDbMsg msg: msgList) {
+ checkDbMsg(msg);
+ checkNodeId(msg);
+ }
+
+ // check center db setting
+ checkDbMsg(config.getCenterDb());
+
+ // check sync table setting
+ List syncTableInfos = config.getSyncTable();
+ for (SyncTableInfo info : syncTableInfos) {
+ checkTableInfo(info);
+ }
+ }
+
+ private static void checkDbMsg(DbMsg msg) throws SyncException {
+ String ip = msg.getIp();
+ if (StringUtil.isBlank(ip) || !isValidIP(ip)) {
+ throw new SyncException("invalid ip: " + msg.getIp());
+ }
+ String port = msg.getPort();
+ if (StringUtil.isBlank(port) || !isValidPort(port)) {
+ throw new SyncException("invalid port: " + msg.getPort());
+ }
+ }
+
+ private static void checkNodeId(LocalDbMsg msg) {
+ String nodeId = msg.getNodeId();
+ if (StringUtil.isBlank(nodeId)) {
+ // set default node id using ip and port
+ msg.setNodeId(msg.getIp() + "_" + msg.getPort());
+ }
+ }
+
+ private static void checkTableInfo(SyncTableInfo info) throws SyncException {
+ String tableName = info.getTableName();
+ if (StringUtil.isBlank(tableName)) {
+ throw new SyncException("table name or sync column name missing");
+ }
+ String syncColumn = info.getSyncColumn();
+ if (StringUtil.isBlank(syncColumn)) {
+ throw new SyncException("table name or sync column name missing");
+ }
+ }
+
+ // ip and port pattern
+ private static final String IP_REGEX = "^((\\d{1,2}|1\\d{2}|2[0-4]\\d|25[0-5])\\.){3}(\\d{1,2}|1\\d{2}|2[0-4]\\d|25[0-5])$";
+ private static final String PORT_REGEX = "^\\d{1,5}$";
+
+ public static boolean isValidIP(String ip) {
+ return Pattern.matches(IP_REGEX, ip);
+ }
+
+ public static boolean isValidPort(String port) {
+ return Pattern.matches(PORT_REGEX, port);
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigWatchService.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigWatchService.java
new file mode 100644
index 0000000000000000000000000000000000000000..eb2d35909dbf7594a312034e23f7d23f3090356a
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/ConfigWatchService.java
@@ -0,0 +1,63 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.config;
+
+import com.intarkdb.cloudedge.tool.StringUtil;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+public class ConfigWatchService {
+
+ public static void configWatch() {
+ String configFile = System.getProperty("config.file");
+ if (StringUtil.isBlank(configFile)) {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ URL resourceUrl = classLoader.getResource("conf.yaml");
+ configFile = resourceUrl.getPath();
+ }
+ Path filePath = Paths.get(configFile);
+ try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
+ Path directory = filePath.getParent();
+ directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
+
+ WatchKey watchKey;
+ while ((watchKey = watchService.take()) != null) {
+ checkWatch(watchKey, filePath);
+ }
+ } catch (IOException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void checkWatch(WatchKey watchKey, Path filePath) {
+ for (WatchEvent> event : watchKey.pollEvents()) {
+ if (event.context().equals(filePath.getFileName())) {
+ // 文件被修改,执行相应的操作
+ ConfigProperties newProperties = ConfigReader.readConfigFromYaml();
+ }
+ }
+ watchKey.reset();
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/DbMsg.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/DbMsg.java
new file mode 100644
index 0000000000000000000000000000000000000000..f856d4b9845147413d685a06d4e6814fe22eff74
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/DbMsg.java
@@ -0,0 +1,56 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.config;
+
+public class DbMsg {
+ private String ip;
+ private String port;
+ private String user = "";
+ private String password = "";
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public String getPort() {
+ return port;
+ }
+
+ public void setPort(String port) {
+ this.port = port;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/LocalDbMsg.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/LocalDbMsg.java
new file mode 100644
index 0000000000000000000000000000000000000000..b039c6f983dad86d8357059014e56d1a623950d1
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/LocalDbMsg.java
@@ -0,0 +1,29 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.config;
+
+public class LocalDbMsg extends DbMsg {
+ private String nodeId = "";
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/SyncTableInfo.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/SyncTableInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..47194cb4604713bc680905279f07a724e406de7b
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/config/SyncTableInfo.java
@@ -0,0 +1,58 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.config;
+
+public class SyncTableInfo {
+ /**
+ * 表名
+ */
+ private String tableName;
+
+ /**
+ * 同步依据列,必须是该时序表的时间列
+ */
+ private String syncColumn;
+
+ /**
+ * 唯一列,非必填
+ */
+ private String uniqueColumn;
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getSyncColumn() {
+ return syncColumn;
+ }
+
+ public void setSyncColumn(String syncColumn) {
+ this.syncColumn = syncColumn;
+ }
+
+ public String getUniqueColumn() {
+ return uniqueColumn;
+ }
+
+ public void setUniqueColumn(String uniqueColumn) {
+ this.uniqueColumn = uniqueColumn;
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/CsvResult.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/CsvResult.java
new file mode 100644
index 0000000000000000000000000000000000000000..88fe6825bc9a6fd3657503927e55637264562b7d
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/CsvResult.java
@@ -0,0 +1,54 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.local;
+
+public class CsvResult {
+
+ private int count;
+ private String date;
+ private String resultString;
+
+ public CsvResult(int count, String date, String resultString) {
+ this.count = count;
+ this.date = date;
+ this.resultString = resultString;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public void setCount(int count) {
+ this.count = count;
+ }
+
+ public String getDate() {
+ return date;
+ }
+
+ public void setDate(String date) {
+ this.date = date;
+ }
+
+ public String getResultString() {
+ return resultString;
+ }
+
+ public void setResultString(String resultString) {
+ this.resultString = resultString;
+ }
+}
\ No newline at end of file
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LastDateInfo.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LastDateInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..a0f5601617c88b186cc9321e6b30df0daadfa809
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LastDateInfo.java
@@ -0,0 +1,44 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.local;
+
+public class LastDateInfo {
+
+ String lastDate;
+ int lastDateCount;
+
+ public LastDateInfo() {
+ this.lastDate = null;
+ this.lastDateCount = 0;
+ }
+
+ public String getLastDate() {
+ return lastDate;
+ }
+
+ public void setLastDate(String lastDate) {
+ this.lastDate = lastDate;
+ }
+
+ public int getLastDateCount() {
+ return lastDateCount;
+ }
+
+ public void setLastDateCount(int lastDateCount) {
+ this.lastDateCount = lastDateCount;
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LocalDBReader.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LocalDBReader.java
new file mode 100644
index 0000000000000000000000000000000000000000..1d8eabd8632f586b2623ecbbf3de794369b0e57a
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LocalDBReader.java
@@ -0,0 +1,249 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.local;
+
+import com.intarkdb.cloudedge.common.Constant;
+import com.intarkdb.cloudedge.config.LocalDbMsg;
+import com.intarkdb.cloudedge.config.SyncTableInfo;
+import com.intarkdb.cloudedge.mission.AsyncUploadMission;
+import com.intarkdb.cloudedge.mission.SimpleUploadMission;
+import com.intarkdb.cloudedge.mission.SyncTableControlInfo;
+import com.intarkdb.cloudedge.mission.UploadMission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+public class LocalDBReader {
+
+ private static final Logger log = LoggerFactory.getLogger(LocalDBReader.class);
+
+ public LocalDBReader(LocalDbMsg localDbMsg) {
+ this.localDbMsg = localDbMsg;
+ String projectRootPath = System.getProperty("user.dir");
+ try {
+ Class.forName("org.instardb.jdbc.InstarDriver");
+ String url = "jdbc:instardb://local_ip:local_port"
+ .replaceAll("local_ip", localDbMsg.getIp())
+ .replaceAll("local_port", localDbMsg.getPort());
+ connection = DriverManager.getConnection(url, localDbMsg.getUser(), localDbMsg.getPassword());
+ } catch (SQLException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected Connection connection;
+
+ protected LocalDbMsg localDbMsg;
+
+
+ /**
+ * init the sync manage table
+ */
+ public void initManageTable() {
+ try (Statement statement = connection.createStatement();
+ ResultSet i = statement.executeQuery(Constant.DESCRIBE_SYNC_TABLE_SQL)) {
+
+ return;
+ } catch (SQLException e) {
+ log.info("Manage table not existed, build manage table");
+ try (Statement statement = connection.createStatement();
+ ResultSet j = statement.executeQuery(Constant.CREATE_SYNC_TABLE_SQL)) {
+ log.debug("create manage table");
+ } catch (SQLException e1) {
+ log.error("create Manage table error");
+ e1.printStackTrace();
+ }
+ }
+
+ }
+
+ public void initSimpleMissionQueue(ConcurrentLinkedQueue missionQueue, List syncTableInfo, String nodeId) {
+
+ Map tableMap = syncTableInfo.stream().collect(Collectors.toMap(SyncTableInfo::getTableName, a -> a));
+
+ try (
+ Statement statement = connection.createStatement()) {
+
+ ResultSet resultSet = statement.executeQuery("SELECT table_name, sync_column, begin_timestamp, end_timestamp from SYS_CLOUD_SYNC");
+ while (resultSet.next()) {
+ String tableName = resultSet.getString(1);
+ String syncColumn = resultSet.getString(2);
+ Long beginTime = resultSet.getLong(3);
+ Long endTime = resultSet.getLong(4);
+
+ SyncTableControlInfo controlMsg = new SyncTableControlInfo(tableName, syncColumn, beginTime, endTime, nodeId);
+ missionQueue.offer(new SimpleUploadMission(controlMsg));
+ tableMap.remove(tableName);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ // new table config to upload
+ int id = missionQueue.size();
+ Long endTimeStamp = (Instant.now().toEpochMilli() - Constant.SCAN_WAIT_RANGE) * 1000;
+ for (Map.Entry entry : tableMap.entrySet()) {
+ try (PreparedStatement prepareStatement = connection.prepareStatement("insert into SYS_CLOUD_SYNC(id, type, table_name, sync_column, begin_timestamp, end_timestamp, node_id) values (?, ?, ?, ?, ?, ?, ?)")) {
+ prepareStatement.setInt(1, id++);
+ prepareStatement.setInt(2, 1);
+ prepareStatement.setString(3, entry.getKey());
+ prepareStatement.setString(4, entry.getValue().getSyncColumn());
+ prepareStatement.setLong(5, 0);
+ prepareStatement.setLong(6, endTimeStamp);
+ prepareStatement.setString(7, ""); // todo node id
+ int i = prepareStatement.executeUpdate();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ // add to mission queue
+ SyncTableControlInfo controlMsg = new SyncTableControlInfo(entry.getKey(), entry.getValue().getSyncColumn(), 0L, endTimeStamp, nodeId);
+ missionQueue.offer(new SimpleUploadMission(controlMsg));
+ }
+
+ }
+
+ public void initAsyncMissionQueue(ConcurrentLinkedQueue missionQueue, List syncTableInfo, String nodeId) {
+
+ Map tableMap = syncTableInfo.stream().collect(Collectors.toMap(SyncTableInfo::getTableName, a -> a));
+
+ try (
+ Statement statement = connection.createStatement()) {
+
+ ResultSet resultSet = statement.executeQuery("SELECT table_name, sync_column, begin_timestamp, end_timestamp from SYS_CLOUD_SYNC");
+ while (resultSet.next()) {
+ String tableName = resultSet.getString(1);
+ String syncColumn = resultSet.getString(2);
+ Long beginTime = resultSet.getLong(3);
+ Long endTime = resultSet.getLong(4);
+
+ SyncTableControlInfo controlMsg = new SyncTableControlInfo(tableName, syncColumn, beginTime, endTime, nodeId);
+ missionQueue.offer(new AsyncUploadMission(controlMsg));
+ tableMap.remove(tableName);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ // new table config to upload
+ int id = missionQueue.size();
+ Long endTimeStamp = Instant.now().toEpochMilli() - Constant.SCAN_WAIT_RANGE;
+ for (Map.Entry entry : tableMap.entrySet()) {
+ try (PreparedStatement prepareStatement = connection.prepareStatement("insert into SYS_CLOUD_SYNC(id, type, table_name, sync_column, begin_timestamp, end_timestamp, node_id) values (?, ?, ?, ?, ?, ?, ?)")) {
+ prepareStatement.setInt(1, id++);
+ prepareStatement.setInt(2, 1);
+ prepareStatement.setString(3, entry.getKey());
+ prepareStatement.setString(4, entry.getValue().getSyncColumn());
+ prepareStatement.setLong(5, 0);
+ prepareStatement.setLong(6, endTimeStamp);
+ prepareStatement.setString(7, ""); // todo node id
+ int i = prepareStatement.executeUpdate();
+ // add to mission queue
+ SyncTableControlInfo controlMsg = new SyncTableControlInfo(entry.getKey(), entry.getValue().getSyncColumn(), 0L, endTimeStamp, nodeId);
+ missionQueue.offer(new AsyncUploadMission(controlMsg));
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ public ResultSet readData(String tableName, String columnName, String lastSyncTime, String nowDate, int batchSize) {
+ String sql = MessageFormat.format(Constant.SQL_SELECT_1, tableName, columnName, columnName, batchSize);
+ try (PreparedStatement prepareStatement = connection.prepareStatement(sql)) {
+
+ prepareStatement.setString(1, lastSyncTime);
+ prepareStatement.setString(2, nowDate);
+
+ return prepareStatement.executeQuery();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ public List> readData(String tableName, String columnName, String timeStamp, int beginIndex, int batchSize) {
+ String countSql = MessageFormat.format(Constant.SQL_SELECT_2, tableName, columnName);
+ String querySql = MessageFormat.format(Constant.SQL_SELECT_3, tableName, columnName);
+ try (PreparedStatement countStatement = connection.prepareStatement(countSql);
+ PreparedStatement queryStatement = connection.prepareStatement(querySql)) {
+ countStatement.setString(1, timeStamp);
+ ResultSet resultSet = countStatement.executeQuery();
+ int count = ResultParser.countParse(resultSet);
+
+ List> result = new ArrayList<>();
+
+ // 分页将等于右闭区间的数据全读取
+ for (int i = beginIndex; i < count; i += batchSize) {
+ queryStatement.setString(1, timeStamp);
+ queryStatement.setInt(2, i);
+ queryStatement.setInt(3, batchSize);
+ ResultSet rs = queryStatement.executeQuery();
+ result.addAll(ResultParser.simpleParse(tableName, columnName, rs));
+ }
+ return result;
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ public boolean updateSyncTime(String tableName, long beginTime, long endTime) {
+ try (PreparedStatement prepareStatement = connection.prepareStatement("UPDATE SYS_CLOUD_SYNC set begin_timestamp = ?, end_timestamp = ? where table_name = ?")) {
+ prepareStatement.setLong(1, beginTime);
+ prepareStatement.setLong(2, endTime);
+ prepareStatement.setString(3, tableName);
+ int i = prepareStatement.executeUpdate();
+ return true;
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+
+ public static void main(String[] args) throws InterruptedException {
+ int total = 100; // 总进度
+ int progress = 0; // 当前进度
+
+ while (progress <= total) {
+ System.out.print("\r进度: " + progress + "%"); // \r 会将光标移动到行首,从而实现覆盖打印的效果
+ progress += 10;
+ Thread.sleep(500); // 暂停 500 毫秒以模拟进度更新
+ }
+ }
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LocalReadResult.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LocalReadResult.java
new file mode 100644
index 0000000000000000000000000000000000000000..fe3ce4150335a5604c386480e9f0bb1c909205cd
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/LocalReadResult.java
@@ -0,0 +1,73 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.local;
+
+import java.util.List;
+
+public class LocalReadResult {
+ /**
+ * 结果值
+ */
+ private List> resultStringList;
+
+ /**
+ * 本次读取最后一个时间戳
+ */
+ private String date;
+
+ /**
+ * 本次读取跟最后一个时间戳一致的数据行数
+ */
+ private int sameDateCount;
+
+ /**
+ * 本次读取结果数
+ */
+ private int count;
+
+ public List> getResultStringList() {
+ return resultStringList;
+ }
+
+ public void setResultStringList(List> resultStringList) {
+ this.resultStringList = resultStringList;
+ }
+
+ public String getDate() {
+ return date;
+ }
+
+ public void setDate(String date) {
+ this.date = date;
+ }
+
+ public int getSameDateCount() {
+ return sameDateCount;
+ }
+
+ public void setSameDateCount(int sameDateCount) {
+ this.sameDateCount = sameDateCount;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public void setCount(int count) {
+ this.count = count;
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/ResultParser.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/ResultParser.java
new file mode 100644
index 0000000000000000000000000000000000000000..48fdfefe67a0cdddd4bd4a30ce117c8edfa4d09c
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/local/ResultParser.java
@@ -0,0 +1,295 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.local;
+
+import org.opengauss.util.csv.CSVWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ResultParser {
+
+ private static final Logger log = LoggerFactory.getLogger(ResultParser.class);
+
+ /**
+ * cache the trans methods in a map
+ */
+ private static HashMap> parserMap = new HashMap<>();
+ private static HashMap dateMap = new HashMap<>();
+
+ /**
+ * get the trans method of each column, based on the msg of metadata
+ *
+ * @param resultSet
+ * @return
+ */
+ private static LinkedList initParserChain(String tableName, String columnName, ResultSet resultSet) throws SQLException, NoSuchMethodException {
+ LinkedList result = new LinkedList<>();
+ Class> resultSetClass = ResultSet.class;
+ int columnIndex = -1;
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ int columnCount = metaData.getColumnCount();
+ for (int i = 1; i <= columnCount; i++) {
+ if (columnIndex == -1 && columnName.equals(metaData.getColumnName(i))) {
+ columnIndex = i;
+ dateMap.put(tableName, columnIndex);
+ }
+ switch (metaData.getColumnType(i)) {
+ case Types.INTEGER:
+ result.add(resultSetClass.getDeclaredMethod("getInt", int.class));
+ break;
+ case Types.BIGINT:
+ result.add(resultSetClass.getDeclaredMethod("getLong", int.class));
+ break;
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGNVARCHAR:
+ result.add(resultSetClass.getDeclaredMethod("getString", int.class));
+ break;
+// case Types.TINYINT:
+// case Types.SMALLINT:
+// result.add(resultSetClass.getDeclaredMethod("getShort", int.class));
+// break;
+// case Types.FLOAT:
+// result.add(resultSetClass.getDeclaredMethod("getFloat", int.class));
+// break;
+// case Types.DECIMAL:
+// result.add(resultSetClass.getDeclaredMethod("getBigDecimal", int.class));
+// break;
+// case Types.DATE:
+// result.add(resultSetClass.getDeclaredMethod("getTime", int.class));
+// break;
+// case Types.TIMESTAMP:
+// result.add(resultSetClass.getDeclaredMethod("getTimestamp", int.class));
+// break;
+// case Types.BOOLEAN:
+// result.add(resultSetClass.getDeclaredMethod("getBoolean", int.class));
+// break;
+ default:
+// System.out.println("unknown type");
+ result.add(resultSetClass.getDeclaredMethod("getString", int.class));
+ }
+ }
+ parserMap.put(tableName, result);
+ return result;
+ }
+
+ public static int countParse(ResultSet resultSet) {
+ try {
+ if (resultSet.next()) {
+ return resultSet.getInt(1);
+ }
+ } catch (SQLException e) {
+ log.error("count sql parse error!");
+ e.printStackTrace();
+ }
+ return 0;
+ }
+
+ /**
+ * 读取时间戳正好等于右闭区间的情况
+ * 不需要考虑读取时间戳右闭区间没读完的情况
+ * @param tableName
+ * @param columnName
+ * @param resultSet
+ * @return
+ */
+ public static List> simpleParse(String tableName, String columnName, ResultSet resultSet) {
+ // get the trans method from the cache
+ LinkedList parserChain = parserMap.get(tableName);
+ // build it if it doesn't exist
+ if (parserChain == null) {
+ try {
+ parserChain = initParserChain(tableName, columnName, resultSet);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } catch (NoSuchMethodException e) {
+ e.printStackTrace();
+ }
+ }
+ int colCnt = parserChain.size();
+ int rowCnt = 0;
+ List> listList = new ArrayList<>();
+ try {
+ while (resultSet.next()) {
+ List line = new ArrayList<>(colCnt);
+ int column = 1;
+ for (Method m : parserChain) {
+ Object o = m.invoke(resultSet, column);
+ if (o == null) {
+ line.add("");
+ } else {
+ line.add(o + "");
+ }
+ column++;
+ }
+ listList.add(line);
+ rowCnt++;
+ }
+ return listList;
+ } catch (IllegalAccessException | SQLException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ /**
+ * 结果转成列表格式待进一步处理
+ * @param tableName
+ * @param columnName
+ * @param resultSet
+ * @return
+ */
+ public static LocalReadResult commonParse(String tableName, String columnName, ResultSet resultSet)
+ throws SQLException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ // get the trans method from the cache
+ LinkedList parserChain = parserMap.get(tableName);
+ // build it if it doesn't exist
+ if (parserChain == null) {
+ parserChain = initParserChain(tableName, columnName, resultSet);
+ }
+ int colCnt = parserChain.size();
+ int rowCnt = 0;
+ int dateIndex = dateMap.get(tableName);
+ LastDateInfo lastDateInfo = new LastDateInfo();
+ LocalReadResult result = new LocalReadResult();
+ List> listList = new ArrayList<>();
+ while (resultSet.next()) {
+ List line = new ArrayList<>(colCnt);
+ int column = 1;
+ for (Method m : parserChain) {
+ Object o = m.invoke(resultSet, column);
+ if (o == null) {
+ line.add("");
+ } else {
+ line.add(o + "");
+ }
+ check(column, dateIndex, line, lastDateInfo);
+ column++;
+ }
+ listList.add(line);
+ rowCnt++;
+ }
+ result.setResultStringList(listList);
+ result.setDate(lastDateInfo.getLastDate());
+ result.setSameDateCount(lastDateInfo.getLastDateCount());
+ result.setCount(rowCnt);
+ return result;
+ }
+
+ private static void check(int column, int dateIndex, List line, LastDateInfo lastDateInfo) {
+ if (column == dateIndex) {
+ String thisDate = line.get(column - 1);
+ if (thisDate.equals(lastDateInfo.getLastDate())) {
+ int lastDateCount = lastDateInfo.getLastDateCount();
+ lastDateInfo.setLastDateCount(lastDateCount + 1);
+ } else {
+ lastDateInfo.setLastDate(thisDate);
+ lastDateInfo.setLastDateCount(1);
+ }
+ }
+ }
+
+ /**
+ * 将结果转成csv格式
+ * @return
+ */
+ public static String listParseToCsv(List> list, String nodeId) {
+ StringWriter stringWriter = new StringWriter();
+ CSVWriter csvWriter = new CSVWriter(stringWriter);
+ for (List l : list) {
+ l.add(0, nodeId);
+ csvWriter.writeNext(l.toArray(new String[l.size()]));
+ }
+ return stringWriter.toString();
+ }
+
+ /**
+ * 结果直接转成csv格式
+ * @param tableName
+ * @param columnName
+ * @param resultSet
+ * @return
+ */
+ @Deprecated
+ public static CsvResult csvParse(String tableName, String columnName, ResultSet resultSet) {
+ // get the trans method from the cache
+ LinkedList parserChain = parserMap.get(tableName);
+ // build it if it doesn't exist
+ if (parserChain == null) {
+ try {
+ parserChain = initParserChain(tableName, columnName, resultSet);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } catch (NoSuchMethodException e) {
+ e.printStackTrace();
+ }
+ }
+ StringWriter stringWriter = new StringWriter();
+ CSVWriter csvWriter = new CSVWriter(stringWriter);
+ int colCnt = parserChain.size();
+ int rowCnt = 0;
+ int dateIndex = dateMap.get(tableName);
+ String lastDate = null;
+ try {
+ while (resultSet.next()) {
+ String[] line = new String[colCnt];
+ lastDate = runParser(parserChain, line, resultSet, dateIndex, lastDate);
+ csvWriter.writeNext(line);
+ rowCnt++;
+ }
+ return new CsvResult(rowCnt, lastDate, stringWriter.toString());
+ } catch (IllegalAccessException | SQLException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ } finally {
+ try {
+ csvWriter.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ return null;
+ }
+
+ private static String runParser(LinkedList parserChain, String[] line, ResultSet resultSet, int dateIndex, String lastDate) throws InvocationTargetException, IllegalAccessException {
+ int column = 1;
+ for (Method m : parserChain) {
+ line[column - 1] = m.invoke(resultSet, column) + "";
+ if (column == dateIndex) {
+ lastDate = line[column - 1];
+ }
+ column++;
+ }
+ return lastDate;
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncExecutor.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncExecutor.java
new file mode 100644
index 0000000000000000000000000000000000000000..5573a5e8073e0fd6e4c7417d596cd3b43dadab62
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncExecutor.java
@@ -0,0 +1,29 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class AsyncExecutor {
+ private static ExecutorService executor = Executors.newFixedThreadPool(6);
+
+ public static void run(Runnable runnable) {
+ executor.execute(runnable);
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncMissionUploadManagerImpl.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncMissionUploadManagerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..7c9df22eabd70f98d5f746895aa940e9ce0e3803
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncMissionUploadManagerImpl.java
@@ -0,0 +1,179 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+import com.intarkdb.cloudedge.center.MultiCopyManagerHelper;
+import com.intarkdb.cloudedge.common.Constant;
+import com.intarkdb.cloudedge.common.ThreadUncaughtExceptionHandler;
+import com.intarkdb.cloudedge.config.ConfigProperties;
+import com.intarkdb.cloudedge.config.LocalDbMsg;
+import com.intarkdb.cloudedge.local.LocalDBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AsyncMissionUploadManagerImpl implements UploadMissionManager {
+
+ private static final Logger log = LoggerFactory.getLogger(AsyncMissionUploadManagerImpl.class);
+
+ private ConcurrentLinkedQueue missionQueue;
+
+ private ConcurrentHashMap readerMap;
+
+ public AtomicLong uploadCount = new AtomicLong(0);
+
+ public AtomicLong readCount = new AtomicLong(0);
+
+ private ScheduledExecutorService executorService;
+
+ private boolean closeFlag = false;
+
+ private Integer loopWait;
+
+
+ /**
+ * init the mission
+ */
+ public void init(ConfigProperties conf) {
+ readerMap = new ConcurrentHashMap<>();
+ // init mission
+ missionQueue = new ConcurrentLinkedQueue<>();
+
+ int coreSize = 0;
+
+ // init copyManager
+// CopyManagerHelper.connectToCenterDB(conf.getCenterDb());
+ MultiCopyManagerHelper.init(conf.getCenterDb(), conf.getSyncTable());
+
+ // get access to local instarDB
+ for (LocalDbMsg localDbMsg: conf.getLocalDb()) {
+ LocalDBReader reader = new LocalDBReader(localDbMsg);
+ // check if the sync manage table exists, if not, build it.
+ reader.initManageTable();
+ readerMap.put(localDbMsg.getNodeId(), reader);
+ reader.initAsyncMissionQueue(missionQueue, conf.getSyncTable(), localDbMsg.getNodeId());
+ coreSize += conf.getSyncTable().size();
+ }
+
+ // init thread pool
+ executorService = Executors.newScheduledThreadPool(coreSize);
+
+ loopWait = conf.getLoopWait();
+ }
+
+ @Override
+ public void refreshConfig(ConfigProperties conf) {
+ // todo
+ // shut down old ScheduledExecutorService
+
+ // initialize and start new ScheduledExecutorService
+
+ }
+
+ /**
+ * add mission to list
+ * @return
+ */
+ @Override
+ public boolean addMission(SimpleUploadMission missionMsg) {
+ return missionQueue.offer(missionMsg);
+ }
+
+ @Override
+ public void start() {
+ speedPrint();
+ uploadLoop();
+ }
+
+ @Override
+ public void close() {
+ closeFlag = true;
+ }
+
+ public void uploadLoop() {
+ while (!closeFlag) {
+ if (missionQueue.isEmpty()) {
+ try {
+ Thread.sleep(loopWait);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ UploadMission msg;
+ while ((msg = missionQueue.poll()) != null) {
+ handleMission(msg, missionQueue);
+ }
+ }
+ }
+ executorService.shutdown();
+ }
+
+ private void handleMission(UploadMission mission, ConcurrentLinkedQueue missionQueue) {
+
+ Long now = Instant.now().toEpochMilli();
+ Long delayTime = Math.max(mission.getControlMsg().getEndTime() / 1000 + Constant.SCAN_WAIT_RANGE - now, 0);
+ delayTime = Math.max(delayTime, Math.min(mission.getFailCount(), Constant.FAIL_LIMIT) * 5 * 1000);
+ // put the mission to delay executorService
+ executorService.schedule(() -> {
+ long begin = 0;
+ try {
+ int handleNum = mission.readAndUpload(readerMap.get(mission.getControlMsg().getNodeId()), mission.getControlMsg(), uploadCount);
+ readCount.getAndAdd(handleNum);
+ } catch (Throwable t) {
+ log.error("handle mission error!" + mission.toString());
+ t.printStackTrace();
+ mission.addFailCount();
+ } finally {
+ // 将更新完的任务放回列表,下次处理
+ long end = Instant.now().toEpochMilli();
+ missionQueue.offer(mission);
+ }
+ }, delayTime, TimeUnit.MILLISECONDS);
+ }
+
+
+ private void speedPrint() {
+ Thread thread = new Thread(() -> {
+ long lastReadCount = 0;
+ long lastUploadCount = 0;
+ int secCount = 1;
+ while (!closeFlag) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long thisUploadCount = uploadCount.get();
+ long thisReadCount = readCount.get();
+ log.info("read count: " + thisReadCount + " read speed: " + (thisReadCount - lastReadCount) + " avg speed: " + (thisReadCount / secCount));
+ log.info("upload count: " + thisUploadCount + " upload speed: " + (thisUploadCount - lastUploadCount) + " avg speed: " + (thisUploadCount / secCount));
+ lastReadCount = thisReadCount;
+ lastUploadCount = thisUploadCount;
+ secCount++;
+ }
+ });
+ thread.setUncaughtExceptionHandler(new ThreadUncaughtExceptionHandler(log));
+ thread.start();
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncUploadMission.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncUploadMission.java
new file mode 100644
index 0000000000000000000000000000000000000000..3e0fe51d41dc2c2c5a749ce708a7f98eaeaea0e6
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/AsyncUploadMission.java
@@ -0,0 +1,117 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+import com.intarkdb.cloudedge.center.MultiCopyManagerHelper;
+import com.intarkdb.cloudedge.common.Constant;
+import com.intarkdb.cloudedge.local.LocalDBReader;
+import com.intarkdb.cloudedge.local.LocalReadResult;
+import com.intarkdb.cloudedge.local.ResultParser;
+import com.intarkdb.cloudedge.tool.TimeParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AsyncUploadMission extends UploadMission {
+ private static final Logger log = LoggerFactory.getLogger(AsyncUploadMission.class);
+
+ public AsyncUploadMission(SyncTableControlInfo controlMsg) {
+ super(controlMsg);
+ }
+
+ /**
+ *
+ * @param reader
+ * @param controlMsg
+ * @return 返回本次已读取数据
+ */
+ @Override
+ public Integer readAndUpload(LocalDBReader reader, SyncTableControlInfo controlMsg, AtomicLong uploadCounter) {
+ // default value of next read begin timestamp
+ long updateBeginTimestamp = controlMsg.getEndTime();
+ String nowDate = TimeParser.microsecondToString(updateBeginTimestamp);
+
+// CopyManagerHelper copyManagerHelper = CopyManagerHelper.getCopyManagerHelper();
+
+ String lastSyncDate = TimeParser.microsecondToString(controlMsg.getBeginTime());
+
+ // read data from table
+ ResultSet dataRaw = reader.readData(controlMsg.getTableName(), controlMsg.getSyncColumn(), lastSyncDate, nowDate, Constant.BATCH_SIZE);
+
+ LocalReadResult localReadResult = null;
+ try {
+ localReadResult = ResultParser.commonParse(controlMsg.getTableName(), controlMsg.getSyncColumn(), dataRaw);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } catch (NoSuchMethodException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+
+ List> resultStringList = localReadResult.getResultStringList();
+
+ // 右闭区间是否已经读完?
+ if (localReadResult.getCount() == Constant.BATCH_SIZE) {
+ List> lastDateResult = reader.readData(controlMsg.getTableName(), controlMsg.getSyncColumn(),
+ localReadResult.getDate(), localReadResult.getSameDateCount(), Constant.BATCH_SIZE);
+ resultStringList.addAll(lastDateResult);
+ updateBeginTimestamp = TimeParser.stringToMicrosecond(localReadResult.getDate());
+ }
+
+ // upload to center db
+ asyncUpload(reader, resultStringList, nowDate, uploadCounter);
+
+ // update control message
+ long scanEndTimestamp = (Instant.now().toEpochMilli() - Constant.SCAN_WAIT_RANGE) * 1000;
+ if (controlMsg.getEndTime() < scanEndTimestamp) {
+ controlMsg.setEndTime(scanEndTimestamp);
+ }
+ controlMsg.setBeginTime(updateBeginTimestamp);
+
+
+ long tt5 = Instant.now().toEpochMilli();
+
+ if (this.failCount > 0) {
+ this.failCount = 0;
+ }
+
+ return resultStringList.size();
+ }
+
+ private void asyncUpload(LocalDBReader reader, List> resultStringList, String nowDate, AtomicLong uploadCounter) {
+ AsyncExecutor.run(() -> {
+ String csvString = ResultParser.listParseToCsv(resultStringList, controlMsg.getNodeId());
+ if (!MultiCopyManagerHelper.uploadData(controlMsg.getTableName(), csvString)) {
+ log.warn("upload failed! table : " + controlMsg.getTableName() + " time: " + nowDate);
+ // todo 区分不同的返回异常
+ return ;
+ } else {
+ uploadCounter.getAndAdd(resultStringList.size());
+ reader.updateSyncTime(controlMsg.getTableName(), controlMsg.getBeginTime(), controlMsg.getEndTime());
+ }
+ });
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SimpleMissionUploadManagerImpl.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SimpleMissionUploadManagerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..8c36444b00c13d497df8c3355715b07c6abe080d
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SimpleMissionUploadManagerImpl.java
@@ -0,0 +1,175 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+import com.intarkdb.cloudedge.center.MultiCopyManagerHelper;
+import com.intarkdb.cloudedge.common.Constant;
+import com.intarkdb.cloudedge.common.ThreadUncaughtExceptionHandler;
+import com.intarkdb.cloudedge.config.ConfigProperties;
+import com.intarkdb.cloudedge.config.LocalDbMsg;
+import com.intarkdb.cloudedge.local.LocalDBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SimpleMissionUploadManagerImpl implements UploadMissionManager {
+
+ private static final Logger log = LoggerFactory.getLogger(SimpleMissionUploadManagerImpl.class);
+
+ private ConcurrentLinkedQueue missionQueue;
+
+ private ConcurrentHashMap readerMap;
+
+ public AtomicLong uploadCount = new AtomicLong(0);
+
+ private ScheduledExecutorService executorService;
+
+ private boolean closeFlag = false;
+
+ private Integer loopWait;
+
+
+ /**
+ * init the mission
+ */
+ public void init(ConfigProperties conf) {
+ readerMap = new ConcurrentHashMap<>();
+ // init mission
+ missionQueue = new ConcurrentLinkedQueue<>();
+
+ int coreSize = 0;
+
+ // init copyManager
+// CopyManagerHelper.connectToCenterDB(conf.getCenterDb());
+ MultiCopyManagerHelper.init(conf.getCenterDb(), conf.getSyncTable());
+
+ // get access to local instarDB
+ for (LocalDbMsg localDbMsg: conf.getLocalDb()) {
+ LocalDBReader reader = new LocalDBReader(localDbMsg);
+ // check if the sync manage table exists, if not, build it.
+ reader.initManageTable();
+ readerMap.put(localDbMsg.getNodeId(), reader);
+ reader.initSimpleMissionQueue(missionQueue, conf.getSyncTable(), localDbMsg.getNodeId());
+ coreSize += conf.getSyncTable().size();
+ }
+
+ // init thread pool
+ executorService = Executors.newScheduledThreadPool(coreSize);
+
+ loopWait = conf.getLoopWait();
+ }
+
+ @Override
+ public void refreshConfig(ConfigProperties conf) {
+ // todo
+ // shut down old ScheduledExecutorService
+
+ // initialize and start new ScheduledExecutorService
+
+ }
+
+ /**
+ * add mission to list
+ * @return
+ */
+ @Override
+ public boolean addMission(SimpleUploadMission missionMsg) {
+ return missionQueue.offer(missionMsg);
+ }
+
+ @Override
+ public void start() {
+ speedPrint();
+ uploadLoop();
+ }
+
+ @Override
+ public void close() {
+ closeFlag = true;
+ }
+
+ public void uploadLoop() {
+ while (!closeFlag) {
+ if (missionQueue.isEmpty()) {
+ try {
+ Thread.sleep(loopWait);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ UploadMission msg;
+ while ((msg = missionQueue.poll()) != null) {
+ handleMission(msg, missionQueue);
+ }
+ }
+ }
+ executorService.shutdown();
+ }
+
+ private void handleMission(UploadMission mission, ConcurrentLinkedQueue missionQueue) {
+
+ Long now = Instant.now().toEpochMilli();
+ Long delayTime = Math.max(mission.getControlMsg().getEndTime() / 1000 + Constant.SCAN_WAIT_RANGE - now, 0);
+ delayTime = Math.max(delayTime, Math.min(mission.getFailCount(), Constant.FAIL_LIMIT) * 5 * 1000);
+ // put the mission to delay executorService
+ executorService.schedule(() -> {
+ long begin = 0;
+ try {
+ begin = Instant.now().toEpochMilli();
+ int handleNum = mission.readAndUpload(readerMap.get(mission.getControlMsg().getNodeId()), mission.getControlMsg(), null);
+ uploadCount.getAndAdd(handleNum);
+ } catch (Throwable t) {
+ log.error("handle mission error!" + mission.toString());
+ t.printStackTrace();
+ mission.addFailCount();
+ } finally {
+ // 将更新完的任务放回列表,下次处理
+ long end = Instant.now().toEpochMilli();
+// log.debug("mission update " + mission.getControlMsg().getNodeId() + " " + mission.getControlMsg().getTableName() + " cost " + (end - begin));
+ missionQueue.offer(mission);
+ }
+ }, delayTime, TimeUnit.MILLISECONDS);
+ }
+
+
+ private void speedPrint() {
+ Thread thread = new Thread(() -> {
+ long lastCount = 0;
+ int secCount = 1;
+ while (!closeFlag) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ long thisCount = uploadCount.get();
+ log.info("upload count: " + thisCount + " upload speed: " + (thisCount - lastCount) + " avg speed: " + (thisCount / secCount));
+ lastCount = thisCount;
+ secCount++;
+ }
+ });
+ thread.setUncaughtExceptionHandler(new ThreadUncaughtExceptionHandler(log));
+ thread.start();
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SimpleUploadMission.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SimpleUploadMission.java
new file mode 100644
index 0000000000000000000000000000000000000000..60208a4d74f815ed87b2df2d0f6817f1c32c7df5
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SimpleUploadMission.java
@@ -0,0 +1,108 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+import com.intarkdb.cloudedge.center.MultiCopyManagerHelper;
+import com.intarkdb.cloudedge.common.Constant;
+import com.intarkdb.cloudedge.local.LocalDBReader;
+import com.intarkdb.cloudedge.local.LocalReadResult;
+import com.intarkdb.cloudedge.local.ResultParser;
+import com.intarkdb.cloudedge.tool.TimeParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SimpleUploadMission extends UploadMission {
+
+ private static final Logger log = LoggerFactory.getLogger(SimpleUploadMission.class);
+
+ public SimpleUploadMission(SyncTableControlInfo controlMsg) {
+ super(controlMsg);
+ }
+
+ /**
+ *
+ * @param reader
+ * @param controlMsg
+ * @return 返回本次已处理数据
+ */
+ @Override
+ public Integer readAndUpload(LocalDBReader reader, SyncTableControlInfo controlMsg, AtomicLong uploadCounter) {
+ // default value of next read begin timestamp
+ long updateBeginTimestamp = controlMsg.getEndTime();
+ String nowDate = TimeParser.microsecondToString(updateBeginTimestamp);
+
+// CopyManagerHelper copyManagerHelper = CopyManagerHelper.getCopyManagerHelper();
+
+ String lastSyncDate = TimeParser.microsecondToString(controlMsg.getBeginTime());
+
+ // read data from table
+ ResultSet dataRaw = reader.readData(controlMsg.getTableName(), controlMsg.getSyncColumn(), lastSyncDate, nowDate, Constant.BATCH_SIZE);
+
+ LocalReadResult localReadResult = null;
+ try {
+ localReadResult = ResultParser.commonParse(controlMsg.getTableName(), controlMsg.getSyncColumn(), dataRaw);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } catch (NoSuchMethodException e) {
+ e.printStackTrace();
+ } catch (InvocationTargetException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+
+ List> resultStringList = localReadResult.getResultStringList();
+
+ // 右闭区间是否已经读完?
+ if (localReadResult.getCount() == Constant.BATCH_SIZE) {
+ List> lastDateResult = reader.readData(controlMsg.getTableName(), controlMsg.getSyncColumn(),
+ localReadResult.getDate(), localReadResult.getSameDateCount(), Constant.BATCH_SIZE);
+ resultStringList.addAll(lastDateResult);
+ updateBeginTimestamp = TimeParser.stringToMicrosecond(localReadResult.getDate());
+ }
+
+ String csvString = ResultParser.listParseToCsv(resultStringList, controlMsg.getNodeId());
+ // upload to center db
+ if (!MultiCopyManagerHelper.uploadData(controlMsg.getTableName(), csvString)) {
+ log.warn("upload failed! table : " + controlMsg.getTableName() + " time: " + nowDate);
+ // todo 区分不同的返回异常
+ return 0;
+ }
+
+ // update control message
+ long scanEndTimestamp = Instant.now().toEpochMilli() - Constant.SCAN_WAIT_RANGE;
+ if (controlMsg.getEndTime() < scanEndTimestamp) {
+ controlMsg.setEndTime(scanEndTimestamp);
+ }
+ controlMsg.setBeginTime(updateBeginTimestamp);
+ reader.updateSyncTime(controlMsg.getTableName(), controlMsg.getBeginTime(), controlMsg.getEndTime());
+
+ if (this.failCount > 0) {
+ this.failCount = 0;
+ }
+
+ return resultStringList.size();
+ }
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SyncTableControlInfo.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SyncTableControlInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..f9cdc28dfecdf2768f8c2b6d9c9b8eb3d2304761
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/SyncTableControlInfo.java
@@ -0,0 +1,128 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+import com.intarkdb.cloudedge.common.Constant;
+
+public class SyncTableControlInfo {
+ /**
+ * 表名
+ */
+ private String tableName;
+
+ /**
+ * 任务类型
+ */
+ private int type;
+
+ /**
+ * 时序表的时间列,由用户指定
+ */
+ private String syncColumn;
+
+ /**
+ * 同步任务扫描左区间(左开)
+ */
+ private Long beginTime;
+
+ /**
+ * 同步任务扫描右区间(右闭)
+ */
+ private Long endTime;
+
+ /**
+ * 节点id
+ */
+ private String nodeId;
+
+ /**
+ * 唯一列
+ */
+ private String uniqueColumn;
+
+ public SyncTableControlInfo(String tableName, String syncColumn, Long beginTime) {
+ this.tableName = tableName;
+ this.syncColumn = syncColumn;
+ this.beginTime = beginTime;
+ this.endTime = beginTime + Constant.SCAN_WINDOW_RANGE;
+ }
+
+ public SyncTableControlInfo(String tableName, String syncColumn, Long beginTime, Long endTime, String nodeId) {
+ this.tableName = tableName;
+ this.syncColumn = syncColumn;
+ this.beginTime = beginTime;
+ this.endTime = endTime;
+ this.nodeId = nodeId;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public int getType() {
+ return type;
+ }
+
+ public void setType(int type) {
+ this.type = type;
+ }
+
+ public String getSyncColumn() {
+ return syncColumn;
+ }
+
+ public void setSyncColumn(String syncColumn) {
+ this.syncColumn = syncColumn;
+ }
+
+ public Long getBeginTime() {
+ return beginTime;
+ }
+
+ public void setBeginTime(Long beginTime) {
+ this.beginTime = beginTime;
+ }
+
+ public Long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(Long endTime) {
+ this.endTime = endTime;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getUniqueColumn() {
+ return uniqueColumn;
+ }
+
+ public void setUniqueColumn(String uniqueColumn) {
+ this.uniqueColumn = uniqueColumn;
+ }
+
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/UploadMission.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/UploadMission.java
new file mode 100644
index 0000000000000000000000000000000000000000..0d07a0e009bf6989029cb1a96e20c872d660b043
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/UploadMission.java
@@ -0,0 +1,55 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+import com.intarkdb.cloudedge.local.LocalDBReader;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public abstract class UploadMission {
+ protected SyncTableControlInfo controlMsg;
+
+ protected int failCount = 0;
+
+ public UploadMission(SyncTableControlInfo controlMsg) {
+ this.controlMsg = controlMsg;
+ }
+
+ public SyncTableControlInfo getControlMsg() {
+ return controlMsg;
+ }
+
+ public void setControlMsg(SyncTableControlInfo controlMsg) {
+ this.controlMsg = controlMsg;
+ }
+
+ public int getFailCount() {
+ return failCount;
+ }
+
+ public void setFailCount(int failCount) {
+ this.failCount = failCount;
+ }
+
+ public void addFailCount() {
+ if (this.failCount < Integer.MAX_VALUE) {
+ this.failCount++;
+ }
+ }
+
+ public abstract Integer readAndUpload(LocalDBReader reader, SyncTableControlInfo controlMsg, AtomicLong uploadCounter);
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/UploadMissionManager.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/UploadMissionManager.java
new file mode 100644
index 0000000000000000000000000000000000000000..a991e9e02444f21ede0437524da39ba12af2ea65
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/mission/UploadMissionManager.java
@@ -0,0 +1,51 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.mission;
+
+import com.intarkdb.cloudedge.config.ConfigProperties;
+
+public interface UploadMissionManager {
+
+ /**
+ * mission initialize
+ * @param conf
+ */
+ void init(ConfigProperties conf);
+
+ /**
+ * refresh config
+ * @param conf
+ */
+ void refreshConfig(ConfigProperties conf);
+
+ /**
+ * add mission to the running loop
+ * @param missionMsg
+ * @return
+ */
+ boolean addMission(SimpleUploadMission missionMsg);
+
+ /**
+ * begin to run the mission loop
+ */
+ void start();
+
+ /**
+ * stop the mission loop and close the resource
+ */
+ void close();
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/tool/StringUtil.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/tool/StringUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..a5fba8d519a71f23cc98d8e8803a87bdfb08c2a6
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/tool/StringUtil.java
@@ -0,0 +1,27 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.tool;
+
+public class StringUtil {
+ public static boolean isBlank(String s) {
+ if (null == s || "".equals(s)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+}
diff --git a/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/tool/TimeParser.java b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/tool/TimeParser.java
new file mode 100644
index 0000000000000000000000000000000000000000..52bc4a67719194d8a14aaa8a497dd090cadca991
--- /dev/null
+++ b/tools/data-sync-agent/src/main/java/com/intarkdb/cloudedge/tool/TimeParser.java
@@ -0,0 +1,131 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN
+ BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*/
+package com.intarkdb.cloudedge.tool;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+
+public class TimeParser {
+
+ private static final Logger log = LoggerFactory.getLogger(TimeParser.class);
+
+ private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ private static DateTimeFormatter formatter1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+ private static DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private static DateTimeFormatter formatter3 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SS");
+ private static DateTimeFormatter formatter4 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S");
+ private static DateTimeFormatter formatter5 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+ private static final int FORMATTER_LENGTH = 19;
+ private static final int FORMATTER_LENGTH1 = 23;
+ private static final int FORMATTER_LENGTH2 = 10;
+ private static final int FORMATTER_LENGTH3 = 22;
+ private static final int FORMATTER_LENGTH4 = 21;
+ private static final int FORMATTER_LENGTH5 = 26;
+
+ public static LocalDateTime toLocalDatetime(String dateTimeString) {
+ LocalDateTime localDateTime = null;
+
+ try {
+ switch (dateTimeString.length()) {
+ case FORMATTER_LENGTH:
+ localDateTime = LocalDateTime.parse(dateTimeString, formatter);
+ break;
+ case FORMATTER_LENGTH1:
+ localDateTime = LocalDateTime.parse(dateTimeString, formatter1);
+ break;
+ case FORMATTER_LENGTH2:
+ localDateTime = LocalDateTime.parse(dateTimeString, formatter2);
+ break;
+ case FORMATTER_LENGTH3:
+ localDateTime = LocalDateTime.parse(dateTimeString, formatter3);
+ break;
+ case FORMATTER_LENGTH4:
+ localDateTime = LocalDateTime.parse(dateTimeString, formatter4);
+ break;
+ case FORMATTER_LENGTH5:
+ localDateTime = LocalDateTime.parse(dateTimeString, formatter5);
+ break;
+ default:
+ System.out.println("unsupported datetime string: " + dateTimeString);
+ return localDateTime;
+ }
+ return localDateTime;
+ } catch (Exception e) {
+ log.error("date time trans failed, String: " + dateTimeString);
+ return localDateTime;
+ }
+ }
+
+ public static long toTimestamp(String dateTimeString) {
+ LocalDateTime localDateTime = toLocalDatetime(dateTimeString);
+
+ if (localDateTime != null) {
+ return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+ } else {
+ return 0;
+ }
+ }
+
+ public static String toString(long timestamp) {
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
+
+ return localDateTime.format(formatter1);
+ }
+
+ public static String microsecondToString(long microsecondTimestamp) {
+ long milli = microsecondTimestamp / 1000;
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(milli), ZoneId.systemDefault());
+ return localDateTime.format(formatter1) + String.format("%03d", microsecondTimestamp % 1000);
+ }
+
+ public static long stringToMicrosecond(String dateTimeString) {
+ long milli = toTimestamp(dateTimeString);
+
+ return milli * 1000 + Integer.valueOf(dateTimeString.substring(23));
+ }
+
+ public static String roundUpTimestamp(String dateTimeString) {
+ LocalDateTime localDateTime = toLocalDatetime(dateTimeString);
+
+ if (localDateTime != null) {
+
+ if (localDateTime.getNano() != 0) {
+ LocalDateTime ceilTimestamp = localDateTime.plus(1, ChronoUnit.SECONDS);
+ return ceilTimestamp.format(formatter5);
+ } else {
+ return localDateTime.format(formatter5);
+ }
+ } else {
+ return null;
+ }
+
+ }
+
+ public static void main(String... args) {
+// System.out.println(roundUpTimestamp("2020-01-01 11:11:11.231"));
+ System.out.println(microsecondToString(1702006505063167L));
+ System.out.println(stringToMicrosecond("2023-12-08 11:35:05.063167"));
+ }
+}
diff --git a/tools/data-sync-agent/src/main/resources/application.properties b/tools/data-sync-agent/src/main/resources/application.properties
new file mode 100644
index 0000000000000000000000000000000000000000..ac6e80bd845043f4e7ccf61c0379b6ea78038bf7
--- /dev/null
+++ b/tools/data-sync-agent/src/main/resources/application.properties
@@ -0,0 +1 @@
+logging.pattern.console = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
\ No newline at end of file
diff --git a/tools/data-sync-agent/src/main/resources/conf.yaml b/tools/data-sync-agent/src/main/resources/conf.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..f484f639fc9896b69a015c53f75da1e95275251b
--- /dev/null
+++ b/tools/data-sync-agent/src/main/resources/conf.yaml
@@ -0,0 +1,58 @@
+# center db setting
+centerDb:
+# test server
+# ip: 192.168.110.41
+# port: 5432
+# user: gaussdb
+# password: openGauss@123
+
+# ip: 192.168.111.194
+# port: 5432
+# user: gaussdb
+# password: openGauss@123
+
+#local
+ ip: 192.168.110.30
+ port: 5432
+ user: gaussdb
+ password: Test@1234
+
+# local db setting
+localDb:
+ - ip: 10.0.2.15
+ port: 3333
+ user:
+ password:
+ nodeId:
+# - ip: 172.21.186.202
+# port: 3336
+# user:
+# password:
+# nodeId:
+# - ip: 192.168.111.194
+# port: 3333
+# user:
+# password:
+# nodeId:
+# - ip: 192.168.110.41
+# port: 3333
+# user:
+# password:
+# nodeId:
+
+# sync table [table name]:[sync column]
+syncTable:
+ - tableName: readings
+ syncColumn: created_time
+ - tableName: diagnostics
+ syncColumn: created_time
+# - tableName: common_t_table_2
+# syncColumn: t
+# - tableName: common_t_table_3
+# syncColumn: t
+# - tableName: common_t_table_4
+# syncColumn: t
+# - tableName: common_t_table_5
+# syncColumn: t
+
+loopWait: 30
\ No newline at end of file
diff --git a/tools/gradle-7.2-all.zip b/tools/gradle-7.2-all.zip
new file mode 100755
index 0000000000000000000000000000000000000000..d43cd93e0f292281d2df8e8e3b3d0f3096a1fa67
Binary files /dev/null and b/tools/gradle-7.2-all.zip differ
diff --git a/tools/intarkdb_cli/CMakeLists.txt b/tools/intarkdb_cli/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..7848bc382409243a0c7d4870d1f4b999e15d2e7c
--- /dev/null
+++ b/tools/intarkdb_cli/CMakeLists.txt
@@ -0,0 +1,38 @@
+cmake_minimum_required(VERSION 3.14.1)
+# project(intarkdb_cli)
+
+add_compile_options(-fPIC)
+# # add_compile_options(-fPIC -MMD -fno-strict-aliasing -fsigned-char -fms-extensions)
+
+set(CMAKE_CXX_STANDARD 17)
+## create compile_command.json
+set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
+
+include_directories(${INTARKDB_ZEKERNEL_COMMON_INC_PATH})
+
+set(INTARKDB_LINK_LIBS
+sqlite3_api_wrapper
+intarkdb
+cjson
+fmt
+)
+
+if(UNIX)
+ file(GLOB test_main ./*.cpp ./*.c)
+ add_executable(intarkdb_cli ${test_main})
+ # target_link_libraries(intarkdb_cli ${INTARKDB_LINK_LIBS} -Wl,--copy-dt-needed-entries)
+ if ((${OS_ARCH} STREQUAL "arm32") OR (${OS_ARCH} STREQUAL "aarch64"))
+ target_link_libraries(intarkdb_cli ${INTARKDB_LINK_LIBS})
+ else()
+ target_link_libraries(intarkdb_cli ${INTARKDB_LINK_LIBS} -Wl,--copy-dt-needed-entries)
+ endif()
+ target_link_directories(intarkdb_cli PUBLIC ${INTARKDB_LIB_PATH} ${INTARKDB_THRID_LIB_PATH})
+ target_include_directories(intarkdb_cli PUBLIC ${INTARKDB_COMPUTE_SQL_INC_PATH}
+ ${INTARKDB_PGQUERY_INC_PATH}
+ ${INTARKDB_SECUREC_INC_PATH}
+ ${INTARKDB_SRC_PATH}
+ ${INTARKDB_CJSON_PATH}
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${INTARKDB_THRID_INC_PATH}
+ ${INTARKDB_FMT_INC_PATH})
+endif(UNIX)
\ No newline at end of file
diff --git a/tools/intarkdb_cli/assist.cpp b/tools/intarkdb_cli/assist.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..e72ab904ef9a9b3b78c78d204e2f7ec349fe7cf7
--- /dev/null
+++ b/tools/intarkdb_cli/assist.cpp
@@ -0,0 +1,112 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*
+* assist.cpp
+*
+* IDENTIFICATION
+* openGauss-embedded/tools/intarkdb_cli/assist.cpp
+*
+* -------------------------------------------------------------------------
+*/
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "assist.h"
+
+
+/*
+** Return a pathname which is the user's home directory. A
+** 0 return indicates an error of some kind.
+*/
+char *find_home_dir(int clearFlag){
+ static char *home_dir = nullptr;
+ if( clearFlag ){
+ free(home_dir);
+ home_dir = nullptr;
+ return nullptr;
+ }
+ if( home_dir ) return home_dir;
+
+#if !defined(_WIN32) && !defined(WIN32) && !defined(_WIN32_WCE) \
+ && !defined(__RTP__) && !defined(_WRS_KERNEL)
+ {
+ struct passwd *pwent;
+ uid_t uid = getuid();
+ if( (pwent=getpwuid(uid)) != nullptr) {
+ home_dir = pwent->pw_dir;
+ }
+ }
+#endif
+
+#if defined(_WIN32_WCE)
+ /* Windows CE (arm-wince-mingw32ce-gcc) does not provide getenv()
+ */
+ home_dir = "/";
+#else
+
+#if defined(_WIN32) || defined(WIN32)
+ if (!home_dir) {
+ home_dir = getenv("USERPROFILE");
+ }
+#endif
+
+ if (!home_dir) {
+ home_dir = getenv("HOME");
+ }
+
+#if defined(_WIN32) || defined(WIN32)
+ if (!home_dir) {
+ char *zDrive, *zPath;
+ int n;
+ zDrive = getenv("HOMEDRIVE");
+ zPath = getenv("HOMEPATH");
+ if( zDrive && zPath ){
+ n = strlen30(zDrive) + strlen30(zPath) + 1;
+ home_dir = malloc( n );
+ if( home_dir==0 ) return 0;
+ sqlite3_snprintf(n, home_dir, "%s%s", zDrive, zPath);
+ return home_dir;
+ }
+ home_dir = "c:\\";
+ }
+#endif
+
+#endif /* !_WIN32_WCE */
+
+ if( home_dir ){
+ int n = strlen30(home_dir) + 1;
+ char *z = (char*)malloc( n );
+ if( z ) memcpy(z, home_dir, n);
+ home_dir = z;
+ }
+
+ return home_dir;
+}
+
+/*
+** Compute a string length that is limited to what can be stored in
+** lower 30 bits of a 32-bit signed integer.
+*/
+int strlen30(const char *z){
+ const char *z2 = z;
+ while( *z2 ){ z2++; }
+ return 0x3fffffff & (int)(z2 - z);
+}
\ No newline at end of file
diff --git a/tools/intarkdb_cli/assist.h b/tools/intarkdb_cli/assist.h
new file mode 100644
index 0000000000000000000000000000000000000000..6642fcf71f89838e72daa08258b2497ef1d0e875
--- /dev/null
+++ b/tools/intarkdb_cli/assist.h
@@ -0,0 +1,27 @@
+/*
+* Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+*
+* openGauss embedded is licensed under Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+*
+* http://license.coscl.org.cn/MulanPSL2
+*
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+* MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+* See the Mulan PSL v2 for more details.
+* -------------------------------------------------------------------------
+*
+* assist.h
+*
+* IDENTIFICATION
+* openGauss-embedded/tools/intarkdb_cli/assist.h
+*
+* -------------------------------------------------------------------------
+*/
+#include "interface/sqlite3_api_wrapper/include/sqlite3.h"
+
+char *find_home_dir(int clearFlag);
+
+int strlen30(const char *z);
\ No newline at end of file
diff --git a/tools/intarkdb_cli/cmd.cpp b/tools/intarkdb_cli/cmd.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..7260dd222462022a992f91c68426149e889713d0
--- /dev/null
+++ b/tools/intarkdb_cli/cmd.cpp
@@ -0,0 +1,1010 @@
+/*
+ * Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+ *
+ * openGauss embedded is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan
+ * PSL v2. You may obtain a copy of Mulan PSL v2 at:
+ *
+ * http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
+ * KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
+ * NON-INFRINGEMENT, MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE. See the
+ * Mulan PSL v2 for more details.
+ * -------------------------------------------------------------------------
+ *
+ * cmd.cpp
+ *
+ * IDENTIFICATION
+ * openGauss-embedded/tools/intarkdb_cli/cmd.cpp
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "cmd.h"
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "cJSON.h"
+#include "cm_signal.h"
+#include "interface/c/intarkdb_kv.h"
+#include "include/intarkdb.h"
+#include "linenoise.h"
+#include "storage/gstor/zekernel/common/cm_thread.h"
+#ifdef SUPPORT_SQLITE
+#include
+#endif
+
+#define PLATFORM_INFO_SIZE 1024
+#define CLOSE_TIMES 2
+
+ClassCmd::ClassCmd(const std::string &path, Connection *conn, KvOperator *kv_oper)
+ : sql_conn(conn), kv_operator(kv_oper), db_path(path + "intarkdb"), print_type(box) {
+ ex_func_map.insert(std::make_pair(".help", &ClassCmd::Help));
+ ex_func_map.insert(std::make_pair(".dbinfo", &ClassCmd::Info));
+ ex_func_map.insert(std::make_pair(".version", &ClassCmd::Version));
+
+ ex_func_map.insert(std::make_pair(".dump", &ClassCmd::Dump));
+ ex_func_map.insert(std::make_pair(".import", &ClassCmd::Import));
+ ex_func_map.insert(std::make_pair(".read", &ClassCmd::Read));
+
+ ex_func_map.insert(std::make_pair(".table", &ClassCmd::ShowTable));
+ ex_func_map.insert(std::make_pair(".index", &ClassCmd::ShowIndex));
+ ex_func_map.insert(std::make_pair(".keys", &ClassCmd::SHowKeys));
+ ex_func_map.insert(std::make_pair(".schema", &ClassCmd::Schema));
+ ex_func_map.insert(std::make_pair(".fullschema", &ClassCmd::FullSchema));
+
+ ex_func_map.insert(std::make_pair(".mode", &ClassCmd::Mode));
+ ex_func_map.insert(std::make_pair(".explain", &ClassCmd::Explain));
+ ex_func_map.insert(std::make_pair(".nullvalue", &ClassCmd::SetNullStr));
+ ex_func_map.insert(std::make_pair(".width", &ClassCmd::SetWidth));
+ ex_func_map.insert(std::make_pair(".echo", &ClassCmd::SetEcho));
+ ex_func_map.insert(std::make_pair(".change", &ClassCmd::SetChange));
+ ex_func_map.insert(std::make_pair(".timer", &ClassCmd::SetTimer));
+ ex_func_map.insert(std::make_pair(".output", &ClassCmd::SetOutFile));
+ ex_func_map.insert(std::make_pair(".once", &ClassCmd::SetOutputOnce));
+ ex_func_map.insert(std::make_pair(".prompt", &ClassCmd::SetPrompt));
+ ex_func_map.insert(std::make_pair(".show", &ClassCmd::ShowOption));
+ ex_func_map.insert(std::make_pair(".dbconfig", &ClassCmd::DBConfig));
+
+ help_msg = "Help: \n";
+ std::map help_map;
+ int length = 20;
+ help_map.insert(std::make_pair(".echo on|off", "Turn command echo on or off"));
+ help_map.insert(std::make_pair(".quit", "Exit this program"));
+ help_map.insert(std::make_pair(".exit", "Exit this program"));
+ help_map.insert(std::make_pair(".help", "Show help text"));
+ help_map.insert(std::make_pair(".help kv", "Show help text for kv"));
+ help_map.insert(std::make_pair(".dbinfo", "Show status information about the database"));
+ help_map.insert(std::make_pair(".version", "Show version infomation"));
+ help_map.insert(std::make_pair(".dump", "Render database content as SQL"));
+ help_map.insert(std::make_pair(".import $FILE $TABLE", "Import data from FILE into TABLE"));
+ help_map.insert(std::make_pair(".read $FILE", "Read input from FILE"));
+
+ help_map.insert(std::make_pair(".table $TABLE", "List names of tables matching LIKE pattern TABLE"));
+ help_map.insert(std::make_pair(".index $TABLE", "Show names of indexes"));
+ help_map.insert(std::make_pair(".schema $PATTERN", "Show the CREATE statements matching PATTERN"));
+ help_map.insert(std::make_pair(".fullschema", "Show schema and the content of sqlite_stat tables"));
+ help_map.insert(std::make_pair(".keys", "show all keys"));
+
+ help_map.insert(std::make_pair(".mode", "Set output mode, support: " + GetModeType(",")));
+ help_map.insert(std::make_pair(".explain on|off", "Turn command echo on or off"));
+ help_map.insert(std::make_pair(".nullvalue", "Use STRING in place of NULL values"));
+ help_map.insert(std::make_pair(".width $NUM", "Set minimum column widths"));
+ help_map.insert(std::make_pair(".echo on|off", "Turn command echo on or off"));
+ help_map.insert(std::make_pair(".change on|off", "Show number of rows changed by SQL"));
+ help_map.insert(std::make_pair(".timer on|off", "Turn SQL timer on or off"));
+ help_map.insert(std::make_pair(".output $FILE", "Send output to FILE or stdout if FILE is omitted"));
+ help_map.insert(std::make_pair(".once $FILE", "Output for the next SQL command only to FILE"));
+ help_map.insert(std::make_pair(".prompt", "Replace the standard prompts"));
+ help_map.insert(std::make_pair(".show", "Show the current values for various settings"));
+ help_map.insert(std::make_pair(".dbconfig $OP $VAL", " List or change onfig optionss"));
+ for (auto item : help_map) {
+ fmt::format_to(std::back_inserter(help_msg), "{:<{}}: {}\n", item.first, length, item.second);
+ }
+}
+
+std::function shutdown_handler;
+
+static void signal_handler(int sig_no) { shutdown_handler(sig_no); }
+
+void ClassCmd::main() {
+ int HistorySetMaxLen = 30;
+ uint close_num = 0;
+ auto prompt = GetBeginPrompt();
+ linenoiseSetMultiLine(1);
+ linenoiseHistorySetMaxLen(HistorySetMaxLen);
+ shutdown_handler = [&close_num](int sig_no) {
+ close_num++;
+ if (close_num >= CLOSE_TIMES) {
+ std::cout << "Closing..." << std::endl;
+ exit(1);
+ }
+ };
+ (void)cm_regist_signal(SIGINT, signal_handler);
+
+ print_delegate_ = GetPrintDelegate("stdout");
+ std::cout << "WelCome intarkdb shell." << std::endl;
+ while (true) {
+ std::string query;
+ while (true) {
+ char *input = linenoise(prompt);
+ if (input == nullptr) {
+ close_num++;
+ if (close_num < CLOSE_TIMES) {
+ std::cout << "If you want to close, press ctrl+c again" << std::endl;
+ continue;
+ } else {
+ std::cout << "Closing..." << std::endl;
+ return;
+ }
+ } else {
+ close_num = 0;
+ }
+
+ linenoiseHistoryAdd(input);
+ query += std::string(input);
+ if (IsFinishQuery(query)) {
+ prompt = GetBeginPrompt();
+ break;
+ }
+ prompt = GetContinuePrompt();
+ query += " \n ";
+ linenoiseFree(input);
+ }
+ if (execute(query) == false) {
+ std::cout << "Closing..." << std::endl;
+ break;
+ }
+ }
+ return;
+}
+
+std::string ClassCmd::GetTableName(const std::string &str) {
+ if (str[0] == '\"' && str[str.size() - 1] == '\"') return str.substr(1, str.size() - 2);
+
+ std::string tmp = str;
+ transform(tmp.begin(), tmp.end(), tmp.begin(), ::tolower);
+ return tmp;
+}
+
+void ClassCmd::GetCmd(std::string str, const char split, std::vector &res) {
+ std::istringstream iss(str);
+ std::string buf;
+ while (getline(iss, buf, split)) {
+ if (buf.length() > 0 && buf != "\n") res.push_back(buf);
+ }
+ // 去掉";"
+ if (res.size() > 0) {
+ int last = res.size() - 1;
+ if (res[last][res[last].size() - 1] == ';') res[last] = res[last].substr(0, res[last].size() - 1);
+ if (res[last].size() == 0) res.pop_back();
+ }
+}
+
+bool ClassCmd::execute(std::string &query) {
+ std::vector strList;
+ GetCmd(query, ' ', strList);
+ std::string result;
+ try {
+ if (IsQuitCmd(strList[0])) {
+ return false;
+ }
+
+ func build_func = nullptr;
+ if (IsBuildInCmd(strList[0], build_func)) {
+ PrintBuildInCmdResult(query, build_func, strList);
+ return true;
+ }
+
+ KvOperator::kv_func kv_func = nullptr;
+ if (IsKVCmd(strList[0], kv_func) && !IsSqlCmd(query, strList)) {
+ PrintKVCmdResult(query, kv_func, strList);
+ return true;
+ }
+
+ PrintSQLCmdResult(query);
+ } catch (const std::exception &ex) {
+ std::cout << "Error:" << std::string(ex.what()) << std::endl;
+ }
+ return true;
+}
+
+bool ClassCmd::IsSqlCmd(std::string query, std::vector &strList) {
+ std::string cmd = strList[0];
+ std::transform(cmd.begin(), cmd.end(), cmd.begin(), ::tolower);
+
+ std::string sql = query;
+ std::transform(sql.begin(), sql.end(), sql.begin(), ::tolower);
+ if (cmd == "set") {
+ if (sql.find("auto_commit") != std::string::npos && sql.find("=") != std::string::npos) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+std::string ClassCmd::Help(const std::vector &strList) {
+ if (strList.size() == 1)
+ return help_msg;
+ else if (strList.size() == 2) {
+ if (strList[1] == "kv") return KvOperator::show_kv_help();
+ }
+
+ return help_msg;
+}
+
+std::string ClassCmd::GetModeType(const std::string &separator) {
+ std::string s_mode_type_msg;
+ for (const auto &item : m_print_type) s_mode_type_msg += item.first + separator;
+ s_mode_type_msg = s_mode_type_msg.substr(0, s_mode_type_msg.size() - 1);
+ return s_mode_type_msg;
+}
+
+std::string ClassCmd::Mode(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .mode " + GetModeType("|") + "\n";
+ if (strList.size() != 2) return prompt_msg;
+ auto item = m_print_type.find(strList[1]);
+ if (item == m_print_type.end()) return prompt_msg;
+ print_type = item->second;
+ return "change mode = " + strList[1] + "\n";
+}
+
+std::string ClassCmd::Info(const std::vector &strList) {
+ std::stringstream msg;
+ msg << "Info: " << std::endl;
+ msg << "version: " << get_version() << std::endl;
+ msg << "git commit id: " << get_commit_id() << std::endl;
+ msg << "DB_PATH: " << db_path << std::endl;
+ return msg.str();
+}
+
+std::string ClassCmd::Version(const std::vector &strList) {
+ std::stringstream msg;
+ msg << "Info: " << std::endl;
+ msg << "version: " << get_version() << std::endl;
+ msg << "git commit id: " << get_commit_id() << std::endl;
+ return msg.str();
+}
+
+std::string ClassCmd::SetNullStr(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .nullvalue $null\n";
+ if (strList.size() != 2) return prompt_msg;
+ null_str = strList[1];
+ return "change nullvalue = " + strList[1] + "\n";
+}
+
+std::string ClassCmd::SetWidth(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .width $num\n";
+ if (strList.size() != 2) return prompt_msg;
+ min_col_len = atoi(strList[1].c_str());
+ return "change min width = " + strList[1];
+}
+
+void ClassCmd::DrowTableLine(const std::vector &vlength, std::string_view symbols, size_t symbols_size,
+ std::string_view line) {
+ const uint32_t SYMBOLS_AND_INTERVAL_SIZE = 3;
+ std::stringstream s_format;
+ s_format << "{:" << line << ">{}}";
+ print_delegate_->Print(symbols.substr(0, symbols_size));
+ for (std::size_t i = 0; i < vlength.size() - 1; i++) {
+ print_delegate_->Print(fmt::format(s_format.str(), symbols.substr(symbols_size, symbols_size),
+ vlength[i] + SYMBOLS_AND_INTERVAL_SIZE));
+ }
+ s_format << "\n";
+ print_delegate_->Print(fmt::format(s_format.str(), symbols.substr(symbols_size * 2, symbols_size),
+ vlength[vlength.size() - 1] + SYMBOLS_AND_INTERVAL_SIZE));
+}
+
+static auto UpdateWidth(const std::vector &record, std::vector &vlength) -> void {
+ for (std::size_t i = 0; i < record.size(); i++) {
+ if (record[i].size() > vlength[i]) {
+ vlength[i] = record[i].size();
+ }
+ }
+}
+
+static auto UpdateWidth(const std::vector> &record_batch, std::vector &vlength)
+ -> void {
+ for (const auto &row : record_batch) {
+ UpdateWidth(row, vlength);
+ }
+}
+
+auto ClassCmd::PrintHeader(const std::vector &headers, const std::vector &widths,
+ const std::string &symbols, int symbols_size) -> void {
+ std::string_view symbols_view = symbols;
+ std::string_view first_line_symbol(symbols_view.substr(symbols_size * 2, symbols_size * 3));
+ std::string_view line_symbol(symbols_view.substr(symbols_size, symbols_size));
+ std::string_view separator(symbols_view.substr(0, symbols_size));
+ std::string_view second_line_symbol(symbols_view.substr(symbols_size * 5, symbols_size * 3));
+
+ DrowTableLine(widths, first_line_symbol, symbols_size, line_symbol);
+ for (std::size_t i = 0; i < headers.size(); i++) {
+ print_delegate_->Print(fmt::format(fmt::emphasis::bold, "{} {:<{}} ", separator, headers[i], widths[i]));
+ }
+ print_delegate_->Print(separator);
+ print_delegate_->Print("\n");
+ DrowTableLine(widths, second_line_symbol, symbols_size, line_symbol);
+}
+
+auto ClassCmd::PrintFooter(const std::vector &widths, const std::string &symbols, int symbols_size) -> void {
+ std::string_view symbols_view = symbols;
+ std::string_view end_line_symbol(symbols_view.substr(symbols_size * 8, symbols_size * 3));
+ std::string_view line_symbol(symbols_view.substr(symbols_size, symbols_size));
+ DrowTableLine(widths, end_line_symbol, symbols_size, line_symbol);
+}
+
+auto ClassCmd::PrintRows(const std::vector &headers, const std::vector> &rows,
+ std::vector &widths, const std::string &symbols, int symbols_size, bool first_page)
+ -> void {
+ std::string_view symbols_view = symbols;
+ std::string_view separator(symbols_view.substr(0, symbols_size));
+ if (first_page) {
+ PrintHeader(headers, widths, symbols, symbols_size);
+ }
+ for (size_t i = 0; i < rows.size(); ++i) {
+ for (size_t j = 0; j < rows[i].size(); ++j) {
+ print_delegate_->Print(fmt::format("{} {:<{}} ", separator, rows[i][j], widths[j]));
+ }
+ print_delegate_->Print(separator);
+ print_delegate_->Print("\n");
+ }
+}
+
+auto ClassCmd::PrintHeaderAndBody(RecordIterator &records, const std::vector &headers,
+ std::vector &widths, const std::string &symbols, int symbols_size) -> void {
+ constexpr int PAGE_SIZE = 100; // 每次打印的行数
+ int row_count = 0;
+ std::vector> record_batch;
+ record_batch.reserve(PAGE_SIZE);
+ bool first_page = true;
+ UpdateWidth(headers, widths); // 根据header更新列宽
+ while (true) {
+ const auto &[record, eof] = records.Next();
+ if (eof) {
+ break;
+ }
+ row_count++;
+ record_batch.push_back(RecordToString(record, null_str));
+ if (row_count == PAGE_SIZE) {
+ UpdateWidth(record_batch, widths); // 根据新批次的数据更新列宽
+ PrintRows(headers, record_batch, widths, symbols, symbols_size, first_page);
+ first_page = false;
+ record_batch.clear();
+ row_count = 0;
+ }
+ }
+
+ if (row_count > 0) {
+ UpdateWidth(record_batch, widths); // 根据新批次的数据更新列宽
+ PrintRows(headers, record_batch, widths, symbols, symbols_size, first_page);
+ first_page = false;
+ }
+ if (first_page) {
+ // empty table , 上面都没有打印
+ PrintRows(headers, record_batch, widths, symbols, symbols_size, first_page);
+ }
+}
+
+// 打印整个结果集
+auto ClassCmd::PrintTable(RecordIterator &records, const std::string &symbols, size_t symbols_size) -> void {
+ auto headers = records.GetHeader();
+ std::vector widths(headers.size(), min_col_len);
+ PrintHeaderAndBody(records, headers, widths, symbols, symbols_size);
+ PrintFooter(widths, symbols, symbols_size);
+}
+
+auto ClassCmd::PrintSelectRecords(RecordIterator &record_iterator) -> void {
+ switch (b_explain ? box : print_type) {
+ case box: {
+ PrintTable(record_iterator, "│─┌┬┐├┼┤└┴┘", 3); // 3 bytes
+ break;
+ }
+ case table: {
+ PrintTable(record_iterator, "|-+++++++++", 1);
+ break;
+ }
+ case csv: {
+ PrintCSVFormat(record_iterator);
+ break;
+ }
+ case json: {
+ PrintJSONFormat(record_iterator);
+ break;
+ }
+ case insert: {
+ PrintInsert(record_iterator, "\"table\"");
+ break;
+ }
+ default:
+ break;
+ }
+}
+
+static auto PrintInsertValue(std::shared_ptr &print_delegate, const Value &v) {
+ if (v.IsNull()) {
+ print_delegate->Print("NULL");
+ } else if (v.IsNumeric()) {
+ print_delegate->Print(fmt::format("{}", v.GetCastAs()));
+ } else if (v.GetLogicalType().TypeId() == GS_TYPE_BOOLEAN) {
+ print_delegate->Print(fmt::format("{}", v.GetCastAs()));
+ } else {
+ print_delegate->Print(fmt::format("{}", v.ToString()));
+ }
+}
+
+auto ClassCmd::PrintInsert(RecordIterator &record_iterator, const std::string &table_name) -> void {
+ while (true) {
+ const auto &[record, eof] = record_iterator.Next();
+ if (eof) {
+ break;
+ }
+ print_delegate_->Print("INSERT INTO " + table_name + " VALUES");
+ auto col_num = record.ColumnCount();
+ for (uint32_t i = 0; i < col_num; ++i) {
+ const auto &v = record.FieldRef(i);
+ if (i == 0) {
+ print_delegate_->Print("(");
+ } else {
+ print_delegate_->Print(",");
+ }
+ PrintInsertValue(print_delegate_, v);
+ }
+ print_delegate_->Print(");\n");
+ }
+}
+
+auto ClassCmd::PrintRecords(RecordIterator &record_iterator) -> void {
+ if (record_iterator.GetRetCode() != 0) {
+ print_delegate_->Print(
+ fmt::format("Error Code:{} Msg:{}\n", record_iterator.GetRetCode(), record_iterator.GetRetMsg()));
+ return;
+ }
+
+ switch (record_iterator.GetStmtType()) {
+ case StatementType::INVALID_STATEMENT: {
+ return;
+ }
+ case StatementType::SHOW_STATEMENT:
+ case StatementType::SELECT_STATEMENT: {
+ PrintSelectRecords(record_iterator);
+ break;
+ }
+ case StatementType::CALL_STATEMENT: {
+ if (record_iterator.GetBatchType() == RecordBatchType::Select) {
+ PrintSelectRecords(record_iterator);
+ }
+ else {
+ print_delegate_->Print("Query OK\n");
+ }
+ break;
+ }
+ default: {
+ print_delegate_->Print("Query OK\n");
+ break;
+ }
+ }
+ UpdateTotalChanges(record_iterator.GetEffectRow());
+ PrintEffectRow(record_iterator.GetEffectRow());
+}
+
+void ClassCmd::PrintTimer(const std::chrono::steady_clock::time_point &start,
+ const std::chrono::steady_clock::time_point &end) {
+ if (b_timer) {
+ print_delegate_->Print(
+ fmt::format("Run Time: {:.4f} ms \n", std::chrono::duration(end - start).count()));
+ }
+}
+
+void ClassCmd::UpdatePrintDelegate(uint16_t &output_times, std::string &tmp_out_file) {
+ if (output_times > 0) {
+ if (tmp_out_file.size() > 0) {
+ print_delegate_ = GetPrintDelegate(tmp_out_file);
+ tmp_out_file.clear();
+ } else {
+ output_times--;
+ if (output_times == 0) {
+ print_delegate_ = GetPrintDelegate("stdout");
+ }
+ }
+ }
+}
+
+void ClassCmd::PrintResult(const std::string &str) {
+ print_delegate_->Print(str);
+ // 更新print_delegate_
+ UpdatePrintDelegate(output_times, tmp_output_file);
+}
+
+std::string ClassCmd::SetEcho(const std::vector &strList) { return SetSwitch(".echo", b_echo, strList); }
+
+std::string ClassCmd::SetChange(const std::vector &strList) {
+ return SetSwitch(".change", b_show_change, strList);
+}
+
+std::string ClassCmd::SetTimer(const std::vector &strList) {
+ return SetSwitch(".timer", b_timer, strList);
+}
+
+std::string ClassCmd::Explain(const std::vector &strList) {
+ return SetSwitch(".explain", b_explain, strList);
+}
+
+std::string ClassCmd::SetSwitch(const std::string &cmd, bool &bswitch, const std::vector &strList) {
+ static std::string prompt_msg = "Useage: " + cmd + " on|off \n"; // 只创建一次
+ if (strList.size() != 2) {
+ return prompt_msg;
+ }
+
+ if (strList[1] == "on") {
+ bswitch = true;
+ } else if (strList[1] == "off") {
+ bswitch = false;
+ } else {
+ return prompt_msg;
+ }
+ return "";
+}
+
+std::string ClassCmd::SetOutFile(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .output FILE|off\n";
+ if (strList.size() != 2) return prompt_msg;
+ if (strList[1] == "off") {
+ print_delegate_ = GetPrintDelegate("stdout");
+ } else {
+ print_delegate_ = GetPrintDelegate(strList[1]);
+ }
+ output_times = 0;
+ return "";
+}
+
+std::string ClassCmd::SetOutputOnce(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .once FILE\n";
+ if (strList.size() != 2) return prompt_msg;
+ tmp_output_file = strList[1];
+ output_times = 1;
+ return "";
+}
+
+std::string ClassCmd::SetPrompt(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .prompt your_prompt\n";
+ if (strList.size() != 2) return prompt_msg;
+ begin_prompt.clear();
+ continue_prompt.clear();
+ fmt::format_to(std::back_inserter(begin_prompt), "{:<{}}> ", strList[1], strList[1].size());
+ fmt::format_to(std::back_inserter(continue_prompt), "{:>{}} ", ">>>", strList[1].size() + 1);
+ return "";
+}
+
+std::string ClassCmd::ShowOption(const std::vector &strList) {
+ std::string msg;
+ int length = 10;
+ fmt::format_to(std::back_inserter(msg), "{:>{}}: {}\n", "path", length, db_path);
+ fmt::format_to(std::back_inserter(msg), "{:>{}}: {}\n", "output", length, print_delegate_->GetFileName());
+
+ for (auto item : m_print_type)
+ if (item.second == print_type) {
+ fmt::format_to(std::back_inserter(msg), "{:>{}}: {}\n", "mode", length, item.first);
+ break;
+ }
+ fmt::format_to(std::back_inserter(msg), "{:>{}}: {}\n", "nullvalue", length, null_str);
+ fmt::format_to(std::back_inserter(msg), "{:>{}}: {}\n", "echo", length, b_echo ? "on" : "off");
+ fmt::format_to(std::back_inserter(msg), "{:>{}}: {}\n", "change", length, b_show_change ? "on" : "off");
+ fmt::format_to(std::back_inserter(msg), "{:>{}}: {}\n", "timer", length, b_timer ? "on" : "off");
+ return msg;
+}
+
+std::string ClassCmd::ShowTable(const std::vector &strList) {
+ std::string result;
+ std::stringstream sql_query;
+ sql_query << "select NAME from \"SYS_TABLES\" where \"SPACE#\"=" << SQL_SPACE_TYPE_USERS;
+ if (strList.size() > 1) {
+ std::string sName = strList[1];
+ transform(sName.begin(), sName.end(), sName.begin(), ::tolower);
+ sql_query << " and (NAME like '" << sName << "' or NAME like '" << strList[1] << "')";
+ }
+ sql_query << ";";
+ auto records = sql_conn->Query(sql_query.str().c_str())->GetRecords();
+ for (std::size_t i = 1; i < records.size(); i++) {
+ for (auto item : records[i]) {
+ result += item + "\n";
+ }
+ }
+ return result;
+}
+
+std::string ClassCmd::ShowIndex(const std::vector &strList) {
+ std::string result;
+ std::stringstream sql_query;
+ sql_query << "select ti.NAME from \"SYS_INDEXES\" ti join (select \"ID\", "
+ "\"NAME\" from \"SYS_TABLES\" "
+ << "where \"SPACE#\"=" << SQL_SPACE_TYPE_USERS;
+ if (strList.size() > 1) {
+ std::string sName = strList[1];
+ transform(sName.begin(), sName.end(), sName.begin(), ::tolower);
+ sql_query << " and (NAME like '" << sName << "' or NAME like '" << strList[1] << "')";
+ }
+ sql_query << ") tt on ti.\"TABLE#\" = tt.\"ID\";";
+ auto records = sql_conn->Query(sql_query.str().c_str())->GetRecords();
+ for (std::size_t i = 1; i < records.size(); i++) {
+ for (auto item : records[i]) {
+ result += item + "\n";
+ }
+ }
+ return result;
+}
+
+std::string ClassCmd::SHowKeys(const std::vector &strList) {
+ std::string result;
+ std::stringstream sql_query;
+ sql_query << "SELECT KEY FROM \"" << kv_operator->getKVTable() << "\";";
+ auto records = sql_conn->Query(sql_query.str().c_str())->GetRecords();
+ for (std::size_t i = 1; i < records.size(); i++) {
+ for (auto item : records[i]) {
+ result += item + "\n";
+ }
+ }
+ return result;
+}
+
+std::string ClassCmd::Schema(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .schema $TABLE\n";
+ if (strList.size() != 2) return prompt_msg;
+ std::vector v_index_sqls;
+ std::string table = GetTableName(strList[1]);
+ std::string result = sql_conn->ShowCreateTable(table, v_index_sqls) + "\n";
+ for (auto item : v_index_sqls) {
+ result += item + "\n";
+ }
+ return result;
+}
+
+std::string ClassCmd::FullSchema(const std::vector &strList) {
+ std::string result;
+ std::stringstream sql_query;
+ sql_query << "show tables;";
+ auto records = sql_conn->Query(sql_query.str().c_str())->GetRecords();
+ for (std::size_t i = 1; i < records.size(); i++) {
+ for (auto item : records[i]) {
+ std::vector v_index_sqls;
+ result += sql_conn->ShowCreateTable(item, v_index_sqls) + "\n";
+ for (auto item : v_index_sqls) {
+ result += item + "\n";
+ }
+ result += "\n";
+ }
+ }
+ return result;
+}
+
+// TODO: 内置函数的输出也不一定是短小的,需要考虑机制让内置函数也可以选择
+// 构造字符串,或直接使用 print_delegate 输出
+std::string ClassCmd::Dump(const std::vector &strList) {
+ // NOTE: 先特殊处理让结果直接输出,并返回一个空字符串
+ std::string prompt_msg = "Useage: .dump $TABLE\n";
+ if (strList.size() != 2) {
+ return prompt_msg;
+ }
+ print_delegate_->Print("BEGIN;\n");
+ std::vector v_index_sqls;
+ auto create_table_sql = sql_conn->ShowCreateTable(strList[1], v_index_sqls);
+ print_delegate_->Print(create_table_sql);
+ print_delegate_->Print("\n");
+ std::string sql_query = "SELECT * FROM " + strList[1];
+ auto record_iterator = sql_conn->QueryIterator(sql_query.c_str());
+ PrintInsert(*record_iterator, strList[1]);
+ print_delegate_->Print("COMMIT;\n");
+ return "";
+}
+
+void ClassCmd::StringSplit(const std::string &str, const char split, std::vector &res) {
+ std::istringstream iss(str + split);
+ std::string buf;
+ while (getline(iss, buf, split)) {
+ res.push_back(buf);
+ }
+}
+
+std::string ClassCmd::ImportCSV(const std::string &file, const std::string &table, uint16 skip) {
+ std::ifstream fin;
+ fin.open(file.c_str(), std::ios::in);
+ if (!fin.is_open()) {
+ return "open file: " + file + " error\n";
+ }
+
+ std::string str_line;
+ // skip n
+ for (uint16 i = 0; i < skip; i++) getline(fin, str_line);
+
+ sql_conn->Query("BEGIN");
+
+ // 判断数据表是否存在,不存在则创建
+ if (sql_conn->GetTableInfo(table) == nullptr) {
+ getline(fin, str_line);
+ std::vector col_name;
+ StringSplit(str_line, ',', col_name);
+ std::stringstream sql;
+ sql << "CREATE TABLE " << table << " (";
+ for (size_t i = 0; i < col_name.size(); i++) {
+ if (i != 0) sql << ", ";
+ sql << "\"" << col_name[i] << "\" VARCHAR";
+ }
+ sql << ");";
+ auto r = sql_conn->Query(sql.str().c_str());
+ if (r->GetRetCode() != 0) {
+ std::stringstream sErrMsg;
+ sErrMsg << "Error Code:" << r->GetRetCode() << " Msg:" << r->GetRetMsg() << std::endl;
+ return sErrMsg.str();
+ }
+ }
+ std::vector> rows;
+ bool bResult = true;
+ std::string result = "Import OK\n";
+ while (getline(fin, str_line) && str_line.size() > 0) {
+ std::vector row;
+ StringSplit(str_line, ',', row);
+ std::stringstream insert_sql;
+ insert_sql << "INSERT INTO " << table << " VALUES(";
+ for (size_t i = 0; i < row.size(); i++) {
+ if (i != 0) insert_sql << ", ";
+ if (row[i].size() > 0)
+ insert_sql << "'" << row[i] << "'";
+ else
+ insert_sql << "NULL";
+ }
+ insert_sql << ");";
+ auto r = sql_conn->Query(insert_sql.str().c_str());
+ if (r->GetRetCode() != 0) {
+ std::stringstream sErrMsg;
+ sErrMsg << "Error Code:" << r->GetRetCode() << " Msg:" << r->GetRetMsg() << std::endl;
+ result = sErrMsg.str();
+ sql_conn->Query("ROLLBACK");
+ bResult = false;
+ break;
+ }
+ }
+ if (bResult == true) sql_conn->Query("COMMIT");
+ return result;
+}
+
+std::string ClassCmd::Import(const std::vector &strList) {
+ std::string prompt_msg =
+ "Useage: .import $FILE $TABLE [--OPTION]"
+ " Options: \n"
+ " --csv Use , and \\n as column and row separators\n"
+#ifdef SUPPORT_SQLITE
+ " --sqlite TABLE Use SQLite table\n"
+#endif
+ " --skip N Skip the first N rows of input\n";
+ if (strList.size() < 4) return prompt_msg;
+
+ EImportType format_type = EImportType::invalid;
+ uint16 skip = 0;
+ std::vector strcmd;
+ for (size_t i = 1; i < strList.size(); i++) {
+ if (strList[i].substr(0, 2) == "--") {
+ std::string option = strList[i].substr(2);
+ if (option == "csv")
+ format_type = EImportType::csv;
+ else if (option == "skip" && i < strList.size() - 1)
+ skip = atoi(strList[++i].c_str());
+ } else {
+ strcmd.push_back(strList[i]);
+ }
+ }
+
+ switch (format_type) {
+ case EImportType::csv: {
+ return ImportCSV(strcmd[0], GetTableName(strcmd[1]), skip);
+ }
+ default: {
+ return "FORMAT OPTION ERROR! \n" + prompt_msg;
+ }
+ }
+}
+
+std::string ClassCmd::Read(const std::vector &strList) {
+ std::string prompt_msg = "Useage: .read $FILE\n";
+ if (strList.size() < 2) return prompt_msg;
+ std::ifstream fin;
+ fin.open(strList[1].c_str(), std::ios::in);
+ if (!fin.is_open()) {
+ return "open file: " + strList[1] + " error\n";
+ }
+ std::string str_line;
+ std::string cmd;
+ while (getline(fin, str_line)) {
+ cmd += str_line;
+ if (IsFinishQuery(cmd)) {
+ execute(cmd);
+ cmd.clear();
+ }
+ }
+ return "";
+}
+
+std::string ClassCmd::DBConfig(const std::vector &strList) {
+ std::string result;
+ int length = 10;
+ if (strList.size() == 1) { // show all config
+ fmt::format_to(std::back_inserter(result), "{:>{}}: {}\n", "log_level", length,
+ cm_log_param_instance()->log_level);
+ }
+ if (strList.size() >= 3) {
+ if (strList[1] == "log_level") {
+ cm_log_param_instance()->log_level = atoi(strList[2].c_str());
+ fmt::format_to(std::back_inserter(result), "{:>{}}: {}\n", "log_level", length,
+ cm_log_param_instance()->log_level);
+ }
+ }
+ return result;
+}
+
+std::shared_ptr ClassCmd::GetPrintDelegate(const std::string &filename) {
+ if (filename == "stdout") {
+ return std::make_shared();
+ } else {
+ return std::make_shared(filename.c_str());
+ }
+}
+
+auto ClassCmd::PrintCmd(const std::string &cmd) -> void {
+ if (b_echo) {
+ print_delegate_->Print(cmd);
+ print_delegate_->Print("\n");
+ }
+}
+
+auto ClassCmd::UpdateTotalChanges(uint64_t current_changes) -> void { total_changes += current_changes; }
+
+auto ClassCmd::PrintEffectRow(uint64_t current_changes) -> void {
+ if (b_show_change) {
+ print_delegate_->Print(fmt::format("changes: {}\ttotal_changes: {}\n", current_changes, total_changes));
+ }
+}
+
+auto ClassCmd::IsBuildInCmd(const std::string &cmd, func &build_func) -> bool {
+ auto iter = ex_func_map.find(cmd);
+ if (iter != ex_func_map.end()) {
+ build_func = iter->second;
+ }
+ return iter != ex_func_map.end();
+}
+
+auto ClassCmd::IsKVCmd(const std::string &cmd, KvOperator::kv_func &kv_func) -> bool {
+ auto iter = kv_operator->kv_func_map.find(cmd);
+ if (iter != kv_operator->kv_func_map.end()) {
+ kv_func = iter->second;
+ }
+ return iter != kv_operator->kv_func_map.end();
+}
+
+auto ClassCmd::PrintBuildInCmdResult(const std::string &query, const func &build_func,
+ const std::vector &args) -> void {
+ auto result = (this->*build_func)(args);
+ // 短结果直接打印
+ PrintCmd(query);
+ PrintResult(result);
+}
+
+auto ClassCmd::PrintKVCmdResult(const std::string &query, const KvOperator::kv_func &kv_func,
+ const std::vector &args) -> void {
+ auto start = GetTimer();
+ auto result = (kv_operator->*kv_func)(args);
+ // 短结果直接打印
+ PrintCmd(query);
+ PrintResult(result);
+ auto end = GetTimer();
+ PrintTimer(start, end);
+}
+
+auto ClassCmd::PrintSQLCmdResult(const std::string &query) -> void {
+ // handle sql
+ auto start = GetTimer();
+ auto r = sql_conn->QueryIterator(query.c_str());
+ PrintCmd(query);
+ PrintRecords(*r);
+ auto end = GetTimer(); // RecordBatch改为RecordIterator后,需要Print之后,才执行完成。
+ PrintTimer(start, end);
+}
+
+static auto PrintCSVRow(std::shared_ptr &print_delegate,
+ const std::vector &row_content) {
+ for (size_t i = 0; i < row_content.size(); i++) {
+ if (i > 0) {
+ print_delegate->Print(",");
+ }
+ print_delegate->Print(row_content[i]);
+ }
+ print_delegate->Print("\n");
+}
+
+auto ClassCmd::PrintCSVFormat(RecordIterator &record_iterator) -> void {
+ auto headers = record_iterator.GetHeader();
+ PrintCSVRow(print_delegate_, headers);
+
+ while (true) {
+ const auto &[record, eof] = record_iterator.Next();
+ if (eof) {
+ break;
+ }
+ auto row_content = RecordToString(record, null_str);
+ PrintCSVRow(print_delegate_, row_content);
+ }
+}
+
+auto ClassCmd::RecordToString(const Record &record, const std::string &null_format) -> std::vector {
+ std::vector row_content;
+ for (size_t i = 0; i < record.ColumnCount(); i++) {
+ const auto &val = record.FieldRef(i);
+ row_content.emplace_back(val.IsNull() ? null_format : val.ToString());
+ }
+ return row_content;
+}
+
+// TODO: 和 PrintInsertValue 逻辑重复了,考虑抽象合并
+static auto AddJsonValue(cJSON *jrow, const char *field_name, const Value &val) -> void {
+ if (val.IsNull()) {
+ cJSON_AddItemToObject(jrow, field_name, cJSON_CreateNull());
+ } else if (val.IsNumeric()) {
+ cJSON_AddItemToObject(jrow, field_name, cJSON_CreateNumber(val.GetCastAs()));
+ } else if (val.GetLogicalType().TypeId() == GS_TYPE_BOOLEAN) {
+ cJSON_AddItemToObject(jrow, field_name, cJSON_CreateBool(val.GetCastAs()));
+ } else {
+ cJSON_AddItemToObject(jrow, field_name, cJSON_CreateString(val.ToString().c_str()));
+ }
+}
+
+static auto RecordToJSONRow(const std::vector &headers, const Record &record) -> std::string {
+ cJSON *jrow = cJSON_CreateObject();
+ for (size_t j = 0; j < headers.size(); ++j) {
+ const auto &header = headers[j];
+ const auto &v = record.FieldRef(j);
+ AddJsonValue(jrow, header.c_str(), v);
+ }
+ char *str = cJSON_PrintUnformatted(jrow);
+ std::string json_row = str;
+ // 注意释放内存,使用方法参考cJSON测试用例,
+ // 需要保证前面不会抛出异常,否则会导致free语句执行不到
+ // 最好使用RAII的方法保证内存能够被正确释放
+ free(str);
+ cJSON_Delete(jrow);
+ return json_row;
+}
+
+auto ClassCmd::PrintJSONFormat(RecordIterator &record_iterator) -> void {
+ auto headers = record_iterator.GetHeader();
+ print_delegate_->Print("[\n");
+ bool first = true;
+ while (true) {
+ const auto &[record, eof] = record_iterator.Next();
+ if (!eof && !first) {
+ print_delegate_->Print(",\n");
+ }
+ if (eof) {
+ print_delegate_->Print("\n");
+ break;
+ }
+ first = false;
+ print_delegate_->Print(RecordToJSONRow(headers, record));
+ }
+ print_delegate_->Print("]\n");
+}
+
+auto ClassCmd::IsFinishQuery(const std::string &query) -> bool {
+ return (query.length() > 0 && (query[query.length() - 1] == ';' || query[0] == '.'));
+}
diff --git a/tools/intarkdb_cli/cmd.h b/tools/intarkdb_cli/cmd.h
new file mode 100644
index 0000000000000000000000000000000000000000..5f2568f2775dd49ad135469843c5e50f37a22d0d
--- /dev/null
+++ b/tools/intarkdb_cli/cmd.h
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) GBA-NCTI-ISDC. 2022-2024.
+ *
+ * openGauss embedded is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ * http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FITFOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * -------------------------------------------------------------------------
+ *
+ * cmd.h
+ *
+ * IDENTIFICATION
+ * openGauss-embedded/tools/intarkdb_cli/cmd.h
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include
+
+#include
+#include
+#include