From ccd306c2f0ed9c02ea432750f589983aa09d0ce5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=9F=8E=E5=BA=9C?= <54766209@qq.com>
Date: Thu, 17 Nov 2022 22:36:26 +0800
Subject: [PATCH 1/2] =?UTF-8?q?=E6=9B=B4=E6=94=B9kafkareader=E5=90=8D?=
=?UTF-8?q?=E7=A7=B0=E9=97=AE=E9=A2=98=EF=BC=8Cmodule=E4=B8=ADkafkawriter?=
=?UTF-8?q?=E5=BC=95=E7=94=A8=E9=87=8D=E5=A4=8D?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
{kafkreader => kafkareader}/pom.xml | 2 +-
.../src/main/java/www/larkmidtable/com/KafkaReader.java | 0
pom.xml | 3 +--
3 files changed, 2 insertions(+), 3 deletions(-)
rename {kafkreader => kafkareader}/pom.xml (96%)
rename {kafkreader => kafkareader}/src/main/java/www/larkmidtable/com/KafkaReader.java (100%)
diff --git a/kafkreader/pom.xml b/kafkareader/pom.xml
similarity index 96%
rename from kafkreader/pom.xml
rename to kafkareader/pom.xml
index 9096c97..f034919 100644
--- a/kafkreader/pom.xml
+++ b/kafkareader/pom.xml
@@ -5,7 +5,7 @@
4.0.0
kafkreader
- kafkreader
+ kafkareader
honghu
www.larkmidtable.com
diff --git a/kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java b/kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java
similarity index 100%
rename from kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java
rename to kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java
diff --git a/pom.xml b/pom.xml
index f083077..710413d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,8 +30,7 @@
starRocks-reader
starRocks-writer
- kafkreader
- kafkawriter
+ kafkareader
kafkawriter
dorisread
--
Gitee
From 8fe556ba5eb1bbbfd4d7d06aba457985408e8af8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=9F=8E=E5=BA=9C?= <54766209@qq.com>
Date: Thu, 17 Nov 2022 23:06:44 +0800
Subject: [PATCH 2/2] =?UTF-8?q?=E4=BA=BA=E5=A4=A7=E9=87=91=E4=BB=93=20King?=
=?UTF-8?q?Base=E8=AF=BB=E5=86=99=E7=BB=84=E4=BB=B6=E7=BC=96=E5=86=99?=
=?UTF-8?q?=E5=AE=8C=E6=88=90?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../www/larkmidtable/com/util/DBType.java | 5 ++
kingbasereader/pom.xml | 57 +++++++++++++
.../www/larkmidtable/com/KingBaseReader.java | 80 +++++++++++++++++++
kingbasewriter/pom.xml | 53 ++++++++++++
.../www/larkmidtable/com/KingBaseWriter.java | 74 +++++++++++++++++
pom.xml | 2 +
6 files changed, 271 insertions(+)
create mode 100644 kingbasereader/pom.xml
create mode 100644 kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java
create mode 100644 kingbasewriter/pom.xml
create mode 100644 kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java
diff --git a/common/src/main/java/www/larkmidtable/com/util/DBType.java b/common/src/main/java/www/larkmidtable/com/util/DBType.java
index 0b442c7..fff47b9 100644
--- a/common/src/main/java/www/larkmidtable/com/util/DBType.java
+++ b/common/src/main/java/www/larkmidtable/com/util/DBType.java
@@ -38,6 +38,11 @@ public enum DBType {
return "com.microsoft.sqlserver.jdbc.SQLServerDriver";
}
},
+ KingBase8{
+ public String getDriverClass(){//枚举对象实现抽象方法
+ return "com.kingbase8.Driver";
+ }
+ },
;
public abstract String getDriverClass();//定义抽象方法
diff --git a/kingbasereader/pom.xml b/kingbasereader/pom.xml
new file mode 100644
index 0000000..d822849
--- /dev/null
+++ b/kingbasereader/pom.xml
@@ -0,0 +1,57 @@
+
+
+
+ honghu
+ www.larkmidtable.com
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ www.larkmidtable.com
+ kingbasereader
+ 1.0-SNAPSHOT
+
+
+ 11
+ 11
+
+
+
+
+ www.larkmidtable.com
+ common
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+
+ com.kingbase8
+ kingbase8-jdbc-driver
+ 8.6.0
+ system
+ ${pom.basedir}/lib/kingbase8-8.6.0.jar
+
+
+
+
+
+
+ lib
+ /BOOT-INF/lib/
+
+ **/*.jar
+
+
+
+
+
\ No newline at end of file
diff --git a/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java
new file mode 100644
index 0000000..e180f8a
--- /dev/null
+++ b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java
@@ -0,0 +1,80 @@
+package www.larkmidtable.com;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import www.larkmidtable.com.reader.Reader;
+import www.larkmidtable.com.util.DBType;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @projectName: honghu
+ * @package: www.larkmidtable.com
+ * @className: KingBaseReader
+ * @author: qd.liu
+ * @description: 人大金仓读组件
+ * @date: 2022/11/17 21:09
+ * @version: 1.0
+ */
+public class KingBaseReader extends Reader {
+ private Connection connection;
+ private PreparedStatement statement;
+ private static Logger logger = LoggerFactory.getLogger(KingBaseReader.class);
+
+ @Override
+ public void open() {
+ try {
+ logger.info("KingBase的Reader建立连接开始....");
+ Class.forName(DBType.DM.getDriverClass());
+ connection = DriverManager
+ .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456");
+ logger.info("KingBase的Reader建立连接结束....");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public Queue> startRead(String[] inputSplits) {
+ logger.info("KingBase读取数据操作....");
+ try {
+ List records = new ArrayList<>();
+ String sql = "select * from student";
+ statement = connection.prepareCall(sql);
+ ResultSet resultSet = statement.executeQuery();
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ records.add(name);
+ }
+ getQueue().add(records);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ logger.info("KingBase读取数据结束....");
+ return getQueue();
+ }
+
+
+ @Override
+ public String[] createInputSplits() {
+ logger.info("KingBase的Reader开始进行分片开始....");
+ logger.info("Kingbase的Reader开始进行分片结束....");
+ return new String[5];
+ }
+
+ @Override
+ public void close() {
+ try {
+ logger.info("KingBase的Reader开始进行关闭连接开始....");
+ statement.close();
+ connection.close();
+ logger.info("KingBase的Reader开始进行关闭连接结束....");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
diff --git a/kingbasewriter/pom.xml b/kingbasewriter/pom.xml
new file mode 100644
index 0000000..b3bc640
--- /dev/null
+++ b/kingbasewriter/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+ honghu
+ www.larkmidtable.com
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ www.larkmidtable.com
+ kingbasewriter
+ 1.0-SNAPSHOT
+
+
+
+
+ www.larkmidtable.com
+ common
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+
+ com.kingbase8
+ kingbase8-jdbc-driver
+ 8.6.0
+ system
+ ${pom.basedir}/lib/kingbase8-8.6.0.jar
+
+
+
+
+
+
+ lib
+ /BOOT-INF/lib/
+
+ **/*.jar
+
+
+
+
+
\ No newline at end of file
diff --git a/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java
new file mode 100644
index 0000000..5253de6
--- /dev/null
+++ b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java
@@ -0,0 +1,74 @@
+package www.larkmidtable.com;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import www.larkmidtable.com.util.DBType;
+import www.larkmidtable.com.writer.Writer;
+
+import java.sql.*;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @projectName: honghu
+ * @package: www.larkmidtable.com
+ * @className: KingBaseReader
+ * @author: qd.liu
+ * @description: 人大金仓读组件
+ * @date: 2022/11/17 22:09
+ * @version: 1.0
+ */
+public class KingBaseWriter extends Writer {
+ private Connection connection ;
+ private PreparedStatement statement ;
+ private static Logger logger = LoggerFactory.getLogger(KingBaseWriter.class);
+ @Override
+ public void open() {
+ try {
+ logger.info("KingBase的Writer建立连接开始....");
+ Class.forName(DBType.KingBase8.getDriverClass());
+ connection = DriverManager
+ .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456");
+ connection.setAutoCommit(false);
+ logger.info("KingBase的Writer建立连接结束....");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void startWrite(Queue> queue) {
+ logger.info("开始写数据....");
+ List poll = queue.poll();
+ String sql = "insert into student(id,name) values (?,?)";
+ try {
+ statement = connection.prepareStatement(sql); // 批量插入时ps对象必须放到for循环外面
+ for (int i = 0; i < poll.size(); i++) {
+ statement.setString(1, "mary_" + i);
+ statement.setString(2, poll.get(i));
+ statement.addBatch();
+ if (i % 10000 == 0) {
+ statement.executeBatch();
+ connection.commit();
+ statement.clearBatch();
+ }
+ }
+ statement.executeBatch();
+ }catch (Exception e) {
+ e.printStackTrace();
+ }
+ logger.info("写数据完成....");
+ }
+
+ @Override
+ public void close() {
+ try {
+ logger.info("KingBase的Writer开始进行关闭连接开始....");
+ statement.close();
+ connection.close();
+ logger.info("KingBase的Writer开始进行关闭连接结束....");
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 710413d..ba2e6ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,6 +33,8 @@
kafkareader
kafkawriter
dorisread
+ kingbasereader
+ kingbasewriter
--
Gitee