diff --git a/common/src/main/java/www/larkmidtable/com/util/DBType.java b/common/src/main/java/www/larkmidtable/com/util/DBType.java index 0b442c778c31bddd412581b40317949824fa9240..fff47b96b0aa326223cf5e873f76c6058aade6e0 100644 --- a/common/src/main/java/www/larkmidtable/com/util/DBType.java +++ b/common/src/main/java/www/larkmidtable/com/util/DBType.java @@ -38,6 +38,11 @@ public enum DBType { return "com.microsoft.sqlserver.jdbc.SQLServerDriver"; } }, + KingBase8{ + public String getDriverClass(){//枚举对象实现抽象方法 + return "com.kingbase8.Driver"; + } + }, ; public abstract String getDriverClass();//定义抽象方法 diff --git a/kafkreader/pom.xml b/kafkareader/pom.xml similarity index 96% rename from kafkreader/pom.xml rename to kafkareader/pom.xml index 9096c973df9e352c622866fef8543fe69672fee6..f034919005b6716bb7430e1606e68840d54aa1be 100644 --- a/kafkreader/pom.xml +++ b/kafkareader/pom.xml @@ -5,7 +5,7 @@ 4.0.0 kafkreader - kafkreader + kafkareader honghu www.larkmidtable.com diff --git a/kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java b/kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java similarity index 100% rename from kafkreader/src/main/java/www/larkmidtable/com/KafkaReader.java rename to kafkareader/src/main/java/www/larkmidtable/com/KafkaReader.java diff --git a/kingbasereader/pom.xml b/kingbasereader/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..d822849b7483c2076d535659b38ea0cffdcc3eab --- /dev/null +++ b/kingbasereader/pom.xml @@ -0,0 +1,57 @@ + + + + honghu + www.larkmidtable.com + 1.0-SNAPSHOT + + 4.0.0 + + www.larkmidtable.com + kingbasereader + 1.0-SNAPSHOT + + + 11 + 11 + + + + + www.larkmidtable.com + common + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + + + com.kingbase8 + kingbase8-jdbc-driver + 8.6.0 + system + ${pom.basedir}/lib/kingbase8-8.6.0.jar + + + + + + + lib + /BOOT-INF/lib/ + + **/*.jar + + + + + \ No newline at end of file diff --git a/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java new file mode 100644 index 0000000000000000000000000000000000000000..e180f8af0e194d93dafbc597d50266f9c94d3477 --- /dev/null +++ b/kingbasereader/src/main/java/www/larkmidtable/com/KingBaseReader.java @@ -0,0 +1,80 @@ +package www.larkmidtable.com; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import www.larkmidtable.com.reader.Reader; +import www.larkmidtable.com.util.DBType; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; + +/** + * @projectName: honghu + * @package: www.larkmidtable.com + * @className: KingBaseReader + * @author: qd.liu + * @description: 人大金仓读组件 + * @date: 2022/11/17 21:09 + * @version: 1.0 + */ +public class KingBaseReader extends Reader { + private Connection connection; + private PreparedStatement statement; + private static Logger logger = LoggerFactory.getLogger(KingBaseReader.class); + + @Override + public void open() { + try { + logger.info("KingBase的Reader建立连接开始...."); + Class.forName(DBType.DM.getDriverClass()); + connection = DriverManager + .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456"); + logger.info("KingBase的Reader建立连接结束...."); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Queue> startRead(String[] inputSplits) { + logger.info("KingBase读取数据操作...."); + try { + List records = new ArrayList<>(); + String sql = "select * from student"; + statement = connection.prepareCall(sql); + ResultSet resultSet = statement.executeQuery(); + while (resultSet.next()) { + String name = resultSet.getString("name"); + records.add(name); + } + getQueue().add(records); + } catch (Exception e) { + e.printStackTrace(); + } + logger.info("KingBase读取数据结束...."); + return getQueue(); + } + + + @Override + public String[] createInputSplits() { + logger.info("KingBase的Reader开始进行分片开始...."); + logger.info("Kingbase的Reader开始进行分片结束...."); + return new String[5]; + } + + @Override + public void close() { + try { + logger.info("KingBase的Reader开始进行关闭连接开始...."); + statement.close(); + connection.close(); + logger.info("KingBase的Reader开始进行关闭连接结束...."); + } catch (SQLException e) { + e.printStackTrace(); + } + + } +} diff --git a/kingbasewriter/pom.xml b/kingbasewriter/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..b3bc64040d025c4f355d15aad9356c0a9c685fbb --- /dev/null +++ b/kingbasewriter/pom.xml @@ -0,0 +1,53 @@ + + + + honghu + www.larkmidtable.com + 1.0-SNAPSHOT + + 4.0.0 + + www.larkmidtable.com + kingbasewriter + 1.0-SNAPSHOT + + + + + www.larkmidtable.com + common + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + + + com.kingbase8 + kingbase8-jdbc-driver + 8.6.0 + system + ${pom.basedir}/lib/kingbase8-8.6.0.jar + + + + + + + lib + /BOOT-INF/lib/ + + **/*.jar + + + + + \ No newline at end of file diff --git a/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..5253de644108f45eb0f9426023d877f2583dbea3 --- /dev/null +++ b/kingbasewriter/src/main/java/www/larkmidtable/com/KingBaseWriter.java @@ -0,0 +1,74 @@ +package www.larkmidtable.com; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import www.larkmidtable.com.util.DBType; +import www.larkmidtable.com.writer.Writer; + +import java.sql.*; +import java.util.List; +import java.util.Queue; + +/** + * @projectName: honghu + * @package: www.larkmidtable.com + * @className: KingBaseReader + * @author: qd.liu + * @description: 人大金仓读组件 + * @date: 2022/11/17 22:09 + * @version: 1.0 + */ +public class KingBaseWriter extends Writer { + private Connection connection ; + private PreparedStatement statement ; + private static Logger logger = LoggerFactory.getLogger(KingBaseWriter.class); + @Override + public void open() { + try { + logger.info("KingBase的Writer建立连接开始...."); + Class.forName(DBType.KingBase8.getDriverClass()); + connection = DriverManager + .getConnection("jdbc:kingbase8://127.0.0.1:54321/test", "system", "123456"); + connection.setAutoCommit(false); + logger.info("KingBase的Writer建立连接结束...."); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void startWrite(Queue> queue) { + logger.info("开始写数据...."); + List poll = queue.poll(); + String sql = "insert into student(id,name) values (?,?)"; + try { + statement = connection.prepareStatement(sql); // 批量插入时ps对象必须放到for循环外面 + for (int i = 0; i < poll.size(); i++) { + statement.setString(1, "mary_" + i); + statement.setString(2, poll.get(i)); + statement.addBatch(); + if (i % 10000 == 0) { + statement.executeBatch(); + connection.commit(); + statement.clearBatch(); + } + } + statement.executeBatch(); + }catch (Exception e) { + e.printStackTrace(); + } + logger.info("写数据完成...."); + } + + @Override + public void close() { + try { + logger.info("KingBase的Writer开始进行关闭连接开始...."); + statement.close(); + connection.close(); + logger.info("KingBase的Writer开始进行关闭连接结束...."); + } catch (SQLException e) { + e.printStackTrace(); + } + } +} diff --git a/pom.xml b/pom.xml index f0830771a99c2d6313c662b354f4f7ebe3b4c631..ba2e6ab6790eb47c52b2c070da0d3ffb2125102e 100644 --- a/pom.xml +++ b/pom.xml @@ -30,10 +30,11 @@ starRocks-reader starRocks-writer - kafkreader - kafkawriter + kafkareader kafkawriter dorisread + kingbasereader + kingbasewriter