From a66fa001a8bb0bfba904d81d065860be60e5057e Mon Sep 17 00:00:00 2001 From: zhaowd Date: Tue, 15 Nov 2022 12:40:57 +0800 Subject: [PATCH] =?UTF-8?q?oracle=E4=BB=A3=E7=A0=81=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../www/larkmidtable/com/util/DBType.java | 5 ++ .../www/larkmidtable/com/util/DBUtil.java | 78 +++++++++++++++-- core/pom.xml | 25 ++++++ core/src/main/java/HongHuOracleStart.java | 11 +-- oraclereader/pom.xml | 26 +++++- .../com/reader/oraclereader/OracleReader.java | 83 +++++++++++++++++-- oraclewriter/pom.xml | 23 ++++- .../com/writer/oraclewriter/OracleWriter.java | 77 +++++++++++++++-- pom.xml | 2 + 9 files changed, 296 insertions(+), 34 deletions(-) diff --git a/common/src/main/java/www/larkmidtable/com/util/DBType.java b/common/src/main/java/www/larkmidtable/com/util/DBType.java index 3d75c96..828578e 100644 --- a/common/src/main/java/www/larkmidtable/com/util/DBType.java +++ b/common/src/main/java/www/larkmidtable/com/util/DBType.java @@ -12,6 +12,11 @@ public enum DBType { public String getDriverClass(){//枚举对象实现抽象方法 return "com.mysql.jdbc.Driver"; } + }, + Oracle{ + public String getDriverClass(){//枚举对象实现抽象方法 + return "oracle.jdbc.OracleDriver"; + } }; public abstract String getDriverClass();//定义抽象方法 diff --git a/common/src/main/java/www/larkmidtable/com/util/DBUtil.java b/common/src/main/java/www/larkmidtable/com/util/DBUtil.java index ae4ce54..baee290 100644 --- a/common/src/main/java/www/larkmidtable/com/util/DBUtil.java +++ b/common/src/main/java/www/larkmidtable/com/util/DBUtil.java @@ -1,8 +1,72 @@ -package www.larkmidtable.com.util;/** - * @title: DBUtil - * @projectName honghu - * @description: TODO - * @author stave_zhao - * @date 2022/11/1509:13 - */public class DBUtil { +package www.larkmidtable.com.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; + +/** + * @author stave_zhao + * @title: DBUtil + * @projectName honghu + * @description: 数据库连接工具类 + * @date 2022/11/1509:13 + */ +public class DBUtil { + + private static final Logger LOG = LoggerFactory.getLogger(DBUtil.class); + + + /** + * 获取数据库连接 + * @param databaseDriver 驱动类型(根据数据库类型选择) + * @param jdbcUrl 数据库连接信息 + * @param username 用户名 + * @param password 密码 + * @return + */ + public static Connection getConnection(String databaseDriver,String jdbcUrl, String username, String password) throws SQLException { + // 连接数据库 + return DriverManager.getConnection(jdbcUrl, username,password); + } + + /** + * 关闭连接 + * @param stmt + * @param conn + */ + public static void close(PreparedStatement stmt,Connection conn) { + close(stmt,conn,null); + } + /** + * 关闭连接 + * @param stmt + * @param conn + * @param rs + */ + public static void close(PreparedStatement stmt,Connection conn,ResultSet rs) { + if(stmt!=null) { + try { + stmt.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + if(conn!=null) { + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + if(rs!=null) { + try { + rs.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } + + } diff --git a/core/pom.xml b/core/pom.xml index 62c2233..06723ee 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -47,6 +47,31 @@ + + + + www.larkmidtable.com + oraclereader + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + + www.larkmidtable.com + oraclewriter + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + diff --git a/core/src/main/java/HongHuOracleStart.java b/core/src/main/java/HongHuOracleStart.java index 89b8fee..8e5e50f 100644 --- a/core/src/main/java/HongHuOracleStart.java +++ b/core/src/main/java/HongHuOracleStart.java @@ -5,11 +5,12 @@ import org.apache.commons.cli.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import www.larkmidtable.com.MySQLReader; - import www.larkmidtable.com.MySQLWriter; import www.larkmidtable.com.channel.Channel; import www.larkmidtable.com.reader.Reader; +import www.larkmidtable.com.reader.oraclereader.OracleReader; import www.larkmidtable.com.writer.Writer; +import www.larkmidtable.com.writer.oraclewriter.OracleWriter; /** @@ -18,9 +19,9 @@ import www.larkmidtable.com.writer.Writer; * @Date: 2022/11/10 14:28 * @Description: **/ -public class HongHuStart { +public class HongHuOracleStart { // 定义日志对象 - private static Logger logger = LoggerFactory.getLogger(HongHuStart.class); + private static Logger logger = LoggerFactory.getLogger(HongHuOracleStart.class); // 程序的入口类 public static void main(String[] args) throws ParseException { @@ -33,9 +34,9 @@ public class HongHuStart { String jobName = cl.getOptionValue("job"); logger.info("传递的参数:{} ", jobName); // 2.创建Reader - Reader reader = new MySQLReader(); + Reader reader = new OracleReader(); // 3.创建Writer - Writer writer = new MySQLWriter(); + Writer writer = new OracleWriter(); // 4.Channel Channel channel = new Channel(); channel.channel(reader,writer); diff --git a/oraclereader/pom.xml b/oraclereader/pom.xml index 294c44d..3bd69e2 100644 --- a/oraclereader/pom.xml +++ b/oraclereader/pom.xml @@ -11,9 +11,27 @@ oraclereader - - 11 - 11 - + + + www.larkmidtable.com + common + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + + + com.oracle + ojdbc6 + 11.2.0.3 + + + + + diff --git a/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java b/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java index dcfa321..06df1e0 100644 --- a/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java +++ b/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java @@ -1,8 +1,77 @@ -package www.larkmidtable.com.reader.oraclereader;/** - * @title: OracleReader - * @projectName honghu - * @description: TODO - * @author stave_zhao - * @date 2022/11/1509:10 - */public class OracleReader { +package www.larkmidtable.com.reader.oraclereader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import www.larkmidtable.com.reader.Reader; +import www.larkmidtable.com.util.DBType; +import www.larkmidtable.com.util.DBUtil; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; + +/** + * @author stave_zhao + * @title: OracleReader + * @projectName honghu + * @description: oracle读数据 工具类 + * @date 2022/11/1509:10 + */ +public class OracleReader extends Reader { + private Connection connection ; + private PreparedStatement statement ; + private static Logger logger = LoggerFactory.getLogger(OracleReader.class); + + @Override + public void open() { + logger.info("Oracle的Reader建立连接开始...."); + + try { + String url="jdbc:oracle:thin:@localhost:1521:orcl"; + String username="root"; + String password="12345678"; + DBUtil.getConnection(DBType.Oracle.getDriverClass(),url,username,password); + } catch (SQLException e) { + e.printStackTrace(); + } + logger.info("Oracle的Reader建立连接结束...."); + } + + @Override + public Queue> startRead(String[] inputSplits) { + logger.info("Oracle读取数据操作...."); + try { + List records = new ArrayList<>(); + String sql = "select * from student"; + statement = connection.prepareCall(sql); + ResultSet resultSet = statement.executeQuery(); + while (resultSet.next()) { + String name = resultSet.getString("name"); + records.add(name); + } + getQueue().add(records); + }catch (Exception e){ + e.printStackTrace(); + } + logger.info("Oracle读取数据结束...."); + return getQueue(); + } + + @Override + public String[] createInputSplits() { + logger.info("Oracle的Reader开始进行分片开始...."); + logger.info("Oracle的Reader开始进行分片结束...."); + return new String[5]; + } + + @Override + public void close() { + logger.info("Oracle的Reader开始进行关闭连接开始...."); + DBUtil.close(statement,connection); + logger.info("Oracle的Reader开始进行关闭连接结束...."); + } } diff --git a/oraclewriter/pom.xml b/oraclewriter/pom.xml index 0129204..06ad883 100644 --- a/oraclewriter/pom.xml +++ b/oraclewriter/pom.xml @@ -11,9 +11,24 @@ oraclewriter - - 11 - 11 - + + + www.larkmidtable.com + common + 1.0-SNAPSHOT + + + slf4j-log4j12 + org.slf4j + + + + + + com.oracle + ojdbc6 + 11.2.0.3 + + diff --git a/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java b/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java index 561ebc3..b6fedf1 100644 --- a/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java +++ b/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java @@ -1,8 +1,71 @@ -package www.larkmidtable.com.writer.oraclewriter;/** - * @title: OracleWriter - * @projectName honghu - * @description: TODO - * @author stave_zhao - * @date 2022/11/1509:12 - */public class OracleWriter { +package www.larkmidtable.com.writer.oraclewriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import www.larkmidtable.com.util.DBType; +import www.larkmidtable.com.util.DBUtil; +import www.larkmidtable.com.writer.Writer; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.List; +import java.util.Queue; + +/** + * @author stave_zhao + * @title: OracleWriter + * @projectName honghu + * @description: oracle 写数据工具类 + * @date 2022/11/1509:12 + */ +public class OracleWriter extends Writer { + private Connection connection ; + private PreparedStatement statement ; + private static Logger logger = LoggerFactory.getLogger(OracleWriter.class); + @Override + public void open() { + logger.info("Oracle的Writer建立连接开始...."); + try { + String url="jdbc:oracle:thin:@localhost:1521:orcl"; + String username="root"; + String password="12345678"; + DBUtil.getConnection(DBType.Oracle.getDriverClass(),url,username,password); + } catch (SQLException e) { + e.printStackTrace(); + } + logger.info("Oracle的Writer建立连接结束...."); + } + + @Override + public void startWrite(Queue> queue) { + logger.info("Oracle开始写数据...."); + List poll = queue.poll(); + String sql = "insert into student(id,name) values (?,?)"; + try { + // 批量插入时ps对象必须放到for循环外面 + statement = connection.prepareStatement(sql); + for (int i = 0; i < poll.size(); i++) { + statement.setString(1, "mary_" + i); + statement.setString(2, poll.get(i)); + statement.addBatch(); + if (i % 10000 == 0) { + statement.executeBatch(); + connection.commit(); + statement.clearBatch(); + } + } + statement.executeBatch(); + }catch (Exception e) { + e.printStackTrace(); + } + logger.info("Oracle写数据完成...."); + } + + @Override + public void close() { + logger.info("Oracle的Writter开始进行关闭连接开始...."); + DBUtil.close(statement,connection); + logger.info("Oracle的Writer开始进行关闭连接结束...."); + } } diff --git a/pom.xml b/pom.xml index 3e02470..c2e1684 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,8 @@ core mysqlreader mysqlwriter + oraclereader + oraclewriter -- Gitee