import time
import logging
import sqlite3
from threading import Lock, Semaphore

from psycopg2 import pool


class OpenGaussConnectionPool(pool.AbstractConnectionPool):
    """psycopg2 connection pool whose ``getconn`` blocks on a semaphore
    (instead of raising PoolError) when all ``maxconn`` connections are out."""

    def __init__(self, minconn, maxconn, *args, **kwargs):
        self.semaphore = Semaphore(maxconn)
        self.maxconn = maxconn
        self.current = 0  # number of connections currently checked out
        # BUG FIX: the counter is mutated by many worker threads; guard it.
        self._lock = Lock()
        super().__init__(minconn, maxconn, *args, **kwargs)

    def getconn(self, key=None):
        """Block until a slot is free, then hand out a pooled connection."""
        self.semaphore.acquire()
        with self._lock:
            self.current += 1
        return self._getconn(key)

    def putconn(self, *args, **kwargs):
        """Return a connection to the pool and wake one blocked ``getconn``."""
        self._putconn(*args, **kwargs)
        # BUG FIX: decrement before releasing so closeall() cannot observe a
        # count that still includes the slot we just freed.
        with self._lock:
            self.current -= 1
        self.semaphore.release()

    def closeall(self) -> None:
        """Close every pooled connection and free all outstanding slots."""
        super().closeall()
        # BUG FIX: the original released `self.current` permits but never reset
        # the counter, and Semaphore.release(n) requires Python 3.9+. Snapshot,
        # zero the counter, and release one permit per outstanding connection.
        with self._lock:
            outstanding, self.current = self.current, 0
        for _ in range(outstanding):
            self.semaphore.release()


class OpenGaussConnection:
    """Facade over OpenGaussConnectionPool with a bounded connect-retry loop
    (5 attempts, 5 s apart); re-raises the last error when all attempts fail."""

    def __init__(self, opengauss_properties, error_log: logging.Logger, info_log: logging.Logger):
        super().__init__()
        self.pool = None
        self.opengauss_properties = opengauss_properties
        i = 0
        ex = None
        while True:
            if i == 5:
                print("Fail to connect to OpenGauss database")
                raise ex
            try:
                self.pool = OpenGaussConnectionPool(1, 300,
                                                    database=self.opengauss_properties['database.name'],
                                                    user=self.opengauss_properties['database.user'],
                                                    password=self.opengauss_properties['database.password'],
                                                    host=self.opengauss_properties['database.host'],
                                                    port=self.opengauss_properties['database.port'],
                                                    keepalives=1,
                                                    keepalives_idle=30,
                                                    keepalives_interval=10,
                                                    keepalives_count=15)
                info_log.info(
                    "Successfully Log In OpenGauss Database %s As %s" % (
                        self.opengauss_properties['database.name'], self.opengauss_properties['database.user']))
                print("Successfully Log In OpenGauss Database %s As %s" % (
                    self.opengauss_properties['database.name'], self.opengauss_properties['database.user']))
                break
            except Exception as e:
                i += 1
                if i == 1:
                    print("Fail to connect to OpenGauss database. Retry 1 time")
                else:
                    print("Fail to connect to OpenGauss database. Retry %d times" % i)
                error_log.error(e)
                ex = e
                time.sleep(5)

    def getconn(self, key=None):
        """Borrow a connection from the underlying pool."""
        return self.pool.getconn(key)

    def putconn(self, *args, **kwargs):
        """Give a borrowed connection back to the pool."""
        self.pool.putconn(*args, **kwargs)

    def closeall(self):
        """Close every connection held by the pool."""
        self.pool.closeall()


class SqliteConnection:
    """Opens the source SQLite file under ``sqlite/`` with the same bounded
    retry loop as the OpenGauss side."""

    def __init__(self, sqlite_properties, error_log: logging.Logger, info_log: logging.Logger):
        i = 0
        ex = None
        while True:
            if i == 5:
                print("Fail to connect to Sqlite3 database")
                raise ex
            try:
                self.conn_sqlite = sqlite3.connect("sqlite/" + sqlite_properties['database.filename'])
                info_log.info("Successfully Log In Sqlite3 Database %s" % (sqlite_properties['database.filename']))
                print("Successfully Log In Sqlite3 Database %s" % (sqlite_properties['database.filename']))
                break
            except Exception as e:
                i += 1
                if i == 1:
                    print("Fail to connect to Sqlite3 database. Retry 1 time")
                else:
                    print("Fail to connect to Sqlite3 database. Retry %d times" % i)
                error_log.error(e)
                ex = e
                time.sleep(5)

    def getconn(self):
        """Return the single shared sqlite3 connection (not pooled)."""
        return self.conn_sqlite
Retry %d times" % i) + error_log.error(e) + ex = e + time.sleep(5) + + def getconn(self): + return self.conn_sqlite diff --git a/OpenGauss-Project-main/lib/decorator.py b/OpenGauss-Project-main/lib/decorator.py new file mode 100644 index 0000000000000000000000000000000000000000..0733bd73fe1c5236125a81cdb909c80bac434760 --- /dev/null +++ b/OpenGauss-Project-main/lib/decorator.py @@ -0,0 +1,292 @@ +import re + + +def remove_comment(sql: str): + while sql.find("--") != -1: + i = index = sql.find("--") + while i < len(sql) and sql[i] != '\n': + i += 1 + if i == len(sql): + return sql[0:index] + sql = sql[0:index] + sql[(i + 1):] + return sql + + +def get_table_name(sql: str): + x = sql.find("CREATE TABLE") + y = sql.find("(") + return sql[x + 12:y] + + +def find_n_sub_str(src: str, sub: str, n: int, start: int): + index = src.find(sub, start) + if index != -1 and n > 0: + return find_n_sub_str(src, sub, n - 1, index + 1) + return index + + +def find_n_sub_str_re(src: str, pattern: re.Pattern, n: int, start: int): + search = pattern.search(src, start) + index = search.start() + sub = search.group() + if index != -1 and n > 0: + return find_n_sub_str_re(src, pattern, n - 1, index + 1) + return index, sub + + +def check_integrity(src: str, sub: str, index: int): + flag = True + if index > 0: + flag = (src[index - 1] == ' ') + if not flag: + return flag + if index + len(sub) < len(src): + flag = (src[index + len(sub)] == ' ') + return flag + + +def check_is_column_name(src: str, index: int): + index -= 1 + if src[index] != ' ': + return False + else: + return True + + +def convert_to_not_null(sql: str): + cnt = sql.count("''") + flag = True + for n in range(cnt): + index = find_n_sub_str(sql, "''", n, 0) + if index > 0: + if sql[index - 1] != '(' and sql[index - 1] != ',' and sql[index - 1] != ' ': + flag = False + if index + 2 < len(sql): + if sql[index + 2] != ')' and sql[index + 2] != ',' and sql[index + 2] != ' ': + flag = False + if flag: + sql = sql[0:index] + "' '" 
+ sql[(index + 2):] + flag = True + return sql + + +def convert_double_quote(sql: str): + oldsql = None + if sql.startswith("CREATE TABLE"): + index = sql.upper().find("(") + oldsql = sql[0:(index + 1)] + sql = sql[(index + 1):] + if sql.strip().startswith("'"): + start = sql.find("'") + for i in range(start + 1, len(sql)): + if sql[i] == "'": + if oldsql is not None: + return oldsql + '"' + sql[(start + 1):i] + '"' + sql[i + 1:] + else: + return '"' + sql[(start + 1):i] + '"' + sql[i + 1:] + else: + if oldsql is not None: + return oldsql + sql + else: + return sql + + +def convert_varchar(sql: str): + pattern = re.compile('VARCHAR[(]\d+[)]') + cnt = len(pattern.findall(sql.upper())) + for n in range(cnt): + index, sub = find_n_sub_str_re(sql.upper(), pattern, n, 0) + if check_integrity(sql.upper(), sub, index): + num = int(sub[8:-1]) + num *= 3 + num = str(num) + result = "VARCHAR(%s)" % num + sql = sql[0:index] + result + sql[(index + len(sub)):] + return sql + + +def try_to_convert(oldstr: str, newstr: str, sql: str): + cnt = sql.upper().count(oldstr.upper()) + if cnt > 0: + for n in range(cnt): + index = find_n_sub_str(sql.upper(), oldstr.upper(), n, 0) + if check_integrity(sql.upper(), oldstr.upper(), index) \ + and check_is_column_name(sql.upper(), index): + return sql[0:index] + newstr + sql[(index + len(oldstr)):] + return sql + + +def try_to_remove_fk(sql: str): + sql = convert_double_quote(sql) + if sql.upper().find("FOREIGN KEY") != -1: + if sql.endswith("));") or sql.endswith(");"): + return ");" + else: + return "" + cnt = sql.upper().count("REFERENCES") + if cnt > 0: + for n in range(cnt): + index1 = find_n_sub_str(sql.upper(), "REFERENCES", n, 0) + if check_integrity(sql.upper(), "REFERENCES", index1): + index2 = sql.upper().find("CONSTRAINT") + if index2 != -1: + return sql[0:index2] + else: + return sql[0:index1] + return sql + + +def remove_foreign_key(sql: str): + ss = sql.split(',') + ss = map(lambda x: + try_to_remove_fk(x.replace('\n', 
'').replace('\r', '').replace(' ', ' ').strip()), ss) + sss = list(filter(lambda x: (x != ''), ss)) + sql = ",".join(sss) + sql = sql.replace(",);", ");").replace("(,", "(").replace(",,", ",") + return sql + + +def extract_foreign_key(sql: str): + sqls = [] + ss = sql.split(',') + for sql in ss: + sql = sql.replace('\n', '').replace('\r', '').replace(' ', ' ').strip() + index = sql.upper().find("FOREIGN KEY") + if index != -1: + sql = sql[index:] + while sql.endswith(")") or sql.endswith(";") or sql.endswith(","): + sql = sql[0:-1] + index = sql.upper().find("REFERENCES") + if sql.find("(", index) != -1 and sql.find(")", index) == -1: + sql = sql + ")" + sqls.append(sql) + continue + cnt = sql.upper().count("REFERENCES") + if cnt > 0: + for n in range(cnt): + index = find_n_sub_str(sql.upper(), "REFERENCES", n, 0) + fk = sql[index:] + if check_integrity(sql.upper(), "REFERENCES", index): + column_name = None + if sql.upper().startswith("CREATE TABLE"): + index = sql.upper().find("(") + sql = sql[(index + 1):] + if sql.strip().upper().startswith('"'): + start = sql.find('"') + for i in range(start + 1, len(sql)): + if sql[i] == '"': + column_name = sql[start:i + 1] + break + elif sql.upper().startswith("'"): + start = sql.find("'") + for i in range(start + 1, len(sql)): + if sql[i] == "'": + column_name = '"' + sql[(start + 1):i] + '"' + break + else: + sql = sql.strip() + for i in range(len(sql)): + if sql[i] == ' ': + column_name = sql[0:i] + break + result = "foreign key (%s) %s" % (column_name, fk) + sqls.append(result) + break + return sqls + + +def convert_datatype(sql: str): + sql = convert_varchar(sql) + sql = try_to_convert("datetime", "timestamp without time zone", sql) + sql = try_to_convert("real", "double precision", sql) + sql = try_to_convert("nvarchar", "nvarchar2", sql) + sql = try_to_convert("varying character", "character varying", sql) + sql = try_to_convert("graphic", "nchar", sql) + sql = try_to_convert("year", "integer", sql) + sql = 
try_to_convert("line", "path", sql) + sql = try_to_convert("autoincrement", "integer", sql) + return sql + + +def create_without_fk(sql: str): + sql = remove_foreign_key(sql) + sql = convert_datatype(sql) + return sql + + +def alter_fk(sql: str): + sqls = extract_foreign_key(sql) + table_name = get_table_name(sql) + alter_sqls = [] + for sql in sqls: + alter_sqls.append("alter table " + table_name + " add " + sql + ";") + return alter_sqls + + +def insert_array(sql: str): + x = sql.find(",'[") + y = sql.find("]") + if x != -1 and y != -1: + sql = sql[:x + 1] + 'array' + sql[x + 2:y + 1] + sql[y + 2:] + return sql + + +def autoincrement(sql: str, table_name: str, num: int): + sqls = [] + sql = sql.replace("\n", " ") + cnt = sql.upper().count("AUTOINCREMENT") + column_name = None + if cnt > 0: + for n in range(cnt): + index = find_n_sub_str(sql.upper(), "AUTOINCREMENT", n, 0) + if check_integrity(sql.upper(), "AUTOINCREMENT", index) \ + and check_is_column_name(sql.upper(), index): + index_e = index - 1 + while index_e >= 0: + if sql[index_e] != ' ': + break + index_e -= 1 + index_s = index_e + while index_s >= 0: + if sql[index_s] == ' ': + break + index_s -= 1 + column_name = sql[(index_s + 1):(index_e + 1)] + if column_name is not None: + seq_sql = "CREATE SEQUENCE sq_" + table_name + " START " + str(num) + " INCREMENT 1 CACHE 20;" + alter_sql = "ALTER TABLE " + table_name + " ALTER COLUMN " + column_name + \ + " set default nextval('sq_" + table_name + "');" + sqls.append(seq_sql) + sqls.append(alter_sql) + column_name = None + return sqls + + +def insert(sql: str): + if sql.find('INSERT INTO') != -1: + sql = insert_array(sql) + sql = convert_to_not_null(sql) + return sql + + +def trigger_to_function(trigger_name: str, sql: str): + function_name = trigger_name + "()" + sql = sql.upper() + sql_L = sql.split() + sql = " ".join(sql_L) + ll = sql.find("BEGIN") + 6 + rr = sql.find("END") - 2 + action = sql[ll:rr] + function = "CREATE FUNCTION function_name RETURNS 
TRIGGER AS $example_table$\n" + " BEGIN\n" + " action;\n" + " RETURN NEW;\n" + " END;\n" + "$example_table$ LANGUAGE plpgsql;" + function = function.replace("function_name", function_name) + function = function.replace("action", action) + function = function.replace("DATETIME('NOW')", "CURRENT_TIMESTAMP") + function = function.replace("json_array", "array") + return function + + +def new_trigger(trigger_name: str, sql: str): + rr = sql.find("BEGIN") + new_sql = sql[:rr] + trigger_sql = new_sql + "FOR EACH ROW EXECUTE PROCEDURE " + trigger_name + "()" + ";" + return trigger_sql diff --git a/OpenGauss-Project-main/lib/multi_thread.py b/OpenGauss-Project-main/lib/multi_thread.py new file mode 100644 index 0000000000000000000000000000000000000000..93459ccada8d42fd57179e9cf5a27f3a5fe9b484 --- /dev/null +++ b/OpenGauss-Project-main/lib/multi_thread.py @@ -0,0 +1,172 @@ +import time +import psycopg2 + +from lib import decorator +from lib.connection import OpenGaussConnection, SqliteConnection +from lib.opengauss_thread import OpenGaussLogThread, OpenGaussThread + + +def multi_thread(opengauss_properties, sqlite_properties, error_log, info_log, sqls_log, is_record_sqls): + opengauss = OpenGaussConnection(opengauss_properties, error_log, info_log) + sqlite = SqliteConnection(sqlite_properties, error_log, info_log) + + conn_sqlite = sqlite.getconn() + + dbusername = opengauss_properties['database.user'] + dbschema = opengauss_properties['database.schema'] + conn_opengauss = opengauss.getconn() + cursor_opengauss = conn_opengauss.cursor() + try: + cursor_opengauss.execute("create schema %s authorization %s;" % (dbschema, dbusername)) + cursor_opengauss.execute("grant usage on schema %s to %s;" % (dbschema, dbusername)) + conn_opengauss.commit() + cursor_opengauss.close() + opengauss.putconn(conn_opengauss) + except psycopg2.errors.DuplicateSchema as e: + info_log.info(e) + cursor_opengauss.close() + opengauss.putconn(conn_opengauss) + + print("The data migration operation is 
import time

import psycopg2

from lib import decorator
from lib.connection import OpenGaussConnection, SqliteConnection
from lib.opengauss_thread import OpenGaussLogThread, OpenGaussThread

# Number of dump statements handed to each worker thread in one batch.
_BATCH_SIZE = 50


def _start_worker(opengauss, sqls, dbschema, error_log, sqls_log, is_record_sqls):
    """Create, start and return the worker thread for one statement batch."""
    if is_record_sqls:
        t = OpenGaussLogThread(opengauss, sqls, dbschema, error_log, sqls_log)
    else:
        t = OpenGaussThread(opengauss, sqls, dbschema, error_log)
    t.start()
    return t


def multi_thread(opengauss_properties, sqlite_properties, error_log, info_log, sqls_log, is_record_sqls):
    """Migrate the SQLite database into OpenGauss using batches of worker threads.

    Phases: create schema -> create tables (FKs stripped) -> replay data rows in
    threaded batches -> re-add FKs and AUTOINCREMENT sequences -> indexes ->
    views -> triggers. Errors in any phase are logged, not raised.
    """
    opengauss = OpenGaussConnection(opengauss_properties, error_log, info_log)
    sqlite = SqliteConnection(sqlite_properties, error_log, info_log)

    conn_sqlite = sqlite.getconn()

    dbusername = opengauss_properties['database.user']
    dbschema = opengauss_properties['database.schema']
    conn_opengauss = opengauss.getconn()
    cursor_opengauss = conn_opengauss.cursor()
    try:
        cursor_opengauss.execute("create schema %s authorization %s;" % (dbschema, dbusername))
        cursor_opengauss.execute("grant usage on schema %s to %s;" % (dbschema, dbusername))
        conn_opengauss.commit()
    except psycopg2.errors.DuplicateSchema as e:
        info_log.info(e)
        # BUG FIX: clear the aborted transaction before pooling the connection
        # again, otherwise the next borrower sees "transaction is aborted".
        conn_opengauss.rollback()
    finally:
        cursor_opengauss.close()
        opengauss.putconn(conn_opengauss)

    print("The data migration operation is in progress...")
    time_start = time.time()

    cursor_sqlite = conn_sqlite.cursor()
    create_sqls = [row[4] + ";" for row in
                   cursor_sqlite.execute("select * from sqlite_master where type = 'table';")]

    # BUG FIX: reset before each try so a failing getconn() does not make the
    # finally clause put back a connection that was already returned.
    conn_opengauss = None
    try:
        conn_opengauss = opengauss.getconn()
        cursor_opengauss = conn_opengauss.cursor()
        cursor_opengauss.execute("set search_path to %s;" % dbschema)
        for sql in create_sqls:
            sql = decorator.remove_comment(sql)
            sql = decorator.create_without_fk(sql)
            cursor_opengauss.execute(sql)
            if is_record_sqls:
                sqls_log.info(sql.replace("\n", ""))
        conn_opengauss.commit()
    except Exception as e:
        error_log.error(e)
    finally:
        if conn_opengauss is not None:
            opengauss.putconn(conn_opengauss)

    count = 0
    sqls = []
    thread_list = []
    for sql in conn_sqlite.iterdump():
        sql = decorator.remove_comment(sql)
        if sql.upper().startswith("CREATE"):
            continue  # tables were already created above
        sqls.append(sql)
        count += 1
        if count == _BATCH_SIZE:
            thread_list.append(_start_worker(opengauss, sqls, dbschema, error_log, sqls_log, is_record_sqls))
            sqls = []
            count = 0
    if sqls:  # flush the final, possibly smaller batch (skip it when empty)
        thread_list.append(_start_worker(opengauss, sqls, dbschema, error_log, sqls_log, is_record_sqls))
    for t in thread_list:
        t.join()

    conn_opengauss = None
    try:
        conn_opengauss = opengauss.getconn()
        cursor_opengauss = conn_opengauss.cursor()
        cursor_opengauss.execute("set search_path to %s;" % dbschema)
        for create_sql in create_sqls:
            create_sql = decorator.remove_comment(create_sql)
            for alter_sql in decorator.alter_fk(create_sql):
                cursor_opengauss.execute(alter_sql)
                if is_record_sqls:
                    sqls_log.info(alter_sql.replace("\n", ""))
            table_name = decorator.get_table_name(create_sql)
            # BUG FIX: execute() returns a cursor object, not the count; the
            # original passed the cursor into autoincrement(), producing
            # invalid "CREATE SEQUENCE ... START <Cursor>" SQL.
            row_num = cursor_sqlite.execute("SELECT COUNT(*) FROM " + table_name).fetchone()[0]
            for alter_sql in decorator.autoincrement(create_sql, table_name, row_num):
                cursor_opengauss.execute(alter_sql)
                if is_record_sqls:
                    sqls_log.info(alter_sql.replace("\n", ""))
        conn_opengauss.commit()
    except Exception as e:
        error_log.error(e)
    finally:
        if conn_opengauss is not None:
            opengauss.putconn(conn_opengauss)

    conn_opengauss = None
    try:
        conn_opengauss = opengauss.getconn()
        cursor_opengauss = conn_opengauss.cursor()
        cursor_opengauss.execute("set search_path to %s;" % dbschema)
        indexes = cursor_sqlite.execute("select * from sqlite_master where type = 'index' and sql is not null;")
        for row in indexes:
            sql = row[4] + ";"
            sql = decorator.remove_comment(sql)
            cursor_opengauss.execute(sql)
            if is_record_sqls:
                sqls_log.info(sql)
        conn_opengauss.commit()
    except Exception as e:
        error_log.error(e)
    finally:
        if conn_opengauss is not None:
            opengauss.putconn(conn_opengauss)

    conn_opengauss = None
    try:
        conn_opengauss = opengauss.getconn()
        cursor_opengauss = conn_opengauss.cursor()
        cursor_opengauss.execute("set search_path to %s;" % dbschema)
        views = cursor_sqlite.execute("select * from sqlite_master where type = 'view';")
        for row in views:
            sql = decorator.remove_comment(row[4])
            cursor_opengauss.execute(sql)
            if is_record_sqls:
                sqls_log.info(sql)
        conn_opengauss.commit()
    except Exception as e:
        error_log.error(e)
    finally:
        if conn_opengauss is not None:
            opengauss.putconn(conn_opengauss)

    conn_opengauss = None
    try:
        conn_opengauss = opengauss.getconn()
        cursor_opengauss = conn_opengauss.cursor()
        cursor_opengauss.execute("set search_path to %s;" % dbschema)
        triggers = cursor_sqlite.execute("select * from sqlite_master where type = 'trigger';")
        for row in triggers:
            trigger_name = row[1]
            trigger_sql = decorator.remove_comment(row[4])
            # Each SQLite trigger becomes a plpgsql function plus a trigger
            # that invokes it FOR EACH ROW.
            function = decorator.remove_comment(decorator.trigger_to_function(trigger_name, trigger_sql))
            trigger = decorator.remove_comment(decorator.new_trigger(trigger_name, trigger_sql))
            cursor_opengauss.execute(function)
            cursor_opengauss.execute(trigger)
            if is_record_sqls:
                sqls_log.info(function)
                sqls_log.info(trigger)
        conn_opengauss.commit()
    except Exception as e:
        error_log.error(e)
    finally:
        if conn_opengauss is not None:
            opengauss.putconn(conn_opengauss)

    time_end = time.time()

    time_c = time_end - time_start
    print('Time Cost = %.2f seconds' % time_c)
import logging
import threading

from lib import decorator
from lib.connection import OpenGaussConnection


class OpenGaussThread(threading.Thread):
    """Worker thread that replays one batch of dumped SQL statements on a
    pooled OpenGauss connection, committing once at the end."""

    def __init__(self, opengauss: OpenGaussConnection, sqls, dbschema, error_log: logging.Logger):
        super().__init__()
        self.opengauss = opengauss
        self.sqls = sqls
        self.dbschema = dbschema
        self.error_log = error_log

    def run(self) -> None:
        conn = None
        try:
            conn = self.opengauss.getconn()
            cursor = conn.cursor()
            cursor.execute("set search_path to %s;" % self.dbschema)
            for statement in self.sqls:
                upper = statement.upper()
                if upper.startswith("CREATE"):
                    # DDL: strip foreign keys / convert datatypes first.
                    cursor.execute(decorator.create_without_fk(statement))
                elif upper.startswith(("BEGIN TRANSACTION;", "COMMIT;")):
                    # Transaction markers from the SQLite dump are not replayed.
                    continue
                else:
                    cursor.execute(decorator.insert(statement))
            conn.commit()
        except Exception as e:
            self.error_log.error(e)
        finally:
            if conn is not None:
                self.opengauss.putconn(conn)


class OpenGaussLogThread(OpenGaussThread):
    """Variant of OpenGaussThread that also records each executed INSERT in
    the SQL log; CREATE statements are skipped entirely."""

    def __init__(self, opengauss: OpenGaussConnection, sqls, dbschema, error_log: logging.Logger,
                 sqls_log: logging.Logger):
        super().__init__(opengauss, sqls, dbschema, error_log)
        self.sqls_log = sqls_log

    def run(self) -> None:
        conn = None
        try:
            conn = self.opengauss.getconn()
            cursor = conn.cursor()
            cursor.execute("set search_path to %s;" % self.dbschema)
            for statement in self.sqls:
                upper = statement.upper()
                # Skip DDL and dump transaction markers; replay only data rows.
                if upper.startswith(("CREATE", "BEGIN TRANSACTION;", "COMMIT;")):
                    continue
                rewritten = decorator.insert(statement)
                cursor.execute(rewritten)
                self.sqls_log.info(rewritten.replace("\n", ""))
            conn.commit()
        except Exception as e:
            self.error_log.error(e)
        finally:
            if conn is not None:
                self.opengauss.putconn(conn)
import time

import psycopg2

from lib import decorator
from lib.connection import OpenGaussConnection, SqliteConnection


def single_thread(opengauss_properties, sqlite_properties, error_log, info_log, sqls_log, is_record_sqls):
    """Migrate the SQLite database into OpenGauss on one connection.

    All phases (tables, rows, FKs/sequences, indexes, views, triggers) run in a
    single transaction and are committed together at the end; any error is
    logged and the transaction is abandoned.
    """
    opengauss = OpenGaussConnection(opengauss_properties, error_log, info_log)
    sqlite = SqliteConnection(sqlite_properties, error_log, info_log)

    conn_sqlite = sqlite.getconn()

    dbusername = opengauss_properties['database.user']
    dbschema = opengauss_properties['database.schema']
    conn_opengauss = opengauss.getconn()
    cursor_opengauss = conn_opengauss.cursor()
    try:
        cursor_opengauss.execute("create schema %s authorization %s;" % (dbschema, dbusername))
        cursor_opengauss.execute("grant usage on schema %s to %s;" % (dbschema, dbusername))
        conn_opengauss.commit()
    except psycopg2.errors.DuplicateSchema as e:
        info_log.info(e)
        # BUG FIX: clear the aborted transaction before pooling the connection
        # again, otherwise the next borrower sees "transaction is aborted".
        conn_opengauss.rollback()
    finally:
        cursor_opengauss.close()
        opengauss.putconn(conn_opengauss)

    print("The data migration operation is in progress...")
    time_start = time.time()

    cursor_sqlite = conn_sqlite.cursor()
    create_sqls = [row[4] + ";" for row in
                   cursor_sqlite.execute("select * from sqlite_master where type = 'table';")]
    # BUG FIX: reset before the try so a failing getconn() does not make the
    # finally clause put back a connection that was already returned.
    conn_opengauss = None
    try:
        conn_opengauss = opengauss.getconn()
        cursor_opengauss = conn_opengauss.cursor()
        cursor_opengauss.execute("set search_path to %s;" % dbschema)
        # 1) tables (foreign keys stripped so insert order does not matter)
        for sql in create_sqls:
            sql = decorator.remove_comment(sql)
            sql = decorator.create_without_fk(sql)
            cursor_opengauss.execute(sql)
            if is_record_sqls:
                sqls_log.info(sql.replace("\n", ""))
        # 2) data rows from the dump (DDL and transaction markers skipped)
        for sql in conn_sqlite.iterdump():
            sql = decorator.remove_comment(sql)
            if sql.upper().startswith("CREATE"):
                continue
            elif sql.upper().startswith("BEGIN TRANSACTION;") or sql.upper().startswith("COMMIT;"):
                continue
            else:
                sql = decorator.insert(sql)
                cursor_opengauss.execute(sql)
                if is_record_sqls:
                    sqls_log.info(sql.replace("\n", ""))
        # 3) foreign keys and AUTOINCREMENT sequences
        for create_sql in create_sqls:
            create_sql = decorator.remove_comment(create_sql)
            for alter_sql in decorator.alter_fk(create_sql):
                cursor_opengauss.execute(alter_sql)
                if is_record_sqls:
                    sqls_log.info(alter_sql.replace("\n", ""))
            table_name = decorator.get_table_name(create_sql)
            # BUG FIX: execute() returns a cursor object, not the count; the
            # original passed the cursor into autoincrement(), producing
            # invalid "CREATE SEQUENCE ... START <Cursor>" SQL.
            row_num = cursor_sqlite.execute("SELECT COUNT(*) FROM " + table_name).fetchone()[0]
            for alter_sql in decorator.autoincrement(create_sql, table_name, row_num):
                cursor_opengauss.execute(alter_sql)
                if is_record_sqls:
                    sqls_log.info(alter_sql.replace("\n", ""))
        # 4) indexes
        for row in cursor_sqlite.execute("select * from sqlite_master where type = 'index' and sql is not null;"):
            sql = decorator.remove_comment(row[4] + ";")
            cursor_opengauss.execute(sql)
            if is_record_sqls:
                sqls_log.info(sql)
        # 5) views
        for row in cursor_sqlite.execute("select * from sqlite_master where type = 'view';"):
            sql = decorator.remove_comment(row[4])
            cursor_opengauss.execute(sql)
            if is_record_sqls:
                sqls_log.info(sql)
        # 6) triggers: each becomes a plpgsql function plus a FOR EACH ROW trigger
        for row in cursor_sqlite.execute("select * from sqlite_master where type = 'trigger';"):
            trigger_name = row[1]
            trigger_sql = decorator.remove_comment(row[4])
            function = decorator.remove_comment(decorator.trigger_to_function(trigger_name, trigger_sql))
            trigger = decorator.remove_comment(decorator.new_trigger(trigger_name, trigger_sql))
            cursor_opengauss.execute(function)
            cursor_opengauss.execute(trigger)
            if is_record_sqls:
                sqls_log.info(function)
                sqls_log.info(trigger)

        conn_opengauss.commit()
    except Exception as e:
        error_log.error(e)
    finally:
        if conn_opengauss is not None:
            opengauss.putconn(conn_opengauss)

    time_end = time.time()

    time_c = time_end - time_start
    print('Time Cost = %.2f seconds' % time_c)
import os
import argparse
import logging
from lib import multi_thread, single_thread
from prop.properties import Properties


def _build_logger(name, level, filename, fmt):
    """Return a standalone file logger (constructed directly, so it is not
    registered with the global logging manager)."""
    handler = logging.FileHandler(filename=filename, mode='a', encoding='utf-8')
    handler.setFormatter(fmt)
    logger = logging.Logger(name=name, level=level)
    logger.addHandler(handler)
    return logger


def main():
    """Entry point: collect connection settings (file and/or prompts), then
    run the migration single- or multi-threaded."""
    fmt = logging.Formatter(fmt='[%(asctime)s] [%(levelname)s] >>> %(message)s', datefmt='%Y-%m-%d %I:%M:%S')
    for directory in ("log", "sqlite"):
        if not os.path.exists(directory):
            os.mkdir(directory)
    error_log = _build_logger('ERROR_LOG', logging.ERROR, 'log/error.log', fmt)
    info_log = _build_logger('INFO_LOG', logging.INFO, 'log/info.log', fmt)

    parser = argparse.ArgumentParser(description='Data Migration Script')
    parser.add_argument("--opengauss", "-o", default="")
    parser.add_argument("--sqlite", "-s", default="")
    parser.add_argument("--multithreading", "-m", action="store_true")

    args = parser.parse_args()

    # OpenGauss settings: load from the given properties file, then prompt
    # for anything still missing or empty.
    opengauss_properties = {}
    is_file_update = False
    if args.opengauss != '':
        opengauss_file = 'prop/' + str(args.opengauss)
        opengauss_properties = Properties(opengauss_file).get_properties()
    else:
        opengauss_file = 'prop/opengauss.properties'
    opengauss_prompts = (
        ('database.name', "Input the database name of OpenGauss:"),
        ('database.schema', "Input the schema name of OpenGauss:"),
        ('database.host', "Input the host of OpenGauss:"),
        ('database.port', "Input the port of OpenGauss:"),
        ('database.user', "Input the username of OpenGauss:"),
        ('database.password', "Input the user password of OpenGauss:"),
    )
    for key, prompt in opengauss_prompts:
        if opengauss_properties.get(key, '') == '':
            opengauss_properties[key] = input(prompt)
            is_file_update = True
    if is_file_update:
        answer = input("Save your input in the %s? [y/n]" % opengauss_file)
        if answer.upper() in ('Y', 'YES'):
            Properties.write_properties(opengauss_file, opengauss_properties)

    # SQLite settings: same pattern, a single filename key.
    sqlite_properties = {}
    is_file_update = False
    if args.sqlite != '':
        sqlite_file = "prop/" + str(args.sqlite)
        sqlite_properties = Properties(sqlite_file).get_properties()
    else:
        sqlite_file = 'prop/sqlite.properties'
    if 'database.filename' not in sqlite_properties:
        sqlite_properties['database.filename'] = input("Input the filename of Sqlite3:")
        is_file_update = True
    if is_file_update:
        answer = input("Save your input in the %s? [y/n]" % sqlite_file)
        if answer.upper() in ('Y', 'YES'):
            Properties.write_properties(sqlite_file, sqlite_properties)

    sqls_log = None
    is_record_sqls = input("Save the SQL statements in Data Migration? [y/n]").upper() in ('Y', 'YES')
    if is_record_sqls:
        sqls_log = _build_logger('SQLS_LOG', logging.DEBUG, 'log/sqls.log', fmt)

    if args.multithreading:
        multi_thread.multi_thread(opengauss_properties, sqlite_properties, error_log, info_log, sqls_log,
                                  is_record_sqls)
    else:
        single_thread.single_thread(opengauss_properties, sqlite_properties, error_log, info_log, sqls_log,
                                    is_record_sqls)


if __name__ == '__main__':
    main()
import logging


class Properties:
    """Minimal ``.properties`` reader/writer (one ``key=value`` per line)."""

    def __init__(self, file_name: str):
        """Load *file_name*; lines without ``=`` (or starting with it) are skipped.

        Raises whatever ``open``/read raises, after logging it.
        """
        self.file_name = file_name
        self.properties = {}
        try:
            # BUG FIX: use a context manager so the handle is closed even when
            # reading fails part-way through (the original only closed on success).
            with open(self.file_name, 'r', encoding='utf-8') as pro_file:
                for line in pro_file:
                    if line.find('=') > 0:
                        # BUG FIX: split on the FIRST '=' only, so values that
                        # themselves contain '=' are no longer truncated.
                        key, value = line.replace('\n', '').split('=', 1)
                        self.properties[key] = value
        except Exception as e:
            logging.error(e)
            raise e

    def get_properties(self):
        """Return the parsed key/value mapping."""
        return self.properties

    @staticmethod
    def write_properties(file_name: str, properties: dict):
        """Write *properties* to *file_name* as ``key=value`` lines.

        The ``database.password`` entry is only persisted after an interactive
        confirmation. Raises whatever ``open``/write raises, after logging it.
        """
        try:
            with open(file_name, 'w', encoding='utf-8') as properties_file:
                for k in properties:
                    s = k + "=" + properties[k] + "\n"
                    if k == 'database.password':
                        flag = input("Save the password? [y/n]")
                        if flag.upper() == 'Y' or flag.upper() == 'YES':
                            properties_file.write(s)
                    else:
                        properties_file.write(s)
        except Exception as e:
            logging.error(e)
            raise e