diff --git a/common/src/main/java/www/larkmidtable/com/util/DBType.java b/common/src/main/java/www/larkmidtable/com/util/DBType.java
index 3d75c96566fb08286b2526cf67378c6046550fcc..828578e278ae225f56ccf8aad3e179c4a86a7e5d 100644
--- a/common/src/main/java/www/larkmidtable/com/util/DBType.java
+++ b/common/src/main/java/www/larkmidtable/com/util/DBType.java
@@ -12,6 +12,11 @@ public enum DBType {
public String getDriverClass(){//枚举对象实现抽象方法
return "com.mysql.jdbc.Driver";
}
+ },
+ Oracle{
+ public String getDriverClass(){//枚举对象实现抽象方法
+ return "oracle.jdbc.OracleDriver";
+ }
};
public abstract String getDriverClass();//定义抽象方法
diff --git a/common/src/main/java/www/larkmidtable/com/util/DBUtil.java b/common/src/main/java/www/larkmidtable/com/util/DBUtil.java
index ae4ce54ee6259215e095347e61cc41d360de96f4..baee290b336497de7cde5b09eeaae485d66d5dc2 100644
--- a/common/src/main/java/www/larkmidtable/com/util/DBUtil.java
+++ b/common/src/main/java/www/larkmidtable/com/util/DBUtil.java
@@ -1,8 +1,72 @@
-package www.larkmidtable.com.util;/**
- * @title: DBUtil
- * @projectName honghu
- * @description: TODO
- * @author stave_zhao
- * @date 2022/11/1509:13
- */public class DBUtil {
+package www.larkmidtable.com.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+
+/**
+ * @author stave_zhao
+ * @title: DBUtil
+ * @projectName honghu
+ * @description: 数据库连接工具类
+ * @date 2022/11/1509:13
+ */
+public class DBUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DBUtil.class);
+
+
+ /**
+ * 获取数据库连接
+ * @param databaseDriver 驱动类型(根据数据库类型选择)
+ * @param jdbcUrl 数据库连接信息
+ * @param username 用户名
+ * @param password 密码
+ * @return
+ */
+ public static Connection getConnection(String databaseDriver,String jdbcUrl, String username, String password) throws SQLException {
+ // 连接数据库
+ return DriverManager.getConnection(jdbcUrl, username,password);
+ }
+
+ /**
+ * 关闭连接
+ * @param stmt
+ * @param conn
+ */
+ public static void close(PreparedStatement stmt,Connection conn) {
+ close(stmt,conn,null);
+ }
+ /**
+ * 关闭连接
+ * @param stmt
+ * @param conn
+ * @param rs
+ */
+ public static void close(PreparedStatement stmt,Connection conn,ResultSet rs) {
+ if(stmt!=null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ if(conn!=null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ if(rs!=null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
}
diff --git a/core/pom.xml b/core/pom.xml
index 62c2233e8f4ef0280781d69a8fa6504259c5fd28..06723eec620fa67d056747d2b6f0b1719f934cfb 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -47,6 +47,31 @@
+
+
+
+ www.larkmidtable.com
+ oraclereader
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+ www.larkmidtable.com
+ oraclewriter
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
diff --git a/core/src/main/java/HongHuOracleStart.java b/core/src/main/java/HongHuOracleStart.java
index 89b8fee5735211b8b00873d12fedf865b66d14f1..8e5e50f7752b0fdf3014e35a4bf8bcd21df43675 100644
--- a/core/src/main/java/HongHuOracleStart.java
+++ b/core/src/main/java/HongHuOracleStart.java
@@ -5,11 +5,12 @@ import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import www.larkmidtable.com.MySQLReader;
-
import www.larkmidtable.com.MySQLWriter;
import www.larkmidtable.com.channel.Channel;
import www.larkmidtable.com.reader.Reader;
+import www.larkmidtable.com.reader.oraclereader.OracleReader;
import www.larkmidtable.com.writer.Writer;
+import www.larkmidtable.com.writer.oraclewriter.OracleWriter;
/**
@@ -18,9 +19,9 @@ import www.larkmidtable.com.writer.Writer;
* @Date: 2022/11/10 14:28
* @Description:
**/
-public class HongHuStart {
+public class HongHuOracleStart {
// 定义日志对象
- private static Logger logger = LoggerFactory.getLogger(HongHuStart.class);
+ private static Logger logger = LoggerFactory.getLogger(HongHuOracleStart.class);
// 程序的入口类
public static void main(String[] args) throws ParseException {
@@ -33,9 +34,9 @@ public class HongHuStart {
String jobName = cl.getOptionValue("job");
logger.info("传递的参数:{} ", jobName);
// 2.创建Reader
- Reader reader = new MySQLReader();
+ Reader reader = new OracleReader();
// 3.创建Writer
- Writer writer = new MySQLWriter();
+ Writer writer = new OracleWriter();
// 4.Channel
Channel channel = new Channel();
channel.channel(reader,writer);
diff --git a/oraclereader/pom.xml b/oraclereader/pom.xml
index 294c44d21b554b775df9359478d7b69865054c8b..3bd69e253f8e3c202dc1c5618d4a3f5a17761227 100644
--- a/oraclereader/pom.xml
+++ b/oraclereader/pom.xml
@@ -11,9 +11,27 @@
oraclereader
-
- 11
- 11
-
+
+
+ www.larkmidtable.com
+ common
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+
+ com.oracle
+ ojdbc6
+ 11.2.0.3
+
+
+
+
+
diff --git a/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java b/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java
index dcfa32122faa85212202b72757750f62deb16ea6..06df1e05b9480b0d8cc50e7dbf3583feda4584e8 100644
--- a/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java
+++ b/oraclereader/src/main/java/www/larkmidtable/com/reader/oraclereader/OracleReader.java
@@ -1,8 +1,77 @@
-package www.larkmidtable.com.reader.oraclereader;/**
- * @title: OracleReader
- * @projectName honghu
- * @description: TODO
- * @author stave_zhao
- * @date 2022/11/1509:10
- */public class OracleReader {
+package www.larkmidtable.com.reader.oraclereader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import www.larkmidtable.com.reader.Reader;
+import www.larkmidtable.com.util.DBType;
+import www.larkmidtable.com.util.DBUtil;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @author stave_zhao
+ * @title: OracleReader
+ * @projectName honghu
+ * @description: oracle读数据 工具类
+ * @date 2022/11/1509:10
+ */
+public class OracleReader extends Reader {
+ private Connection connection ;
+ private PreparedStatement statement ;
+ private static Logger logger = LoggerFactory.getLogger(OracleReader.class);
+
+ @Override
+ public void open() {
+ logger.info("Oracle的Reader建立连接开始....");
+
+ try {
+ String url="jdbc:oracle:thin:@localhost:1521:orcl";
+ String username="root";
+ String password="12345678";
+ DBUtil.getConnection(DBType.Oracle.getDriverClass(),url,username,password);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ logger.info("Oracle的Reader建立连接结束....");
+ }
+
+ @Override
+ public Queue> startRead(String[] inputSplits) {
+ logger.info("Oracle读取数据操作....");
+ try {
+ List records = new ArrayList<>();
+ String sql = "select * from student";
+ statement = connection.prepareCall(sql);
+ ResultSet resultSet = statement.executeQuery();
+ while (resultSet.next()) {
+ String name = resultSet.getString("name");
+ records.add(name);
+ }
+ getQueue().add(records);
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ logger.info("Oracle读取数据结束....");
+ return getQueue();
+ }
+
+ @Override
+ public String[] createInputSplits() {
+ logger.info("Oracle的Reader开始进行分片开始....");
+ logger.info("Oracle的Reader开始进行分片结束....");
+ return new String[5];
+ }
+
+ @Override
+ public void close() {
+ logger.info("Oracle的Reader开始进行关闭连接开始....");
+ DBUtil.close(statement,connection);
+ logger.info("Oracle的Reader开始进行关闭连接结束....");
+ }
}
diff --git a/oraclewriter/pom.xml b/oraclewriter/pom.xml
index 01292041bbfe2aca9dda90b9d1e5c293dbfbdae9..06ad8837ba25c6e3c548500288978bcec1b0349b 100644
--- a/oraclewriter/pom.xml
+++ b/oraclewriter/pom.xml
@@ -11,9 +11,24 @@
oraclewriter
-
- 11
- 11
-
+
+
+ www.larkmidtable.com
+ common
+ 1.0-SNAPSHOT
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+
+
+
+ com.oracle
+ ojdbc6
+ 11.2.0.3
+
+
diff --git a/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java b/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java
index 561ebc36e1e3fe5983403728c334dcd22a6b69ec..b6fedf19a2b63b164ac519ffdf234ebbb65c9d66 100644
--- a/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java
+++ b/oraclewriter/src/main/java/www/larkmidtable/com/writer/oraclewriter/OracleWriter.java
@@ -1,8 +1,71 @@
-package www.larkmidtable.com.writer.oraclewriter;/**
- * @title: OracleWriter
- * @projectName honghu
- * @description: TODO
- * @author stave_zhao
- * @date 2022/11/1509:12
- */public class OracleWriter {
+package www.larkmidtable.com.writer.oraclewriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import www.larkmidtable.com.util.DBType;
+import www.larkmidtable.com.util.DBUtil;
+import www.larkmidtable.com.writer.Writer;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @author stave_zhao
+ * @title: OracleWriter
+ * @projectName honghu
+ * @description: oracle 写数据工具类
+ * @date 2022/11/1509:12
+ */
+public class OracleWriter extends Writer {
+ private Connection connection ;
+ private PreparedStatement statement ;
+ private static Logger logger = LoggerFactory.getLogger(OracleWriter.class);
+ @Override
+ public void open() {
+ logger.info("Oracle的Writer建立连接开始....");
+ try {
+ String url="jdbc:oracle:thin:@localhost:1521:orcl";
+ String username="root";
+ String password="12345678";
+ DBUtil.getConnection(DBType.Oracle.getDriverClass(),url,username,password);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ logger.info("Oracle的Writer建立连接结束....");
+ }
+
+ @Override
+ public void startWrite(Queue> queue) {
+ logger.info("Oracle开始写数据....");
+ List poll = queue.poll();
+ String sql = "insert into student(id,name) values (?,?)";
+ try {
+ // 批量插入时ps对象必须放到for循环外面
+ statement = connection.prepareStatement(sql);
+ for (int i = 0; i < poll.size(); i++) {
+ statement.setString(1, "mary_" + i);
+ statement.setString(2, poll.get(i));
+ statement.addBatch();
+ if (i % 10000 == 0) {
+ statement.executeBatch();
+ connection.commit();
+ statement.clearBatch();
+ }
+ }
+ statement.executeBatch();
+ }catch (Exception e) {
+ e.printStackTrace();
+ }
+ logger.info("Oracle写数据完成....");
+ }
+
+ @Override
+ public void close() {
+ logger.info("Oracle的Writter开始进行关闭连接开始....");
+ DBUtil.close(statement,connection);
+ logger.info("Oracle的Writer开始进行关闭连接结束....");
+ }
}
diff --git a/pom.xml b/pom.xml
index 3e024700487a20ebc36954028e215da8509a7e9c..c2e16841081be43924a7cfd2cba23d3e91dc6030 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,6 +13,8 @@
core
mysqlreader
mysqlwriter
+ oraclereader
+ oraclewriter