From ccd306c2f0ed9c02ea432750f589983aa09d0ce5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9F=8E=E5=BA=9C?= <54766209@qq.com> Date: Thu, 17 Nov 2022 22:36:26 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=9B=B4=E6=94=B9kafkareader=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E9=97=AE=E9=A2=98=EF=BC=8Cmodule=E4=B8=ADkafkawriter?= =?UTF-8?q?=E5=BC=95=E7=94=A8=E9=87=8D=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- {kafkreader => kafkareader}/pom.xml | 2 +- .../src/main/java/www/larkmidtable/com/KafkaReader.java | 0 pom.xml | 3 +-- 3 files changed, 2 insertions(+), 3 deletions(-) rename {kafkreader => kafkareader}/pom.xml (96%) rename {kafkreader => kafkareader}/src/main/java/www/larkmidtable/com/KafkaReader.java (100%) diff --git a/kafkreader/pom.xml b/kafkareader/pom.xml similarity index 96% rename from kafkreader/pom.xml rename to kafkareader/pom.xml index 9096c97..f034919 100644 --- a/kafkreader/pom.xml +++ b/kafkareader/pom.xml @@ -5,7 +5,7 @@ 4.0.0 kafkreader - kafkreader + kafkareader honghu www.larkmidtable.com diff --git a/kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java b/kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java similarity index 100% rename from kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java rename to kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java diff --git a/pom.xml b/pom.xml index f083077..710413d 100644 --- a/pom.xml +++ b/pom.xml @@ -30,8 +30,7 @@ starRocks-reader starRocks-writer - kafkreader - kafkawriter + kafkareader kafkawriter dorisread -- Gitee From 8fe556ba5eb1bbbfd4d7d06aba457985408e8af8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9F=8E=E5=BA=9C?= <54766209@qq.com> Date: Thu, 17 Nov 2022 23:06:44 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BA=BA=E5=A4=A7=E9=87=91=E4=BB=93=20King?= =?UTF-8?q?Base=E8=AF=BB=E5=86=99=E7=BB=84=E4=BB=B6=E7=BC=96=E5=86=99?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../www/larkmidtable/com/util/DBType.java | 5 ++ kingbasereader/pom.xml | 57 +++++++++++++ .../www/larkmidtable/com/KingBaseReader.java | 80 +++++++++++++++++++ kingbasewriter/pom.xml | 53 ++++++++++++ .../www/larkmidtable/com/KingBaseWriter.java | 74 +++++++++++++++++ pom.xml | 2 + 6 files changed, 271 insertions(+) create mode 100644 kingbasereader/pom.xml create mode 100644 kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java create mode 100644 kingbasewriter/pom.xml create mode 100644 kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java diff --git a/common/src/main/java/www/larkmidtable/com/util/DBType.java b/common/src/main/java/www/larkmidtable/com/util/DBType.java index 0b442c7..fff47b9 100644 --- a/common/src/main/java/www/larkmidtable/com/util/DBType.java +++ b/common/src/main/java/www/larkmidtable/com/util/DBType.java @@ -38,6 +38,11 @@ public enum DBType { return "com.microsoft.sqlserver.jdbc.SQLServerDriver"; } }, + KingBase8{ + public String getDriverClass(){//枚举对象实现抽象方法 + return "com.kingbase8.Driver"; + } + }, ; public abstract String getDriverClass();//定义抽象方法 diff --git a/kingbasereader/pom.xml b/kingbasereader/pom.xml new file mode 100644 index 0000000..d822849 --- /dev/null +++ b/kingbasereader/pom.xml @@ -0,0 +1,57 @@ + + + + honghu + www.larkmidtable.com + 1.0-SNAPSHOT + + 4.0.0 + + www.larkmidtable.com + kingbasereader + 1.0-SNAPSHOT + + + 11 + 11 + + + + + www.larkmidtable.com + common + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + + + com.kingbase8 + kingbase8-jdbc-driver + 8.6.0 + system + ${pom.basedir}/lib/kingbase8-8.6.0.jar + + + + + + + lib + /BOOT-INF/lib/ + + **/*.jar + + + + + \ No newline at end of file diff --git a/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java new file mode 100644 index 0000000..e180f8a --- /dev/null +++ b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java @@ -0,0 +1,80 @@ +package www.larkmidtable.com; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import www.larkmidtable.com.reader.Reader; +import www.larkmidtable.com.util.DBType; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; + +/** + * @projectName: honghu + * @package: www.larkmidtable.com + * @className: KingBaseReader + * @author: qd.liu + * @description: 人大金仓读组件 + * @date: 2022/11/17 21:09 + * @version: 1.0 + */ +public class KingBaseReader extends Reader { + private Connection connection; + private PreparedStatement statement; + private static Logger logger = LoggerFactory.getLogger(KingBaseReader.class); + + @Override + public void open() { + try { + logger.info("KingBase的Reader建立连接开始...."); + Class.forName(DBType.DM.getDriverClass()); + connection = DriverManager + .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456"); + logger.info("KingBase的Reader建立连接结束...."); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Queue> startRead(String[] inputSplits) { + logger.info("KingBase读取数据操作...."); + try { + List records = new ArrayList<>(); + String sql = "select * from student"; + statement = connection.prepareCall(sql); + ResultSet resultSet = statement.executeQuery(); + while (resultSet.next()) { + String name = resultSet.getString("name"); + records.add(name); + } + getQueue().add(records); + } catch (Exception e) { + e.printStackTrace(); + } + logger.info("KingBase读取数据结束...."); + return getQueue(); + } + + + @Override + public String[] createInputSplits() { + logger.info("KingBase的Reader开始进行分片开始...."); + logger.info("Kingbase的Reader开始进行分片结束...."); + return new String[5]; + } + + @Override + public void close() { + try { + logger.info("KingBase的Reader开始进行关闭连接开始...."); + statement.close(); + connection.close(); + logger.info("KingBase的Reader开始进行关闭连接结束...."); + } catch (SQLException e) { + e.printStackTrace(); + } + + } +} diff --git a/kingbasewriter/pom.xml b/kingbasewriter/pom.xml new file mode 100644 index 0000000..b3bc640 --- /dev/null +++ b/kingbasewriter/pom.xml @@ -0,0 +1,53 @@ + + + + honghu + www.larkmidtable.com + 1.0-SNAPSHOT + + 4.0.0 + + www.larkmidtable.com + kingbasewriter + 1.0-SNAPSHOT + + + + + www.larkmidtable.com + common + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + + + com.kingbase8 + kingbase8-jdbc-driver + 8.6.0 + system + ${pom.basedir}/lib/kingbase8-8.6.0.jar + + + + + + + lib + /BOOT-INF/lib/ + + **/*.jar + + + + + \ No newline at end of file diff --git a/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java new file mode 100644 index 0000000..5253de6 --- /dev/null +++ b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java @@ -0,0 +1,74 @@ +package www.larkmidtable.com; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import www.larkmidtable.com.util.DBType; +import www.larkmidtable.com.writer.Writer; + +import java.sql.*; +import java.util.List; +import java.util.Queue; + +/** + * @projectName: honghu + * @package: www.larkmidtable.com + * @className: KingBaseReader + * @author: qd.liu + * @description: 人大金仓读组件 + * @date: 2022/11/17 22:09 + * @version: 1.0 + */ +public class KingBaseWriter extends Writer { + private Connection connection ; + private PreparedStatement statement ; + private static Logger logger = LoggerFactory.getLogger(KingBaseWriter.class); + @Override + public void open() { + try { + logger.info("KingBase的Writer建立连接开始...."); + Class.forName(DBType.KingBase8.getDriverClass()); + connection = DriverManager + .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456"); + connection.setAutoCommit(false); + logger.info("KingBase的Writer建立连接结束...."); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void startWrite(Queue> queue) { + logger.info("开始写数据...."); + List poll = queue.poll(); + String sql = "insert into student(id,name) values (?,?)"; + try { + statement = connection.prepareStatement(sql); // 批量插入时ps对象必须放到for循环外面 + for (int i = 0; i < poll.size(); i++) { + statement.setString(1, "mary_" + i); + statement.setString(2, poll.get(i)); + statement.addBatch(); + if (i % 10000 == 0) { + statement.executeBatch(); + connection.commit(); + statement.clearBatch(); + } + } + statement.executeBatch(); + }catch (Exception e) { + e.printStackTrace(); + } + logger.info("写数据完成...."); + } + + @Override + public void close() { + try { + logger.info("KingBase的Writer开始进行关闭连接开始...."); + statement.close(); + connection.close(); + logger.info("KingBase的Writer开始进行关闭连接结束...."); + } catch (SQLException e) { + e.printStackTrace(); + } + } +} diff --git a/pom.xml b/pom.xml index 710413d..ba2e6ab 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,8 @@ kafkareader kafkawriter dorisread + kingbasereader + kingbasewriter -- Gitee