diff --git a/common/src/main/java/www/larkmidtable/com/constant/ReaderPluginEnum.java b/common/src/main/java/www/larkmidtable/com/constant/ReaderPluginEnum.java index beafef19f841f69197535eafe74dcb3c5aa81e94..d262c3fd074a14a7b405b9740f69989592f5045b 100644 --- a/common/src/main/java/www/larkmidtable/com/constant/ReaderPluginEnum.java +++ b/common/src/main/java/www/larkmidtable/com/constant/ReaderPluginEnum.java @@ -7,6 +7,8 @@ import java.util.stream.Stream; * @author */ public enum ReaderPluginEnum { + //TODO 需补充插件种类 + MYSQLREADER("mysqlreader","www.larkmidtable.com.MySQLReader"), ORACLEREADER("oraclelreader","www.larkmidtable.com.reader.oraclereader.OracleReader"); private String name; diff --git a/common/src/main/java/www/larkmidtable/com/constant/WriterPluginEnum.java b/common/src/main/java/www/larkmidtable/com/constant/WriterPluginEnum.java index 9504b39f65d070ff8545a3def6aba07aa5981ffc..36f3835299eb401bf82344949f028b68bbf6696d 100644 --- a/common/src/main/java/www/larkmidtable/com/constant/WriterPluginEnum.java +++ b/common/src/main/java/www/larkmidtable/com/constant/WriterPluginEnum.java @@ -3,6 +3,7 @@ package www.larkmidtable.com.constant; import java.util.stream.Stream; public enum WriterPluginEnum { + MYSQLWRITER("mysqlwriter","www.larkmidtable.com.MySQLWriter"), ORACLEWRITER("oraclewriter","ww.larkmidtable.com.writer.oraclewriter.OracleWriter"); private String name; diff --git a/common/src/main/java/www/larkmidtable/com/reader/Reader.java b/common/src/main/java/www/larkmidtable/com/reader/Reader.java index 8ed20341a33cbee68557cc21c6d14b8141e8268d..530a57d4c1aad7078fb30cca5e0b09a1a4bb3dc9 100644 --- a/common/src/main/java/www/larkmidtable/com/reader/Reader.java +++ b/common/src/main/java/www/larkmidtable/com/reader/Reader.java @@ -17,7 +17,15 @@ import java.util.concurrent.LinkedBlockingQueue; public abstract class Reader { + protected ConfigBean configBean; + public ConfigBean getConfigBean() { + return configBean; + } + + public void setConfigBean(ConfigBean configBean) { + this.configBean = configBean; + } // 初始化操作 public abstract void open(); @@ -34,7 +42,9 @@ public abstract class Reader { public static Reader getReaderPlugin(String name, ConfigBean readerConfigBean) { try { - return (Reader) Class.forName(ReaderPluginEnum.getByName(name).getClassPath()).newInstance(); + Reader reader = (Reader) Class.forName(ReaderPluginEnum.getByName(readerConfigBean.getPlugin()).getClassPath()).newInstance(); + reader.setConfigBean(readerConfigBean); + return reader; } catch (Exception e) { throw new HongHuException("文件获取不到", e); } diff --git a/common/src/main/java/www/larkmidtable/com/writer/Writer.java b/common/src/main/java/www/larkmidtable/com/writer/Writer.java index 4b6d0a2134d0dee46ee1893b61fc0d9917637b08..e6681bccc728bde2385d068170b63be530db310b 100644 --- a/common/src/main/java/www/larkmidtable/com/writer/Writer.java +++ b/common/src/main/java/www/larkmidtable/com/writer/Writer.java @@ -15,9 +15,23 @@ import java.util.Queue; * @Description: **/ public abstract class Writer { - public static Writer getWriterPlugin(String name, ConfigBean readerConfigBean) { + + protected ConfigBean configBean; + + public ConfigBean getConfigBean() { + return configBean; + } + + public void setConfigBean(ConfigBean configBean) { + this.configBean = configBean; + } + + public static Writer getWriterPlugin(String name, ConfigBean writerConfigBean) { try { - return (Writer) Class.forName(WriterPluginEnum.getByName(name).getClassPath()).newInstance(); + Writer writer = (Writer) Class.forName(WriterPluginEnum.getByName(name).getClassPath()).newInstance(); + writer.setConfigBean(writerConfigBean); + writer.getClass(); + return writer; } catch (Exception e) { throw new HongHuException("文件获取不到", e); } diff --git a/mysqlreader/src/main/java/www/larkmidtable/com/MySQLReader.java b/mysqlreader/src/main/java/www/larkmidtable/com/MySQLReader.java index 653be9b0751745e5f38d377bca802811a7d42a92..c70d8a0413cb10eb2b73238b1180e79accb57716 100644 --- a/mysqlreader/src/main/java/www/larkmidtable/com/MySQLReader.java +++ b/mysqlreader/src/main/java/www/larkmidtable/com/MySQLReader.java @@ -24,14 +24,15 @@ public class MySQLReader extends Reader { private Connection connection ; private PreparedStatement statement ; private static Logger logger = LoggerFactory.getLogger(MySQLReader.class); + + @Override public void open() { try { logger.info("MySQL的Reader建立连接开始...."); Class.forName(DBType.MySql.getDriverClass()); connection = DriverManager - .getConnection("jdbc:mysql://localhost:3306/filedb?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC", - "root","root"); + .getConnection(configBean.getUrl(),configBean.getUsername(),configBean.getPassword()); logger.info("MySQL的Reader建立连接结束...."); } catch (Exception e) { e.printStackTrace(); @@ -43,11 +44,11 @@ public class MySQLReader extends Reader { logger.info("MySQL读取数据操作...."); try { List records = new ArrayList<>(); - String sql = "select * from student"; + String sql = String.format("select * from %s",configBean.getTable()); statement = connection.prepareCall(sql); ResultSet resultSet = statement.executeQuery(); while (resultSet.next()) { - String name = resultSet.getString("name"); + String name = resultSet.getString(configBean.getColumn()); records.add(name); } Channel.getQueue().add(records); diff --git a/mysqlwriter/src/main/java/www/larkmidtable/com/MySQLWriter.java b/mysqlwriter/src/main/java/www/larkmidtable/com/MySQLWriter.java index 65984cc815890f9643ac98211be1f948bed25ac5..0e98a87452fe757c36e27d976ac12af93bec9ead 100644 --- a/mysqlwriter/src/main/java/www/larkmidtable/com/MySQLWriter.java +++ b/mysqlwriter/src/main/java/www/larkmidtable/com/MySQLWriter.java @@ -29,8 +29,7 @@ public class MySQLWriter extends Writer { try { logger.info("MySQL的Writer建立连接开始...."); Class.forName(DBType.MySql.getDriverClass()); - connection = DriverManager - .getConnection("jdbc:mysql://localhost:3306/filedb?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC","root","root"); + connection = DriverManager.getConnection(configBean.getUrl(),configBean.getUsername(),configBean.getPassword()); connection.setAutoCommit(false); logger.info("MySQL的Writer建立连接结束...."); } catch (Exception e) { @@ -42,7 +41,7 @@ public class MySQLWriter extends Writer { public void startWrite() { logger.info("开始写数据...."); List poll = Channel.getQueue().poll(); - String sql = "insert into student(id,name) values (?,?)"; + String sql = String.format("insert into %s(%s) values (?,?)",configBean.getTable(),configBean.getColumn()); try { statement = connection.prepareStatement(sql); // 批量插入时ps对象必须放到for循环外面 for (int i = 0; i < poll.size(); i++) {