diff --git a/common/src/main/java/www/larkmidtable/com/util/DBType.java b/common/src/main/java/www/larkmidtable/com/util/DBType.java
index 0b442c778c31bddd412581b40317949824fa9240..fff47b96b0aa326223cf5e873f76c6058aade6e0 100644
--- a/common/src/main/java/www/larkmidtable/com/util/DBType.java
+++ b/common/src/main/java/www/larkmidtable/com/util/DBType.java
@@ -38,6 +38,11 @@ public enum DBType {
return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
}
},
+ KingBase8{
+ public String getDriverClass(){//枚举对象实现抽象方法
+ return "com.kingbase8.Driver";
+ }
+ },
;
public abstract String getDriverClass();//定义抽象方法
diff --git a/kafkreader/pom.xml b/kafkareader/pom.xml
similarity index 96%
rename from kafkreader/pom.xml
rename to kafkareader/pom.xml
index 9096c973df9e352c622866fef8543fe69672fee6..f034919005b6716bb7430e1606e68840d54aa1be 100644
--- a/kafkreader/pom.xml
+++ b/kafkareader/pom.xml
@@ -5,7 +5,7 @@
4.0.0
kafkreader
- kafkreader
+ kafkareader
honghu
www.larkmidtable.com
diff --git a/kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java b/kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java
similarity index 100%
rename from kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java
rename to kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java
diff --git a/kingbasereader/pom.xml b/kingbasereader/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..d822849b7483c2076d535659b38ea0cffdcc3eab
--- /dev/null
+++ b/kingbasereader/pom.xml
@@ -0,0 +1,57 @@
+
+
+
+ honghu
+ www.larkmidtable.com
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ www.larkmidtable.com
+ kingbasereader
+ 1.0-SNAPSHOT
+
+
+ 11
+ 11
+
+
+
+
+ www.larkmidtable.com
+ common
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+
+ com.kingbase8
+ kingbase8-jdbc-driver
+ 8.6.0
+ system
+ ${pom.basedir}/lib/kingbase8-8.6.0.jar
+
+
+
+
+
+
+ lib
+ /BOOT-INF/lib/
+
+ **/*.jar
+
+
+
+
+
\ No newline at end of file
diff --git a/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java
new file mode 100644
index 0000000000000000000000000000000000000000..e180f8af0e194d93dafbc597d50266f9c94d3477
--- /dev/null
+++ b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java
@@ -0,0 +1,80 @@
+package www.larkmidtable.com;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import www.larkmidtable.com.reader.Reader;
+import www.larkmidtable.com.util.DBType;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @projectName: honghu
+ * @package: www.larkmidtable.com
+ * @className: KingBaseReader
+ * @author: qd.liu
+ * @description: 人大金仓读组件
+ * @date: 2022/11/17 21:09
+ * @version: 1.0
+ */
+public class KingBaseReader extends Reader {
+ private Connection connection;
+ private PreparedStatement statement;
+ private static Logger logger = LoggerFactory.getLogger(KingBaseReader.class);
+
+ @Override
+ public void open() {
+ try {
+ logger.info("KingBase的Reader建立连接开始....");
+ Class.forName(DBType.DM.getDriverClass());
+ connection = DriverManager
+ .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456");
+ logger.info("KingBase的Reader建立连接结束....");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Queue> startRead(String[] inputSplits) {
+ logger.info("KingBase读取数据操作....");
+ try {
+ List records = new ArrayList<>();
+ String sql = "select * from student";
+ statement = connection.prepareCall(sql);
+ ResultSet resultSet = statement.executeQuery();
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ records.add(name);
+ }
+ getQueue().add(records);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ logger.info("KingBase读取数据结束....");
+ return getQueue();
+ }
+
+
+ @Override
+ public String[] createInputSplits() {
+ logger.info("KingBase的Reader开始进行分片开始....");
+ logger.info("Kingbase的Reader开始进行分片结束....");
+ return new String[5];
+ }
+
+ @Override
+ public void close() {
+ try {
+ logger.info("KingBase的Reader开始进行关闭连接开始....");
+ statement.close();
+ connection.close();
+ logger.info("KingBase的Reader开始进行关闭连接结束....");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
diff --git a/kingbasewriter/pom.xml b/kingbasewriter/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..b3bc64040d025c4f355d15aad9356c0a9c685fbb
--- /dev/null
+++ b/kingbasewriter/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+ honghu
+ www.larkmidtable.com
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ www.larkmidtable.com
+ kingbasewriter
+ 1.0-SNAPSHOT
+
+
+
+
+ www.larkmidtable.com
+ common
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+
+ com.kingbase8
+ kingbase8-jdbc-driver
+ 8.6.0
+ system
+ ${pom.basedir}/lib/kingbase8-8.6.0.jar
+
+
+
+
+
+
+ lib
+ /BOOT-INF/lib/
+
+ **/*.jar
+
+
+
+
+
\ No newline at end of file
diff --git a/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java
new file mode 100644
index 0000000000000000000000000000000000000000..5253de644108f45eb0f9426023d877f2583dbea3
--- /dev/null
+++ b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java
@@ -0,0 +1,74 @@
+package www.larkmidtable.com;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import www.larkmidtable.com.util.DBType;
+import www.larkmidtable.com.writer.Writer;
+
+import java.sql.*;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @projectName: honghu
+ * @package: www.larkmidtable.com
+ * @className: KingBaseReader
+ * @author: qd.liu
+ * @description: 人大金仓读组件
+ * @date: 2022/11/17 22:09
+ * @version: 1.0
+ */
+public class KingBaseWriter extends Writer {
+ private Connection connection ;
+ private PreparedStatement statement ;
+ private static Logger logger = LoggerFactory.getLogger(KingBaseWriter.class);
+ @Override
+ public void open() {
+ try {
+ logger.info("KingBase的Writer建立连接开始....");
+ Class.forName(DBType.KingBase8.getDriverClass());
+ connection = DriverManager
+ .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456");
+ connection.setAutoCommit(false);
+ logger.info("KingBase的Writer建立连接结束....");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void startWrite(Queue> queue) {
+ logger.info("开始写数据....");
+ List poll = queue.poll();
+ String sql = "insert into student(id,name) values (?,?)";
+ try {
+ statement = connection.prepareStatement(sql); // 批量插入时ps对象必须放到for循环外面
+ for (int i = 0; i < poll.size(); i++) {
+ statement.setString(1, "mary_" + i);
+ statement.setString(2, poll.get(i));
+ statement.addBatch();
+ if (i % 10000 == 0) {
+ statement.executeBatch();
+ connection.commit();
+ statement.clearBatch();
+ }
+ }
+ statement.executeBatch();
+ }catch (Exception e) {
+ e.printStackTrace();
+ }
+ logger.info("写数据完成....");
+ }
+
+ @Override
+ public void close() {
+ try {
+ logger.info("KingBase的Writer开始进行关闭连接开始....");
+ statement.close();
+ connection.close();
+ logger.info("KingBase的Writer开始进行关闭连接结束....");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index f0830771a99c2d6313c662b354f4f7ebe3b4c631..ba2e6ab6790eb47c52b2c070da0d3ffb2125102e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,10 +30,11 @@
starRocks-reader
starRocks-writer
- kafkreader
- kafkawriter
+ kafkareader
kafkawriter
dorisread
+ kingbasereader
+ kingbasewriter