diff --git a/.gitignore b/.gitignore index c455e6caeb63c93d81aa8a20de3b3c4f38acde58..70785948429ec1698ae04940303f7803e00c6d43 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,5 @@ src/note.md *.xlsx *.swp .editorconfig -.travis.yml \ No newline at end of file +.travis.yml +.factorypath diff --git a/pom.xml b/pom.xml index 64fe81ddba22af7c0a54234c3cdc5a3dc660a1e5..f3e1c7bff4c33843b72c97ab7955044508f02483 100644 --- a/pom.xml +++ b/pom.xml @@ -10,17 +10,17 @@ - maven_nexus @@ -70,6 +70,12 @@ runtime + + com.clickhouse + clickhouse-jdbc + 0.5.0 + + org.postgresql postgresql @@ -116,16 +122,22 @@ - com.oracle.jdbc + com.oracle.database.jdbc ojdbc8 19.3.0.0 + + + com.oracle.database.nls + orai18n + 19.3.0.0 + - com.ibm.db2.jcc - db2jcc4 - 10.1 + com.ibm.db2 + jcc + 11.1.4.4 runtime diff --git a/src/main/java/com/data/database/App.java b/src/main/java/com/data/database/App.java index 774f475d311c2f8cb4a770017a61989c9980f451..ca40128ad18ff0f4e971a977abc757d503a939af 100644 --- a/src/main/java/com/data/database/App.java +++ b/src/main/java/com/data/database/App.java @@ -2,7 +2,7 @@ package com.data.database; import java.sql.*; -import com.data.database.utils.Tools; +// import com.data.database.utils.Tools; import java.io.IOException; import com.data.database.api.impl.DataBaseSyncFactory; @@ -57,7 +57,6 @@ public final class App { return this.toTable; } - public boolean getIsSyncTableDDL(){ return this.isSyncTableDDL; } @@ -112,7 +111,6 @@ public final class App { return false; } - this.fromDBId = argList.get(0); this.fromSchema = argList.get(1); this.fromTable = argList.get(2); @@ -163,8 +161,7 @@ public final class App { * @throws SQLException */ - public static String getAlterSqlFromMetaData(String toType, String toSchema, String toTable, ResultSet fromColumnMeta, - ResultSet toColumnMeta, ColSizeTimes colSizeTimes) throws SQLException { + public static String getAlterSqlFromMetaData(String toType, String toSchema, String toTable, ResultSet fromColumnMeta, ResultSet toColumnMeta, ColSizeTimes colSizeTimes) throws SQLException { StringBuilder sb = new StringBuilder(); HashMap> fromColMap = new HashMap>(); @@ -227,7 +224,7 @@ public final class App { } } - } else if (colType == Types.DECIMAL) {// 处理小数字段类型 + } else if (colType == Types.DECIMAL || colType == Types.NUMERIC) {// 处理小数字段类型 Integer fromColLength = (Integer) fromMap.get("COLUMN_SIZE"); Integer fromColDigitLength = (Integer) fromMap.get("DECIMAL_DIGITS"); Integer toColLength = (Integer) toMap.get("COLUMN_SIZE"); @@ -237,9 +234,12 @@ public final class App { if ("db2".equals(toType)) { sb.append(String.format("alter table %s.%s alter column %s set data type decimal(%d,%d);", toSchema, toTable, toColName, fromColLength, fromColDigitLength)); - } else if ("mysql".equals(toType) || "oracle".equals(toType)) { + } else if ("mysql".equals(toType)) { sb.append(String.format("alter table %s.%s modify column %s decimal(%d,%d);", toSchema, toTable, toColName, fromColLength, fromColDigitLength)); + } else if ("oracle".equals(toType)) { + sb.append(String.format("alter table %s.%s modify %s number(%d,%d);", toSchema, toTable, + toColName, fromColLength, fromColDigitLength)); } else if ("postgres".equals(toType)) { sb.append(String.format("alter table %s.%s alter column %s type decimal(%d,%d);", toSchema, toTable, toColName, fromColLength, fromColDigitLength)); @@ -267,16 +267,35 @@ public final class App { switch (colType) { case Types.CHAR: case Types.VARCHAR: - sb.append(String.format("alter table %s.%s add column %s varchar(%d);", toSchema, toTable, - fromColName, fromColLength * colSizeTimes.getTimes())); + if ("oracle".equals(toType)) { + sb.append(String.format("alter table %s.%s add %s varchar2(%d);", toSchema, toTable, + fromColName, fromColLength * colSizeTimes.getTimes())); + } else { + sb.append(String.format("alter table %s.%s add column %s varchar(%d);", toSchema, toTable, + fromColName, fromColLength * colSizeTimes.getTimes())); + } break; case Types.DECIMAL: sb.append(String.format("alter table %s.%s add column %s decimal(%d,%d);", toSchema, toTable, fromColName, fromColLength, fromColDigitLength)); break; + case Types.NUMERIC: + if ("oracle".equals(toType)) { + sb.append(String.format("alter table %s.%s add %s number(%d,%d);", toSchema, toTable, + fromColName, fromColLength, fromColDigitLength)); + } else { + sb.append(String.format("alter table %s.%s add column %s numeric(%d,%d);", toSchema, toTable, + fromColName, fromColLength, fromColDigitLength)); + } + break; default: - sb.append(String.format("alter table %s.%s add column %s %s;", toSchema, toTable, fromColName, - typeName)); + if ("oracle".equals(toType)) { + sb.append(String.format("alter table %s.%s add %s %s;", toSchema, toTable, fromColName, + typeName)); + } else { + sb.append(String.format("alter table %s.%s add column %s %s;", toSchema, toTable, fromColName, + typeName)); + } } } @@ -345,6 +364,8 @@ public final class App { toColumnNames = toDataBase.getTableColumns(app.getToSchema(), app.getToTable()); } + logger.info(String.format("ColumnNames (%s)%s.%s %s", app.getToDbId(), app.getToSchema(), app.getToTable(), toColumnNames)); + if (app.getFromTableFields().size() > 0) { // 如果指定源列,则使用源列,不指定则使用目标列 fromColumnNames = app.getFromTableFields(); @@ -404,6 +425,9 @@ public final class App { toColumnNames = app.getToTableFields(); } else { toColumnNames = toDataBase.getTableColumns(app.getToSchema(),app.getToTable()); + if(toColumnNames.size() == 0) { + throw new RuntimeException(String.format("failed to get table columns from (%s)%s.%s. Please pay attention to the case sensitivity if the table exists or has just been created successfully.", app.getToDbId(), app.getToSchema(), app.getToTable())); + } } if (app.getFromTableFields().size() > 0) { diff --git a/src/main/java/com/data/database/api/impl/DataBaseSync.java b/src/main/java/com/data/database/api/impl/DataBaseSync.java index f0cbdc224c08ada6d66b75caf9df45349d65426d..93d16bda576daa7b60454718279d57ae2978afe4 100644 --- a/src/main/java/com/data/database/api/impl/DataBaseSync.java +++ b/src/main/java/com/data/database/api/impl/DataBaseSync.java @@ -4,7 +4,7 @@ import com.data.database.Config; import com.data.database.api.DataSync; import java.io.IOException; -import java.net.SecureCacheResponse; +// import java.net.SecureCacheResponse; import java.sql.Connection; import java.sql.SQLException; import java.sql.ResultSet; @@ -25,12 +25,10 @@ import com.data.database.api.MyEnum.ColSizeTimes; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import java.lang.reflect.Field; + public class DataBaseSync implements DataSync { - /* - * 获取一个表的列名,用于 readData 时指定一个表的字段,之所以不使用 * 是因为源数据库表的字段有可能新增, - * 也可以用于表结构的自动同步:当目标表的字段数少于源表时,对目标表自动执行 alter table add columns xxx - */ protected final String jdbcDriver; protected final String dbUrl; // 数据库的用户名与密码,需要根据自己的设置 @@ -51,9 +49,7 @@ public class DataBaseSync implements DataSync { protected final HashMap sqlserverMap = new HashMap<>(); - - public DataBaseSync(final String dbType, final String jdbcDriver, final String dbUrl, final String dbUser, - final String dbPass, final Integer bufferRows) throws SQLException, ClassNotFoundException { + public DataBaseSync(final String dbType, final String jdbcDriver, final String dbUrl, final String dbUser, final String dbPass, final Integer bufferRows) throws SQLException, ClassNotFoundException { this.dbType = dbType; this.jdbcDriver = jdbcDriver; this.dbUrl = dbUrl; @@ -77,6 +73,7 @@ public class DataBaseSync implements DataSync { //以下为 java.sql.types 到各数据库类型的映射关系,可以添加其他类型。 + mysqlMap.put("CLOB","LONGTEXT"); mysqlMap.put("BINARY","TINYBLOB"); mysqlMap.put("BIT",""); mysqlMap.put("LONGVARBINARY","MEDIUMBLOB"); @@ -93,7 +90,6 @@ public class DataBaseSync implements DataSync { db2Map.put("TEXT","CLOB"); db2Map.put("INT","INTEGER"); - oracleMap.put("BIGINT","NUMBER"); oracleMap.put("BINARY","RAW"); oracleMap.put("DECIMAL","NUMBER"); @@ -105,7 +101,7 @@ public class DataBaseSync implements DataSync { oracleMap.put("TINYINT","TINYINT"); oracleMap.put("VARBINARY","RAW"); - + postgresqlMap.put("CLOB","TEXT"); postgresqlMap.put("BINARY",""); postgresqlMap.put("BIT",""); postgresqlMap.put("LONGVARBINARY",""); @@ -121,6 +117,20 @@ public class DataBaseSync implements DataSync { } + public static String getJdbcTypeName(int jdbcType) { + Field[] fields = Types.class.getFields(); + for (Field field : fields) { + try { + if (field.getInt(null) == jdbcType) { + return field.getName(); + } + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } + return "UNKNOWN"; + } + public String convertColumnType(String dbType, String colType) { if (dbType.equals("postgres")) { return postgresqlMap.getOrDefault(colType,colType); @@ -133,56 +143,36 @@ public class DataBaseSync implements DataSync { }else{ return colType; } - } + // to do 如果源表与目标表不一致,则以源表为准,修改目标表: + // 用于 扩字段长度,增加字段,删除字段,修改字段类型。 + /* 由于 mysql 、 postgres 数据库大小写是区分的,因此这里不做任何统一转换,自行控制输入参数大小写*/ public ResultSet getColMetaData(String schemaName, String tableName) throws SQLException { - // to do 如果源表与目标表不一致,则以愿表为准,修改目标表: - // 用于 扩字段长度,增加字段,删除字段,修改字段类型。 - /* 由于 mysql postgres 数据库大小写是区分的,因此这里不做任何统一转换,自行控制输入参数大小写*/ - - String tableCatalog = null; - if (this.dbType.equals("mysql")) { - tableCatalog = schemaName; - } + String tableCatalog = this.dbType.equals("mysql") || this.dbType.equals("clickhouse") ? schemaName : null; + schemaName = schemaName.equals("null") ? null : schemaName; DatabaseMetaData metaData = this.dbConn.getMetaData(); - ResultSet resultSet = metaData.getColumns(tableCatalog, schemaName, tableName, null); + ResultSet resultSet = metaData.getColumns(tableCatalog, schemaName, tableName, "%"); return resultSet; - } public List getColumnTypes(String schemaName, String tableName) throws SQLException { List colTypes = new ArrayList<>(); - String tableCatalog = null; - if(schemaName.equals("null")){ - schemaName = null; - } - if (this.dbType.equals("mysql")) { - tableCatalog = schemaName; - } - DatabaseMetaData metaData = this.dbConn.getMetaData(); - ResultSet resultSet = metaData.getColumns(tableCatalog, schemaName, tableName, null); + ResultSet resultSet = this.getColMetaData(schemaName, tableName); while (resultSet.next()) { colTypes.add(resultSet.getInt(5)); } return colTypes; - } + /* + * 获取一个表的列名,用于 readData 时指定一个表的字段,之所以不使用 * 是因为源数据库表的字段有可能新增, + * 也可以用于表结构的自动同步:当目标表的字段数少于源表时,对目标表自动执行 alter table add columns xxx + */ public List getTableColumns(String schemaName, String tableName) throws SQLException { logger.info(String.format("get table columns from %s.%s ", schemaName, tableName)); List columnNames = new ArrayList<>(); - DatabaseMetaData metaData = this.dbConn.getMetaData(); - - if(schemaName.equals("null")){ - schemaName = null; - } - - String tableCatalog = null; - if (this.dbType.equals("mysql")) { - tableCatalog = schemaName; - } - ResultSet resultSet = metaData.getColumns(tableCatalog, schemaName, tableName, null); + ResultSet resultSet = this.getColMetaData(schemaName, tableName); while (resultSet.next()) { // for(int i =0 ;i < 24; i++){ // System.out.print(resultSet.getObject(i+1)+"\t"); @@ -191,7 +181,6 @@ public class DataBaseSync implements DataSync { columnNames.add(resultSet.getString(4)); } return columnNames; - }; /* 判断一个表是否存在 */ @@ -209,15 +198,14 @@ public class DataBaseSync implements DataSync { // schemaName = schemaName == null ? null : schemaName.toUpperCase(); // tableName = tableName == null ? null : tableName.toUpperCase(); // } - + String tableCatalog = this.dbType.equals("mysql") ? schemaName : null; schemaName = schemaName.equals("null") ? null : schemaName; - tableName = tableName.equals("null") ? null : tableName; - ResultSet resultSet = metaData.getTables(null, schemaName, tableName, null); + // tableName = tableName.equals("null") ? null : tableName; // what the fuck? + ResultSet resultSet = metaData.getTables(tableCatalog, schemaName, tableName, null); if (resultSet.next()) { return true; } return false; - }; /* 获取一个表的 DDL 语句,用于在目标数据库创建表 lenSize 默认值 1 @@ -235,22 +223,16 @@ public class DataBaseSync implements DataSync { StringBuilder sb = new StringBuilder(1024); StringBuilder sb_remark = new StringBuilder(1024); String table_remark = ""; - - String tableCatalog = null; - - if(schemaName.equals("null")){ - schemaName = null; - } + String tableCatalog = this.dbType.equals("mysql") ? schemaName : null; ResultSet tbResultset = metaData.getTables(tableCatalog, schemaName, tableName, null); if (tbResultset.next()) { table_remark = tbResultset.getString(5); } - - ResultSet resultSet = metaData.getColumns(tableCatalog, schemaName, tableName, null); + ResultSet resultSet = this.getColMetaData(schemaName, tableName); int i = 0; ArrayList pks = new ArrayList(); - ResultSet resultSetPKS = metaData.getPrimaryKeys(null, schemaName, tableName); + ResultSet resultSetPKS = metaData.getPrimaryKeys(tableCatalog, schemaName, tableName); while (resultSetPKS.next()) { pks.add(resultSetPKS.getString(4)); } @@ -262,10 +244,10 @@ public class DataBaseSync implements DataSync { Integer dataType = resultSet.getInt(5); String columnType = ""; if(this.dbType.equals(config.getDbType())){ - //说明原数据库与目标数据库一致,不需要任何字段类型转换 + //说明源数据库与目标数据库一致,不需要任何字段类型转换 columnType = resultSet.getString(6); } else{ - columnType = convertColumnType(config.getDbType(), resultSet.getString(6)); + columnType = convertColumnType(config.getDbType(), getJdbcTypeName(dataType)); } Integer columnSize = resultSet.getInt(7); @@ -349,8 +331,7 @@ public class DataBaseSync implements DataSync { }; /* 获取一个表的数据,采用流式读取,可以提供 whereClause 表示增量读取 ,如果 whereClause 为空,表示全量读取 */ - public ResultSet readData(String schemaName, final String tableName, final List columnNames, - String whereClause) throws SQLException { + public ResultSet readData(String schemaName, final String tableName, final List columnNames, String whereClause) throws SQLException { this.dbConn.setAutoCommit(false); logger.info(String.format("read data from %s.%s ", schemaName, tableName)); final String columns = Joiner.on(", ").join(columnNames); @@ -370,8 +351,7 @@ public class DataBaseSync implements DataSync { }; /* 将 ResultSet 类型的数据定入一张表,写入成功返回 true,否则返回 false */ - public boolean writeData(String schemaName, final String tableName, final List columnNames, ResultSet rs, - final String whereClause) throws IOException, SQLException { + public boolean writeData(String schemaName, final String tableName, final List columnNames, ResultSet rs, final String whereClause) throws IOException, SQLException { this.dbConn.setAutoCommit(false); logger.info(String.format("begin insert into %s.%s...", schemaName, tableName)); String tbName = ""; @@ -397,16 +377,20 @@ public class DataBaseSync implements DataSync { logger.info(String.format("clear data for %s is begin...", tbName)); logger.info("clearSql : " + clearSql); - this.dbConn.commit(); + int supportsTransactions = 0; + try {supportsTransactions = this.dbConn.getTransactionIsolation();} catch(SQLException e){logger.info("clearSql : " + e.getMessage()); supportsTransactions = Integer.parseInt(e.getMessage().replaceAll("[^0-9]", ""));} + logger.info("clearSql : supportsTransactions " + supportsTransactions); + logger.info("clearSql : getTransactionIsolation " + this.dbConn.getTransactionIsolation()); // what the fuck? why is the getTransactionIsolation return 1 on clickhouse + // if(supportsTransactions != Connection.TRANSACTION_NONE)this.dbConn.commit(); PreparedStatement pStemt = this.dbConn.prepareStatement(clearSql); pStemt.executeUpdate(); - this.dbConn.commit(); + if(supportsTransactions != Connection.TRANSACTION_NONE)this.commit(); logger.info(String.format("clear data for %s is done", tbName)); List colTypes = this.getColumnTypes(schemaName, tableName); final String insertSql = Tools.buildInsertSql(tbName, columnNames.stream().toArray(String[]::new)); - // System.out.println(insertSql); + logger.info("insertSql : " + insertSql); pStemt = this.dbConn.prepareStatement(insertSql); // final int numberOfCols = rs.getMetaData().getColumnCount(); final int numberOfCols = columnNames.size(); @@ -415,9 +399,15 @@ public class DataBaseSync implements DataSync { long starttime = System.currentTimeMillis(); while (rs.next()) { for (int i = 0; i < numberOfCols; i++) { + // logger.info(String.format("table:%s type:%s colname:%s coltype:%s timetype:%s", tbName, this.dbType, columnNames.get(i), colTypes.get(i), Types.TIMESTAMP)); if (colTypes.get(i) == Types.VARCHAR || colTypes.get(i) == Types.CHAR || colTypes.get(i) == Types.CLOB) { pStemt.setString(i + 1, Objects.toString(rs.getString(i + 1), "")); // 将 null 转化为空 + } else if (this.dbType.equals("mysql") && (colTypes.get(i) == Types.TIMESTAMP // specifies mysql + || colTypes.get(i) == Types.DATE + || columnNames.get(i).length() >=4 && columnNames.get(i).substring(columnNames.get(i).length() - 4).equalsIgnoreCase("TIME") // some 64bit clickhouse colType return 1111 cannot match jdbc type + || columnNames.get(i).length() >=4 && columnNames.get(i).substring(columnNames.get(i).length() - 4).equalsIgnoreCase("DATE"))) { + pStemt.setString(i + 1, rs.getString(i + 1)); // 不能将 null 转化为空 } else { pStemt.setObject(i + 1, rs.getObject(i + 1)); } @@ -428,9 +418,10 @@ public class DataBaseSync implements DataSync { // 每10万行提交一次记录 int affectRows = 0; for (int i : pStemt.executeBatch()) { - affectRows += i; + if(i >= 0 || i == PreparedStatement.SUCCESS_NO_INFO) + affectRows += i == PreparedStatement.SUCCESS_NO_INFO ? 1 : i; } - this.dbConn.commit(); + if(supportsTransactions != Connection.TRANSACTION_NONE)this.commit(); logger.info(String.format("rows insert into %s is %d", tbName, affectRows)); totalAffectRows += affectRows; rowCount = 0; @@ -439,9 +430,10 @@ public class DataBaseSync implements DataSync { // 处理剩余的记录 int affectRows = 0; for (int i : pStemt.executeBatch()) { - affectRows += i; + if(i >= 0 || i == PreparedStatement.SUCCESS_NO_INFO) + affectRows += i == PreparedStatement.SUCCESS_NO_INFO ? 1 : i; } - this.dbConn.commit(); + if(supportsTransactions != Connection.TRANSACTION_NONE)this.commit(); logger.info(String.format("rows insert into %s is %d", tbName, affectRows)); totalAffectRows += affectRows; rowCount = 0; @@ -453,7 +445,6 @@ public class DataBaseSync implements DataSync { /* 执行提供的 ddl 建表 */ public boolean createTable(String schemaName, String tableName, String ddl) throws SQLException { - this.dbConn.setAutoCommit(true); String newDDL = ""; if (schemaName == null || schemaName.length() == 0) { @@ -482,4 +473,8 @@ public class DataBaseSync implements DataSync { return true; } + public boolean commit() throws SQLException { + try {this.dbConn.commit(); return true;} catch(SQLException e){logger.info("commit : " + e.getMessage()); return false;} + } + } \ No newline at end of file diff --git a/src/main/java/com/data/database/api/impl/DataBaseSyncFactory.java b/src/main/java/com/data/database/api/impl/DataBaseSyncFactory.java index aecd7d00581f55f1b60b7216a6139fb8f1a3e5a2..247c77db12fbbb678d8d5f4b56489b9f0e707f41 100644 --- a/src/main/java/com/data/database/api/impl/DataBaseSyncFactory.java +++ b/src/main/java/com/data/database/api/impl/DataBaseSyncFactory.java @@ -1,6 +1,6 @@ package com.data.database.api.impl; import com.data.database.Config; -import java.util.Map; +// import java.util.Map; import java.sql.SQLException; public class DataBaseSyncFactory { diff --git a/src/main/java/com/data/database/api/impl/Db2DataBaseSync.java b/src/main/java/com/data/database/api/impl/Db2DataBaseSync.java index 2e1d6a0c0f0f131d37287c67a3eb804590970fa1..2926b4a396c0581fe56532da8741df7126f0fc95 100644 --- a/src/main/java/com/data/database/api/impl/Db2DataBaseSync.java +++ b/src/main/java/com/data/database/api/impl/Db2DataBaseSync.java @@ -1,20 +1,17 @@ package com.data.database.api.impl; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; -import java.sql.PreparedStatement; -import java.sql.ResultSet; +// import java.io.BufferedReader; +// import java.io.IOException; +// import java.io.StringReader; +// import java.sql.PreparedStatement; +// import java.sql.ResultSet; import java.sql.SQLException; -import java.util.List; - +// import java.util.List; public class Db2DataBaseSync extends DataBaseSync { //在此编写db2特性的代码逻辑 - - public Db2DataBaseSync(String dbType, String jdbcDriver, String dbUrl, String dbUser, String dbPass, - Integer bufferRows) throws SQLException, ClassNotFoundException { + public Db2DataBaseSync(String dbType, String jdbcDriver, String dbUrl, String dbUser, String dbPass, Integer bufferRows) throws SQLException, ClassNotFoundException { super(dbType, jdbcDriver, dbUrl, dbUser, dbPass, bufferRows); } @@ -26,5 +23,4 @@ public class Db2DataBaseSync extends DataBaseSync { // return true; // } - }; \ No newline at end of file diff --git a/src/main/java/com/data/database/api/impl/OracleDataBaseSync.java b/src/main/java/com/data/database/api/impl/OracleDataBaseSync.java index 1e05e9b67962f7bd039c0efd6124018c6d86c3e6..f3910cd6c4cb9b68f6de2b15ad40f46f62e03ccc 100644 --- a/src/main/java/com/data/database/api/impl/OracleDataBaseSync.java +++ b/src/main/java/com/data/database/api/impl/OracleDataBaseSync.java @@ -1,24 +1,20 @@ package com.data.database.api.impl; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; -import java.sql.PreparedStatement; -import java.sql.ResultSet; +// import java.io.BufferedReader; +// import java.io.IOException; +// import java.io.StringReader; +// import java.sql.PreparedStatement; +// import java.sql.ResultSet; import java.sql.SQLException; -import java.util.List; - +// import java.util.List; public class OracleDataBaseSync extends DataBaseSync { - //在此编写db2特性的代码逻辑 - + //在此编写Oracle特性的代码逻辑 - public OracleDataBaseSync(String dbType, String jdbcDriver, String dbUrl, String dbUser, String dbPass, - Integer bufferRows) throws SQLException, ClassNotFoundException { + public OracleDataBaseSync(String dbType, String jdbcDriver, String dbUrl, String dbUser, String dbPass, Integer bufferRows) throws SQLException, ClassNotFoundException { super(dbType, jdbcDriver, dbUrl, dbUser, dbPass, bufferRows); } - // @Override // public boolean writeData(String schemaName, final String tableName, List columnNames, ResultSet rs, // final String whereClause) { diff --git a/src/main/java/com/data/database/api/impl/PostgresDataBaseSync.java b/src/main/java/com/data/database/api/impl/PostgresDataBaseSync.java index 577aff6e2fde40bdbd4a5f00c3787c530be99890..a10ca2f8ee18cf0a68a02486a7c95983fff61ad6 100644 --- a/src/main/java/com/data/database/api/impl/PostgresDataBaseSync.java +++ b/src/main/java/com/data/database/api/impl/PostgresDataBaseSync.java @@ -13,14 +13,12 @@ import org.postgresql.core.BaseConnection; public class PostgresDataBaseSync extends DataBaseSync { - public PostgresDataBaseSync(String dbType, String jdbcDriver, String dbUrl, String dbUser, String dbPass, - Integer bufferRows) throws SQLException, ClassNotFoundException { + public PostgresDataBaseSync(String dbType, String jdbcDriver, String dbUrl, String dbUser, String dbPass, Integer bufferRows) throws SQLException, ClassNotFoundException { super(dbType, jdbcDriver, dbUrl, dbUser, dbPass, bufferRows); } @Override - public boolean writeData(String schemaName, final String tableName, List columnNames, ResultSet rs, - final String whereClause) throws IOException, SQLException { + public boolean writeData(String schemaName, final String tableName, List columnNames, ResultSet rs, final String whereClause) throws IOException, SQLException { this.dbConn.setAutoCommit(false); logger.info(String.format("begin insert into %s.%s...", schemaName, tableName)); String tbName = ""; @@ -64,6 +62,7 @@ public class PostgresDataBaseSync extends DataBaseSync { tbName,String.join(",",columnNames)); logger.info("copyFromSql = "+copyFromSql); long starttime = System.currentTimeMillis(); + CopyManager copyManager = new CopyManager((BaseConnection) this.dbConn); while (rs.next()) { for (int i = 0; i < numberOfCols - 1; i++) { String col = rs.getString(i+1); @@ -93,11 +92,10 @@ public class PostgresDataBaseSync extends DataBaseSync { rowCount++; if (rowCount >= bufferRows) { // 每10万行提交一次记录 - CopyManager copyManager = new CopyManager((BaseConnection) this.dbConn); + // CopyManager copyManager = new CopyManager((BaseConnection) this.dbConn); // What the funck. Why defined it in the while loop // long rowsInserted = copyManager.copyIn("copy " + tbName + " from stdin // delimiter e'\\x02' NULL as ''", - long rowsInserted = copyManager.copyIn(copyFromSql, - new BufferedReader(new StringReader(sBuilder.toString()))); + long rowsInserted = copyManager.copyIn(copyFromSql, new BufferedReader(new StringReader(sBuilder.toString()))); this.dbConn.commit(); // sBuilder.delete(0, sBuilder.length());//清空缓冲区 sBuilder.setLength(0);// 清空缓冲区 @@ -108,7 +106,7 @@ public class PostgresDataBaseSync extends DataBaseSync { } // 处理剩余的记录 - CopyManager copyManager = new CopyManager((BaseConnection) this.dbConn); + // CopyManager copyManager = new CopyManager((BaseConnection) this.dbConn); // Why not reuse it? long rowsInserted = copyManager.copyIn(copyFromSql, new BufferedReader(new StringReader(sBuilder.toString()))); this.dbConn.commit(); sBuilder.setLength(0);// 清空缓冲区