From c710b7d8c6b31646422add79034207129d4be11f Mon Sep 17 00:00:00 2001 From: luqichao Date: Sat, 18 May 2024 14:34:49 +0800 Subject: [PATCH 1/7] add uwal delegate and hook point --- sql/binlog.cc | 29 ++++++++++- sql/mysqld.cc | 4 ++ sql/replication.h | 89 +++++++++++++++++++++++++++++++++ sql/rpl_handler.cc | 120 +++++++++++++++++++++++++++++++++++++++++++++ sql/rpl_handler.h | 46 +++++++++++++++++ sql/rpl_replica.cc | 109 +++++++++++++++++++++++----------------- sql/sql_parse.cc | 4 +- 7 files changed, 352 insertions(+), 49 deletions(-) diff --git a/sql/binlog.cc b/sql/binlog.cc index bec6a1f6c..aceef27f7 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -429,6 +429,10 @@ class MYSQL_BIN_LOG::Binlog_ofile : public Basic_ostream { m_pipeline_head = std::move(encrypted_ostream); } + if (RUN_HOOK(binlog_uwal, after_open, (binlog_name))) { + return true; + } + return false; } @@ -511,6 +515,14 @@ class MYSQL_BIN_LOG::Binlog_ofile : public Basic_ostream { bool write(const unsigned char *buffer, my_off_t length) override { assert(m_pipeline_head != nullptr); + if (!NO_HOOK(binlog_uwal)) { + if (RUN_HOOK(binlog_uwal, before_flush, (buffer, length, m_position))) { + return true; + } + m_position += length; + return false; + } + if (m_pipeline_head->write(buffer, length)) return true; m_position += length; @@ -550,8 +562,21 @@ class MYSQL_BIN_LOG::Binlog_ofile : public Basic_ostream { return false; } - bool flush() { return m_pipeline_head->flush(); } - bool sync() { return m_pipeline_head->sync(); } + bool flush() { + if (RUN_HOOK(binlog_uwal, before_sync, ())) { + return true; + } + return m_pipeline_head->flush(); + } + + bool sync() { + bool ret = m_pipeline_head->sync(); + if (RUN_HOOK(binlog_uwal, uwal_after_sync, ())) { + return true; + } + return ret; + } + bool flush_and_sync() { return flush() || sync(); } my_off_t position() { return m_position; } bool is_empty() { return position() == 0; } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index dda66f3ff..4d8e8d43d 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -11802,6 +11802,8 @@ PSI_rwlock_key key_rwlock_channel_to_filter_lock; PSI_rwlock_key key_rwlock_Trans_delegate_lock; PSI_rwlock_key key_rwlock_Server_state_delegate_lock; PSI_rwlock_key key_rwlock_Binlog_storage_delegate_lock; +PSI_rwlock_key key_rwlock_Binlog_uwal_delegate_lock; +PSI_rwlock_key key_rwlock_Binlog_relay_uwal_delegate_lock; PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock; PSI_rwlock_key key_rwlock_Binlog_relay_IO_delegate_lock; PSI_rwlock_key key_rwlock_resource_group_mgr_map_lock; @@ -11822,6 +11824,8 @@ static PSI_rwlock_info all_server_rwlocks[]= { &key_rwlock_Trans_delegate_lock, "Trans_delegate::lock", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, { &key_rwlock_Server_state_delegate_lock, "Server_state_delegate::lock", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, { &key_rwlock_Binlog_storage_delegate_lock, "Binlog_storage_delegate::lock", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, + { &key_rwlock_Binlog_uwal_delegate_lock, "Binlog_uwal_delegate::lock", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, + { &key_rwlock_Binlog_relay_uwal_delegate_lock, "Binlog_relay_uwal_delegate::lock", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, { &key_rwlock_receiver_sid_lock, "gtid_retrieved", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, { &key_rwlock_rpl_filter_lock, "rpl_filter_lock", 0, 0, PSI_DOCUMENT_ME}, { &key_rwlock_channel_to_filter_lock, "channel_to_filter_lock", 0, 0, PSI_DOCUMENT_ME}, diff --git a/sql/replication.h b/sql/replication.h index ff233d8e2..6f04c8026 100644 --- a/sql/replication.h +++ b/sql/replication.h @@ -433,6 +433,47 @@ typedef struct Binlog_storage_observer { after_sync_t after_sync; } Binlog_storage_observer; +/** + Binlog uwal observer paramters +*/ +typedef struct Binlog_uwal_param { + uint32 server_id; +} Binlog_uwal_param; + +typedef int (*before_flush_t)(const unsigned char *buffer, my_off_t length, my_off_t position); +typedef int (*before_sync_t)(); +typedef int (*after_register_t)(uint32 server_id); +typedef int (*uwal_after_sync_t)(); +typedef int (*after_open_t)(const char *binlog_name); + +/** + Observe binlog logging uwal +*/ +typedef struct Binlog_uwal_observer { + uint32 len; + + before_flush_t before_flush; + before_sync_t before_sync; + after_register_t after_register; + uwal_after_sync_t uwal_after_sync; + after_open_t after_open; +} Binlog_uwal_observer; + +typedef int (*relay_after_register_t)(); +typedef int (*relay_read_t)(ulong &len, const char **buffer); +typedef int (*relay_is_uwal_relay_t)(int &uwal_skip); + +/** + Observe relaylog logging uwal +*/ +typedef struct Binlog_relay_uwal_observer { + uint32 len; + + relay_after_register_t after_register; + relay_read_t read; + relay_is_uwal_relay_t is_uwal_relay; +} Binlog_relay_uwal_observer; + /** Replication binlog transmitter (binlog dump) observer parameter. */ @@ -774,6 +815,54 @@ int register_binlog_storage_observer(Binlog_storage_observer *observer, int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p); +/** + Register a binlog uwal observer + + @param observer The binlog uwal observer to register + @param p pointer to the internal plugin structure + + @retval 0 Success + @retval 1 Observer already exists +*/ +int register_binlog_uwal_observer(Binlog_uwal_observer *observer, + void *p); + +/** + Unregister a binlog uwal observer + + @param observer The binlog uwal observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Success + @retval 1 Observer not exists +*/ +int unregister_binlog_uwal_observer(Binlog_uwal_observer *observer, + void *p); + +/** + Register a binlog relay uwal observer + + @param observer The binlog relay uwal observer to register + @param p pointer to the internal plugin structure + + @retval 0 Success + @retval 1 Observer already exists +*/ +int register_binlog_relay_uwal_observer(Binlog_relay_uwal_observer *observer, + void *p); + +/** + Unregister a binlog relay uwal observer + + @param observer The binlog relay uwal observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Success + @retval 1 Observer not exists +*/ +int unregister_binlog_relay_uwal_observer(Binlog_relay_uwal_observer *observer, + void *p); + /** Register a binlog transmit observer diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index 2124b3041..ae2bbe2f7 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -69,6 +69,8 @@ Trans_delegate *transaction_delegate; Binlog_storage_delegate *binlog_storage_delegate; +Binlog_uwal_delegate *binlog_uwal_delegate; +Binlog_relay_uwal_delegate *binlog_relay_uwal_delegate; Server_state_delegate *server_state_delegate; Binlog_transmit_delegate *binlog_transmit_delegate; @@ -353,6 +355,10 @@ int delegates_init() { alignas(Trans_delegate) static char place_trans_mem[sizeof(Trans_delegate)]; alignas(Binlog_storage_delegate) static char place_storage_mem[sizeof(Binlog_storage_delegate)]; + alignas(Binlog_uwal_delegate) static char + place_uwal_mem[sizeof(Binlog_uwal_delegate)]; + alignas(Binlog_relay_uwal_delegate) static char + place_relay_uwal_mem[sizeof(Binlog_relay_uwal_delegate)]; alignas(Server_state_delegate) static char place_state_mem[sizeof(Server_state_delegate)]; alignas(Binlog_transmit_delegate) static char @@ -372,6 +378,16 @@ int delegates_init() { return 1; } + binlog_uwal_delegate = new (place_uwal_mem) Binlog_uwal_delegate; + if (!binlog_uwal_delegate->is_inited()) { + return 1; + } + + binlog_relay_uwal_delegate = new (place_relay_uwal_mem) Binlog_relay_uwal_delegate; + if (!binlog_relay_uwal_delegate->is_inited()) { + return 1; + } + server_state_delegate = new (place_state_mem) Server_state_delegate; binlog_transmit_delegate = new (place_transmit_mem) Binlog_transmit_delegate; if (!binlog_transmit_delegate->is_inited()) { @@ -401,6 +417,10 @@ void delegates_destroy() { if (transaction_delegate) transaction_delegate->~Trans_delegate(); if (binlog_storage_delegate) binlog_storage_delegate->~Binlog_storage_delegate(); + if (binlog_uwal_delegate) + binlog_uwal_delegate->~Binlog_uwal_delegate(); + if (binlog_relay_uwal_delegate) + binlog_relay_uwal_delegate->~Binlog_relay_uwal_delegate(); if (server_state_delegate) server_state_delegate->~Server_state_delegate(); if (binlog_transmit_delegate) binlog_transmit_delegate->~Binlog_transmit_delegate(); @@ -412,6 +432,10 @@ static void delegates_update_plugin_ref_count() { if (transaction_delegate) transaction_delegate->update_plugin_ref_count(); if (binlog_storage_delegate) binlog_storage_delegate->update_plugin_ref_count(); + if (binlog_uwal_delegate) + binlog_uwal_delegate->update_plugin_ref_count(); + if (binlog_relay_uwal_delegate) + binlog_relay_uwal_delegate->update_plugin_ref_count(); if (server_state_delegate) server_state_delegate->update_plugin_ref_count(); if (binlog_transmit_delegate) binlog_transmit_delegate->update_plugin_ref_count(); @@ -422,6 +446,8 @@ static void delegates_update_plugin_ref_count() { void delegates_acquire_locks() { if (transaction_delegate) transaction_delegate->write_lock(); if (binlog_storage_delegate) binlog_storage_delegate->write_lock(); + if (binlog_uwal_delegate) binlog_uwal_delegate->write_lock(); + if (binlog_relay_uwal_delegate) binlog_relay_uwal_delegate->write_lock(); if (server_state_delegate) server_state_delegate->write_lock(); if (binlog_transmit_delegate) binlog_transmit_delegate->write_lock(); if (binlog_relay_io_delegate) binlog_relay_io_delegate->write_lock(); @@ -430,6 +456,8 @@ void delegates_acquire_locks() { void delegates_release_locks() { if (transaction_delegate) transaction_delegate->unlock(); if (binlog_storage_delegate) binlog_storage_delegate->unlock(); + if (binlog_uwal_delegate) binlog_uwal_delegate->unlock(); + if (binlog_relay_uwal_delegate) binlog_relay_uwal_delegate->unlock(); if (server_state_delegate) server_state_delegate->unlock(); if (binlog_transmit_delegate) binlog_transmit_delegate->unlock(); if (binlog_relay_io_delegate) binlog_relay_io_delegate->unlock(); @@ -440,6 +468,8 @@ void delegates_update_lock_type() { if (transaction_delegate) transaction_delegate->update_lock_type(); if (binlog_storage_delegate) binlog_storage_delegate->update_lock_type(); + if (binlog_uwal_delegate) binlog_uwal_delegate->update_lock_type(); + if (binlog_relay_uwal_delegate) binlog_relay_uwal_delegate->update_lock_type(); if (server_state_delegate) server_state_delegate->update_lock_type(); if (binlog_transmit_delegate) binlog_transmit_delegate->update_lock_type(); if (binlog_relay_io_delegate) binlog_relay_io_delegate->update_lock_type(); @@ -910,6 +940,70 @@ int Binlog_storage_delegate::after_flush(THD *thd, const char *log_file, return ret; } +int Binlog_uwal_delegate::before_flush(const unsigned char *buffer, my_off_t length, my_off_t position) { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, before_flush, (buffer, length, position)); + return ret; +} + +int Binlog_uwal_delegate::before_sync() { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, before_sync, ()); + return ret; +} + +int Binlog_uwal_delegate::after_register(uint32 server_id) { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, after_register, (server_id)); + return ret; +} + +int Binlog_uwal_delegate::uwal_after_sync() { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, uwal_after_sync, ()); + return ret; +} + +int Binlog_uwal_delegate::after_open(const char *binlog_name) { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, after_open, (binlog_name)); + return ret; +} + +int Binlog_relay_uwal_delegate::after_register() { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, after_register, ()); + return ret; +} + +int Binlog_relay_uwal_delegate::read(ulong &len, const char **buffer) { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, read, (len, buffer)); + return ret; +} + +int Binlog_relay_uwal_delegate::is_uwal_relay(int &uwal_skip) { + DBUG_TRACE; + + int ret = 0; + FOREACH_OBSERVER(ret, is_uwal_relay, (uwal_skip)); + return ret; +} + /** * This hook MUST be invoked after ALL recovery operations are performed * and the server is ready to serve clients. @@ -1313,6 +1407,32 @@ int unregister_binlog_storage_observer(Binlog_storage_observer *observer, return binlog_storage_delegate->remove_observer(observer); } +int register_binlog_uwal_observer(Binlog_uwal_observer *observer, + void *p) { + DBUG_TRACE; + int result = + binlog_uwal_delegate->add_observer(observer, (st_plugin_int *)p); + return result; +} + +int unregister_binlog_uwal_observer(Binlog_uwal_observer *observer, + void *) { + return binlog_uwal_delegate->remove_observer(observer); +} + +int register_binlog_relay_uwal_observer(Binlog_relay_uwal_observer *observer, + void *p) { + DBUG_TRACE; + int result = + binlog_relay_uwal_delegate->add_observer(observer, (st_plugin_int *)p); + return result; +} + +int unregister_binlog_relay_uwal_observer(Binlog_relay_uwal_observer *observer, + void *) { + return binlog_relay_uwal_delegate->remove_observer(observer); +} + int register_server_state_observer(Server_state_observer *observer, void *plugin_var) { DBUG_TRACE; diff --git a/sql/rpl_handler.h b/sql/rpl_handler.h index 4b2d11977..9576279d3 100644 --- a/sql/rpl_handler.h +++ b/sql/rpl_handler.h @@ -47,6 +47,8 @@ class THD; struct Binlog_relay_IO_observer; struct Binlog_relay_IO_param; struct Binlog_storage_observer; +struct Binlog_uwal_observer; +struct Binlog_relay_uwal_observer; struct Binlog_transmit_observer; struct Server_state_observer; struct Trans_observer; @@ -367,6 +369,48 @@ class Binlog_storage_delegate : public Delegate { int after_sync(THD *thd, const char *log_file, my_off_t log_pos); }; +#ifdef HAVE_PSI_RWLOCK_INTERFACE +extern PSI_rwlock_key key_rwlock_Binlog_uwal_delegate_lock; +#endif + +class Binlog_uwal_delegate : public Delegate { + public: + Binlog_uwal_delegate() + : Delegate( +#ifdef HAVE_PSI_RWLOCK_INTERFACE + key_rwlock_Binlog_uwal_delegate_lock +#endif + ) { + } + + typedef Binlog_uwal_observer Observer; + int before_flush(const unsigned char *buffer, my_off_t length, my_off_t position); + int before_sync(); + int after_register(uint32 server_id); + int uwal_after_sync(); + int after_open(const char *binlog_name); +}; + +#ifdef HAVE_PSI_RWLOCK_INTERFACE +extern PSI_rwlock_key key_rwlock_Binlog_relay_uwal_delegate_lock; +#endif + +class Binlog_relay_uwal_delegate : public Delegate { + public: + Binlog_relay_uwal_delegate() + : Delegate( +#ifdef HAVE_PSI_RWLOCK_INTERFACE + key_rwlock_Binlog_relay_uwal_delegate_lock +#endif + ) { + } + + typedef Binlog_relay_uwal_observer Observer; + int after_register(); + int read(ulong &len, const char **buffer); + int is_uwal_relay(int &uwal_skip); +}; + #ifdef HAVE_PSI_RWLOCK_INTERFACE extern PSI_rwlock_key key_rwlock_Binlog_transmit_delegate_lock; #endif @@ -446,6 +490,8 @@ void delegates_update_lock_type(); extern Trans_delegate *transaction_delegate; extern Binlog_storage_delegate *binlog_storage_delegate; +extern Binlog_uwal_delegate *binlog_uwal_delegate; +extern Binlog_relay_uwal_delegate *binlog_relay_uwal_delegate; extern Server_state_delegate *server_state_delegate; extern Binlog_transmit_delegate *binlog_transmit_delegate; extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index bf4a2fbb6..70bc0890e 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -5263,6 +5263,8 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, return 0; } +int uwal_skip = 0; +bool skip_fake_rotate = false; /** Slave IO thread entry point. @@ -5442,6 +5444,7 @@ extern "C" void *handle_slave_io(void *arg) { goto err; goto connected; } + RUN_HOOK(binlog_relay_uwal, after_register, ()); DBUG_PRINT("info", ("Starting reading binary log from master")); while (!io_slave_killed(thd, mi)) { @@ -5473,56 +5476,70 @@ extern "C" void *handle_slave_io(void *arg) { we're in fact receiving nothing. */ THD_STAGE_INFO(thd, stage_waiting_for_source_to_send_event); - event_len = read_event(mysql, &rpl, mi, &suppress_warnings); - if (check_io_slave_killed(thd, mi, - "Slave I/O thread killed while " - "reading event")) + if (!NO_HOOK(binlog_relay_uwal) && uwal_skip) { + thd->killed = THD::KILL_CONNECTION; goto err; + } else { + event_len = read_event(mysql, &rpl, mi, &suppress_warnings); + if (check_io_slave_killed(thd, mi, + "Slave I/O thread killed while " + "reading event")) + goto err; - if (event_len == packet_error || - DBUG_EVALUATE_IF("simulate_reconnect_after_failed_event_read", 1, - 0)) { - uint mysql_error_number = mysql_errno(mysql); - switch (mysql_error_number) { - case CR_NET_PACKET_TOO_LARGE: - LogErr(ERROR_LEVEL, - ER_RPL_LOG_ENTRY_EXCEEDS_REPLICA_MAX_ALLOWED_PACKET, - replica_max_allowed_packet); - mi->report(ERROR_LEVEL, ER_SERVER_NET_PACKET_TOO_LARGE, "%s", - "Got a packet bigger than " - "'replica_max_allowed_packet' bytes"); - goto err; - case ER_MASTER_FATAL_ERROR_READING_BINLOG: - mi->report(ERROR_LEVEL, - ER_SERVER_MASTER_FATAL_ERROR_READING_BINLOG, - ER_THD(thd, ER_MASTER_FATAL_ERROR_READING_BINLOG), - mysql_error_number, mysql_error(mysql)); - goto err; - case ER_OUT_OF_RESOURCES: - LogErr(ERROR_LEVEL, ER_RPL_SLAVE_STOPPING_AS_MASTER_OOM); - mi->report(ERROR_LEVEL, ER_SERVER_OUT_OF_RESOURCES, "%s", - ER_THD(thd, ER_SERVER_OUT_OF_RESOURCES)); + if (event_len == packet_error || + DBUG_EVALUATE_IF("simulate_reconnect_after_failed_event_read", 1, + 0)) { + uint mysql_error_number = mysql_errno(mysql); + switch (mysql_error_number) { + case CR_NET_PACKET_TOO_LARGE: + LogErr(ERROR_LEVEL, + ER_RPL_LOG_ENTRY_EXCEEDS_REPLICA_MAX_ALLOWED_PACKET, + replica_max_allowed_packet); + mi->report(ERROR_LEVEL, ER_SERVER_NET_PACKET_TOO_LARGE, "%s", + "Got a packet bigger than " + "'replica_max_allowed_packet' bytes"); + goto err; + case ER_MASTER_FATAL_ERROR_READING_BINLOG: + mi->report(ERROR_LEVEL, + ER_SERVER_MASTER_FATAL_ERROR_READING_BINLOG, + ER_THD(thd, ER_MASTER_FATAL_ERROR_READING_BINLOG), + mysql_error_number, mysql_error(mysql)); + goto err; + case ER_OUT_OF_RESOURCES: + LogErr(ERROR_LEVEL, ER_RPL_SLAVE_STOPPING_AS_MASTER_OOM); + mi->report(ERROR_LEVEL, ER_SERVER_OUT_OF_RESOURCES, "%s", + ER_THD(thd, ER_SERVER_OUT_OF_RESOURCES)); + goto err; + } + if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, + reconnect_messages_after_failed_event_read)) goto err; - } - if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, - reconnect_messages_after_failed_event_read)) + goto connected; + } // if (event_len == packet_error) + + retry_count = 0; // ok event, reset retry counter + THD_STAGE_INFO(thd, stage_queueing_source_event_to_the_relay_log); + event_buf = (const char *)mysql->net.read_pos + 1; + DBUG_PRINT("info", ("IO thread received event of type %s", + Log_event::get_type_str( + (Log_event_type)event_buf[EVENT_TYPE_OFFSET]))); + if (RUN_HOOK(binlog_relay_io, after_read_event, + (thd, mi, (const char *)mysql->net.read_pos + 1, event_len, + &event_buf, &event_len))) { + mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, + ER_THD(thd, ER_SLAVE_FATAL_ERROR), + "Failed to run 'after_read_event' hook"); goto err; - goto connected; - } // if (event_len == packet_error) - - retry_count = 0; // ok event, reset retry counter - THD_STAGE_INFO(thd, stage_queueing_source_event_to_the_relay_log); - event_buf = (const char *)mysql->net.read_pos + 1; - DBUG_PRINT("info", ("IO thread received event of type %s", - Log_event::get_type_str( - (Log_event_type)event_buf[EVENT_TYPE_OFFSET]))); - if (RUN_HOOK(binlog_relay_io, after_read_event, - (thd, mi, (const char *)mysql->net.read_pos + 1, event_len, - &event_buf, &event_len))) { - mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, - ER_THD(thd, ER_SLAVE_FATAL_ERROR), - "Failed to run 'after_read_event' hook"); - goto err; + } + + Log_event_type event_type = (Log_event_type) event_buf[EVENT_TYPE_OFFSET]; + if (!NO_HOOK(binlog_relay_uwal) && event_type == binary_log::ROTATE_EVENT) { + if (!skip_fake_rotate) { + skip_fake_rotate = true; + } else { + uwal_skip = 1; + } + } } /* XXX: 'synced' should be updated by queue_event to indicate diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 063c473e9..416562c69 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1829,8 +1829,10 @@ bool dispatch_command(THD *thd, const COM_DATA *com_data, case COM_REGISTER_SLAVE: { // TODO: access of protocol_classic should be removed if (!register_replica(thd, thd->get_protocol_classic()->get_raw_packet(), - thd->get_protocol_classic()->get_packet_length())) + thd->get_protocol_classic()->get_packet_length())) { my_ok(thd); + RUN_HOOK(binlog_uwal, after_register, (thd->server_id)); + } break; } case COM_RESET_CONNECTION: { -- Gitee From 544b59e4fbb5fcb3e06549efcc38bbcb17c29ef3 Mon Sep 17 00:00:00 2001 From: luqichao Date: Sat, 18 May 2024 15:10:47 +0800 Subject: [PATCH 2/7] add error log for uwal plugin --- share/messages_to_error_log.txt | 63 +++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/share/messages_to_error_log.txt b/share/messages_to_error_log.txt index c593f51ca..29c45507f 100644 --- a/share/messages_to_error_log.txt +++ b/share/messages_to_error_log.txt @@ -12072,6 +12072,69 @@ ER_IB_INDEX_LOADER_DONE ER_IB_INDEX_BUILDER_DONE eng "Builder::finish(): Completed building index=%s of table=%s, err=%zu." +ER_UWAL_RELAY_QUERY_FAILED + eng "Relay query uwal failed." + +ER_UWAL_REPLICA_NOTIFY + eng "Replica notify master." + +ER_UWAL_RELAY_READ_INFO + eng "uwal_buffer_read_len %lu uwal_read_offset %lu uwal_write_offset %lu" + +ER_UWAL_RELAY_READ_UWAL_FAILED + eng "uwal_read failed, ret %d uwal_read_offset %lu, read_len %lu." + +ER_UWAL_RELAY_READ_FAILED + eng "Relay read uwal failed, ret %d." + +ER_UWAL_REPLICA_NOTIFY_FAILED + eng "UWAL replica init notify failed, ret: %d." + +ER_UWAL_SYNC_WRITE_FAILED + eng "UWAL sync write failed, ret %d." + +ER_UWAL_NOT_SUPPORT + eng "Runtime enable, not support now." + +ER_UWAL_MASTER_START_NOTIFY + eng "Master start to notify node %u." + +ER_UWAL_MASTER_NOTIFY_REPEATEDLY + eng "Master already notify the node" + +ER_UWAL_PRIMARY_NOTIFY_FAILED + eng "UWAL primary init notify failed, ret %d." + +ER_UWAL_NOTIFY_SUCCESS + eng "UwalNotifyNodeListChange success" + +ER_UWAL_NOTIFY_WAIT + eng "UwalNotifyNodeListChange ret 0 for %u count, wait callback ..." + +ER_UWAL_PATH + eng "uwal path %s" + +ER_UWAL_LOG_PATH + eng "uwal log path %s" + +ER_UWAL_DISK_SIZE + eng "uwal disk size %lu" + +ER_UWAL_ID + eng "uwal id %d" + +ER_UWAL_IP + eng "uwal ip %s" + +ER_UWAL_PORT + eng "uwal port %d" + +ER_UWAL_PROTOCOL + eng "uwal protocol %s" + +ER_UWAL_STILL_IN_USE + eng "The laste uwal is still in use!" + # DO NOT add server-to-client messages here; # they go in messages_to_clients.txt # in the same directory as this file. -- Gitee From 7435065b1eb44d5bbcbbacb3fb58d4a00f248771 Mon Sep 17 00:00:00 2001 From: luqichao Date: Sat, 18 May 2024 14:56:59 +0800 Subject: [PATCH 3/7] dlopen uwal --- plugin/uwal/uwal_adaptor.cc | 198 ++++++++++++++++ plugin/uwal/uwal_adaptor.h | 442 ++++++++++++++++++++++++++++++++++++ 2 files changed, 640 insertions(+) create mode 100644 plugin/uwal/uwal_adaptor.cc create mode 100644 plugin/uwal/uwal_adaptor.h diff --git a/plugin/uwal/uwal_adaptor.cc b/plugin/uwal/uwal_adaptor.cc new file mode 100644 index 000000000..821e5ec00 --- /dev/null +++ b/plugin/uwal/uwal_adaptor.cc @@ -0,0 +1,198 @@ +/* + * Copyright (c) Huawei Technologies Co.,Ltd. 2022-2024. All rights reserved. + */ + +#include +#include +#include +#include +#include +#include +#include +#include "plugin/uwal/uwal_adaptor.h" + +#ifndef MAX_PATH_LEN +#define MAX_PATH_LEN UWAL_MAX_PATH_LEN +#endif + +#define UWAL_ENV_PATH "UWAL_LIB_PATH" +#define UWAL_SO_NAME "libuwal.so" + +typedef struct { + void* g_uwal_handle; + UwalInit init; + UwalExit uninit; + UwalCreate create; + UwalDelete remove; + UwalAppend uappend; + UwalRead uread; + UwalTruncate truncate; + UwalQuery query; + UwalQueryByUser query_by_user; + UwalSetRewindPoint set_rewind_point; + UwalRegisterCertVerifyFunc register_cert_verify_func; + UwalNotifyNodeListChange notify_nodelist_change; + UWAL_Ipv4InetToInt ipv4_inet_to_int; + UwalAppendCopyFree uappend_copy_free; + UwalAllocAppendMem alloc_append_mem; +} uwal_func_t; + +uwal_func_t g_uwal_func; + +#define UWAL_LOAD_SYMBOLS(ACTION) \ + ACTION(init, UwalInit) \ + ACTION(uninit, UwalExit) \ + ACTION(create, UwalCreate) \ + ACTION(remove, UwalDelete) \ + ACTION(uappend, UwalAppend) \ + ACTION(uread, UwalRead) \ + ACTION(truncate, UwalTruncate) \ + ACTION(query, UwalQuery) \ + ACTION(query_by_user, UwalQueryByUser) \ + ACTION(set_rewind_point, UwalSetRewindPoint) \ + ACTION(register_cert_verify_func, UwalRegisterCertVerifyFunc) \ + ACTION(notify_nodelist_change, UwalNotifyNodeListChange) \ + ACTION(ipv4_inet_to_int, UWAL_Ipv4InetToInt) \ + ACTION(uappend_copy_free, UwalAppendCopyFree) \ + ACTION(alloc_append_mem, UwalAllocAppendMem) + +#define UWAL_HANDLE_GET_SYM(op, name) \ + do { \ + const char *dlsym_err = nullptr; \ + g_uwal_func.op = (name)dlsym(g_uwal_func.g_uwal_handle, #name); \ + dlsym_err = dlerror(); \ + if (dlsym_err != nullptr) { \ + return -1; \ + } \ + } while (0); + +static int uwal_resolve_path(char* absolute_path, const char* raw_path, const char* filename) +{ + char path[MAX_PATH_LEN] = { 0 }; + + if (!realpath(raw_path, path)) { + if (errno != ENOENT && errno != EACCES) { + return -1; + } + } + + int ret = snprintf(absolute_path, MAX_PATH_LEN, "%s/%s", path, filename); + if (ret < 0) { + return -1; + } + return 0; +} + +static int uwal_open_dl(void **lib_handle, char *symbol) +{ +#ifdef WIN32 + return -1; +#else + *lib_handle = dlopen(symbol, RTLD_LAZY); + if (*lib_handle == nullptr) { + return -1; + } +#endif + return 0; +} + +static int uwal_load_symbols(char* lib_dl_path) +{ + int ret = uwal_open_dl(&g_uwal_func.g_uwal_handle, lib_dl_path); + if (ret != 0) { + return ret; + } + UWAL_LOAD_SYMBOLS(UWAL_HANDLE_GET_SYM); + return 0; +} + +int uwal_init_symbols() +{ + char lib_dl_path[MAX_PATH_LEN] = { 0 }; + char* raw_path = getenv(UWAL_ENV_PATH); + if (raw_path == nullptr) { + return -1; + } + + int ret = uwal_resolve_path(lib_dl_path, raw_path, UWAL_SO_NAME); + if (ret != 0) { + return ret; + } + return uwal_load_symbols(lib_dl_path); +} + +int ock_uwal_init(IN const char *path, IN const UwalCfgElem *elems, IN int cnt, IN const char *ulogPath) +{ + return g_uwal_func.init(path, elems, cnt, ulogPath); +} + +void ock_uwal_exit(void) +{ + g_uwal_func.uninit(); +} + +int ock_uwal_create(IN UwalCreateParam *param, OUT UwalVector *uwals) +{ + return g_uwal_func.create(param, uwals); +} + +int ock_uwal_delete(IN UwalDeleteParam *param) +{ + return g_uwal_func.remove(param); +} + +int ock_uwal_append(IN UwalAppendParam *param, OUT uint64_t *offset, OUT void* result) +{ + return g_uwal_func.uappend(param, offset, result); +} + +int ock_uwal_read(IN UwalReadParam *param, OUT UwalBufferList *bufferList) +{ + return g_uwal_func.uread(param, bufferList); +} + +int ock_uwal_truncate(IN const UwalId *uwalId, IN uint64_t offset) +{ + return g_uwal_func.truncate(uwalId, offset); +} + +int ock_uwal_query(IN UwalQueryParam *param, OUT UwalInfo *info) +{ + return g_uwal_func.query(param, info); +} + +int ock_uwal_query_by_user(IN UwalUserType user, IN UwalRouteType route, OUT UwalVector *uwals) +{ + return g_uwal_func.query_by_user(user, route, uwals); +} + +int ock_uwal_set_rewind_point(IN UwalId *uwalId, IN uint64_t offset) +{ + return g_uwal_func.set_rewind_point(uwalId, offset); +} + +int ock_uwal_register_cert_verify_func(int32_t (*certVerify)(void* certStoreCtx, const char *crlPath), + int32_t (*getKeyPass)(char *keyPassBuff, uint32_t keyPassBuffLen, char *keyPassPath)) +{ + return g_uwal_func.register_cert_verify_func(certVerify, getKeyPass); +} + +int32_t ock_uwal_notify_nodelist_change(NodeStateList *nodeList, FinishCbFun cb, void *ctx) +{ + return g_uwal_func.notify_nodelist_change(nodeList, cb, ctx); +} + +uint32_t ock_uwal_ipv4_inet_to_int(char ipv4[16UL]) +{ + return g_uwal_func.ipv4_inet_to_int(ipv4); +} + +int ock_uwal_append_copy_free(IN UwalAppendParam *param, OUT uint64_t *offset, OUT void *result) +{ + return g_uwal_func.uappend_copy_free(param, offset, result); +} + +void *ock_uwal_alloc_append_mem(IN uint32_t len) +{ + return g_uwal_func.alloc_append_mem(len); +} \ No newline at end of file diff --git a/plugin/uwal/uwal_adaptor.h b/plugin/uwal/uwal_adaptor.h new file mode 100644 index 000000000..6c6570966 --- /dev/null +++ b/plugin/uwal/uwal_adaptor.h @@ -0,0 +1,442 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022-2024. All rights reserved. + */ + +#ifndef __UWAL_ADAPTOR_H__ +#define __UWAL_ADAPTOR_H__ + +#include +#include + +#define UWAL_PORT_LEN 8 +#define UWAL_BYTE_SIZE_LEN 32 +#define UWAL_INT_SIZE_LEN 24 +#define UWAL_MAX_PATH_LEN 4096 +#define UWAL_MAX_NUM 5 +#define UWAL_OBJ_ALIGN_LEN (2097152UL) +#define MAX_NODE 8 +#define UWAL_IP_LEN 16 +#define UWAL_PROTOCOL_TCP "tcp" +#define UWAL_PROTOCOL_RDMA "rdma" +#define UWAL_DISK_POOLID "1" +#define UWAL_MIN_DISK_SIZE 8589934592 +#define UWAL_MAX_DISK_SIZE 4398046511104 +#define UWAL_BLOCK_SIZE 2097152 + +#ifndef IN +#define IN +#endif + +#ifndef OUT +#define OUT +#endif + +/* ******** + * return code definition + * ******* */ +typedef enum { + PRIMARY_APPEND_LOG_FAIL = -100002, + REPLICA_APPEND_LOG_FAIL = -100001, + U_BUSY = -100000, // System busy. Please retry later. + U_ARGS_INVAILD = -99999, // Invalid input args. + U_REQUEST_TIMEOUT = -99998, // Request timeout. + U_CAPACITY_NOT_ENOUGH = -99997, // Insufficient capacity. + U_PERFTYPE_INVALID = -99996, // Perftype invalid. + U_UNSERVICEABLE = -99995, // Service unserviceable. + U_DISK_FAULT = -99994, // Disk fault + U_UWAL_ID_NOT_EXIST = -99993, // The uwal does not exist. + U_UWAL_IO_CORRUPTED = -99992, // The uwal has corrupted permanently and only parts of the data can be obtained. + U_UWAL_UNAVAILABLE = -99991, // The uwal is unavailable or pool is fault. Recovery is not guranteed. + U_UWAL_EXCEED_VALID_RANGE = -99990, // Exceeds the valid range, now only for read operation. + U_UWAL_DISCONTINUOUS = -99989, // Cannot append log because of existing append error. + U_ERROR = -1, // Unknown Error + U_OK = 0, // OK +} UwalRetCode; + +/* ******** + * uwal id: + * uwal is a append-only data store unit, and each uwal has an id which is unique in the entire system. + * ******* */ +#pragma pack(push) +#pragma pack(1) + +#define UWAL_ID_LEN (24) // uwal id len by bytes + +typedef struct { + char id[UWAL_ID_LEN]; +} UwalId; + +#pragma pack(pop) + +/* ******** + * uwal durability, for example: + * 3az 20+16 azCnt = 3, originNum = 20 redundancyNum = 16; + * single az 12+3 azCnt = 1, originNum = 12 redundancyNum = 3; + * single az 3rep azCnt = 1, originNum = 1 redundancyNum = 2; + * 3az rep 3 azCnt = 3, originNum = 1 redundancyNum = 2; + * ******* */ +typedef struct { + // how many AZs for this uwal layout + uint8_t azCnt; + + /* origin data num: + * For X replicas, X replicas are regarded as 1+M, set origin_num as 1. + * For EC N+M, set origin_num as N. */ + uint8_t originNum; + + /* redundancy data num: + * For X replicas, X replicas are regarded as 1+M, set redundancy_num as M. + * For EC N+M set redundancy_num as M. */ + uint8_t redundancyNum; + + uint8_t reliabilityType; /* EC_MODE fill 0, REPLICA_MODE fill 1 */ +} UwalDurability; + +/* ******** + * uwal Performance type: + * currently support SCM and NvmeSSD, user only specifies uwal perf type + * ******* */ +typedef enum { + UWAL_PERF_TYPE_SCM = 1, // Media type scm + UWAL_PERF_TYPE_SSD = 2, // Media type ssd + UWAL_PERF_TYPE_BUTT +} UwalPerfType; + +/* ******** + * affinity policy: + * the user use the policy to tell uwal to create the uwal with affinity. + * ******* */ +typedef struct { + uint32_t partId; // partition id + struct { + uint32_t cnt; // server total num + uint32_t *serverId; // server id array point + } detail; +} UwalAffinityPolicy; + +/* ******** + * uwal user type + * ******* */ +typedef enum { + UWAL_USER_OPENGAUSS = 0, // opengauss io stream + UWAL_USER_PLATFORM = 1, // platform io stream + UWAL_USER_TYPE_BUTT +} UwalUserType; + +/* ******** + * uwal io type + * ******* */ +typedef enum { + UWAL_IO_STRIPE = 1, + UWAL_IO_RANDOM = 2, + UWAL_IO_BUTT +} UwalIoType; + +typedef enum { + UWAL_STRIPE_32B = 0, + UWAL_STRIPE_64B = 1, + UWAL_STRIPE_128B = 2, + UWAL_STRIPE_512B = 3, + UWAL_STRIPE_4KB = 4, + UWAL_STRIPE_8KB = 5, + UWAL_STRIPE_32KB = 6, + UWAL_STRIPE_64KB = 7, + UWAL_STRIPE_128KB = 8, + UWAL_STRIPE_256KB = 9, + UWAL_STRIPE_512KB = 10, + UWAL_STRIPE_1MB = 11, + UWAL_STRIPE_2MB = 12, + UWAL_STRIPE_BUTT +} UwalStripeType; + +/* ******** + * uwal create flag + * ******* */ +typedef enum { + UWAL_CREATE_DEFAULT = 0x0000, /* default create */ + UWAL_CREATE_DEGRADE_LOSS1 = 0x0001, /* degrade loss1 create */ + UWAL_CREATE_DEGRADE_LOSS2 = 0x0002, /* degrade loss2 create */ + UWAL_CREATE_DEGRADE_LOSSANY = 0x0003, /* degrade active1 create */ + UWAL_CREATE_BUTT +} UwalCreateFlag; + +/* ******** + * uwal io route flag + * ******* */ +typedef enum { + UWAL_ROUTE_LOCAL = 0, + UWAL_ROUTE_MASTER = 1, + UWAL_ROUTE_RANDOM = 2, + UWAL_ROUTE_BUTT +} UwalRouteType; + +/* ******** + * uwal descriptor: + * the user must use the descriptor to tell Persistence to create the uwal with + * specified ns/durability/perfType/affinity/io/stripe/metaSize/dataSize. + * ******* */ +typedef struct { + UwalDurability durability; // Data Redundancy Mode + UwalPerfType perfType; // Performace type media type + UwalAffinityPolicy affinity; // affinity attribute + UwalUserType user; // user type + UwalIoType io; // io type + UwalStripeType stripe; // stripe type + uint32_t flags; // create flags + uint64_t startTimeLine; // start time line + uint64_t startWriteOffset; // start write offset + uint64_t dataSize; // log data size(unit bytes), align size is 2MB +} UwalDescriptor; + +/* ******** + * call back + * ******* */ +typedef void (*uwalCbFun)(void *cbCtx, int retCode); + +typedef struct { + uwalCbFun cb; + void *cbCtx; +} UwalCallBack; + +/* ******** + * create uwal input param. + * ******* */ +typedef struct { + const UwalDescriptor *desc; + const uint32_t cnt; + UwalCallBack *cb; +} UwalCreateParam; + +/* ******** + * delete uwal input param. + * ******* */ +typedef struct { + const UwalId *uwalId; + UwalCallBack *cb; +} UwalDeleteParam; + +/* ******** + * data to read param + * ******* */ +typedef struct { + uint64_t offset; // offset + uint64_t length; // len +} UwalDataToRead; + +/* ******** + * read vector. Current only support cnt = 1. + * ******* */ +typedef struct { + uint16_t cnt; // cnt + UwalDataToRead *dataToRead; // data want to read +} UwalDataToReadVec; + +/* ******** + * read uwal input param. + * ******* */ +typedef struct { + const UwalId *uwalId; + const UwalRouteType route; + const UwalDataToReadVec *dataV; + UwalCallBack *cb; +} UwalReadParam; + +/* ******** + * IO data buffer list + * ******* */ +typedef struct { + char *buf; // data buffer + uint64_t len; // buffer length +} UwalBuffer; + +typedef struct { + uint16_t cnt; // cnt + UwalBuffer *buffers; // bufs +} UwalBufferList; + +/* ******** + * append uwal input param. + * ******* */ +typedef struct { + const UwalId *uwalId; + const UwalBufferList *bufferList; + UwalCallBack *cb; +} UwalAppendParam; + +/* ******** + * uwal state: + * ******* */ +typedef enum { + UWAL_STATUS_NORMAL = 1, // normal + UWAL_STATUS_DELETING = 2, + UWAL_STATUS_BUTT +} UwalState; + +/* ******** + * struct of return value for the API to query uwal. + * ******* */ +typedef struct { + uint64_t startTimeLine; + uint64_t startWriteOffset; + uint64_t dataSize; // data size(unit bytes) + uint64_t truncateOffset; // truncate offset(unit bytes) + uint64_t writeOffset; // written offset(unit bytes) +} UwalBaseInfo; + +/* ******** + * uwal has an id and basic info. + * ******* */ +typedef struct { + UwalId id; // id + UwalBaseInfo info; // basic info of the uwal +} UwalInfo; + +/* ******** + * query uwal input param. + * ******* */ +typedef struct { + const UwalId *uwalId; + const UwalRouteType route; + UwalCallBack *cb; +} UwalQueryParam; + +/* ******** + * uwal vector: + * currently, the uwal vector. + * ******* */ +typedef struct { + uint32_t cnt; // uwal cnt + UwalInfo *uwals; // uwals +} UwalVector; + +/* ******** + * uwal config elements. + * ******* */ +typedef struct { + const char *substr; + const char *value; +} UwalCfgElem; + +typedef struct { + uint32_t nodeId; + UwalRetCode ret; +} UwalNodeStatus; + +typedef struct { + uint32_t num; + UwalNodeStatus status[MAX_NODE]; +} UwalNodeInfo; + +typedef int (*UwalInit)(IN const char *path, IN const UwalCfgElem *elems, IN int cnt, IN const char *ulogPath); + +typedef void (*UwalExit)(void); + +typedef int (*UwalCreate)(IN UwalCreateParam *param, OUT UwalVector *uwals); + +typedef int (*UwalDelete)(IN UwalDeleteParam *param); + +typedef int (*UwalAppend)(IN UwalAppendParam *param, OUT uint64_t *offset, OUT void* result); + +typedef int (*UwalRead)(IN UwalReadParam *param, OUT UwalBufferList *bufferList); + +typedef int (*UwalTruncate)(IN const UwalId *uwalId, IN uint64_t offset); + +typedef int (*UwalQuery)(IN UwalQueryParam *param, OUT UwalInfo *info); + +typedef int (*UwalQueryByUser)(IN UwalUserType user, IN UwalRouteType route, OUT UwalVector *uwals); + +typedef int (*UwalSetRewindPoint)(IN UwalId *uwalId, IN uint64_t offset); + +typedef int (*UwalRegisterCertVerifyFunc)(int32_t (*certVerify)(void* certStoreCtx, const char *crlPath), + int32_t (*getKeyPass)(char *keyPassBuff, uint32_t keyPassBuffLen, char *keyPassPath)); + +typedef int (*UwalAppendCopyFree)(IN UwalAppendParam *param, OUT uint64_t *offset, OUT void *result); + +typedef void *(*UwalAllocAppendMem)(IN uint32_t len); + +#define MAX_GROUP_NUM (8) +#define MAX_NODE_NUM (8) +#define NODE_ID_INVALID (0xFFFF) +#define NET_LIST_NUM (4) + +typedef enum { + NET_PROTOCOL_TCP = 0, + NET_PROTOCOL_RDMA = 1, + NET_PROTOCOL_BUTT +} NetProtocol; + +typedef enum { + NODE_STATE_INVALID = 0, + NODE_STATE_UP = 1, + NODE_STATE_DOWN = 2, + NODE_STATE_BUTT +} NodeState; + +typedef struct { + uint32_t ipv4Addr; + uint16_t port; + uint16_t protocol; /* see NetProtocol */ +} NetInfo; + +typedef struct { + uint16_t num; + uint16_t resv; + NetInfo list[NET_LIST_NUM]; +} NetList; + +typedef struct { + uint64_t sessionId; + uint16_t nodeId; + uint16_t state; /* see NodeState */ + uint16_t groupId; /* ANY ID */ + uint16_t groupLevel; /* number of sync standbys that we need to wait for */ + NetList netList; +} NodeStateInfo; + +typedef struct { + uint16_t nodeNum; + uint16_t masterNodeId; + uint16_t localNodeId; + NodeStateInfo nodeList[]; +} NodeStateList; + +typedef void (*FinishCbFun)(void *ctx, int ret); + +typedef int32_t (*UwalNotifyNodeListChange)(NodeStateList *nodeList, FinishCbFun cb, void* ctx); + +typedef uint32_t (*UWAL_Ipv4InetToInt)(char ipv4[16UL]); + +int uwal_init_symbols(); + +int ock_uwal_init(IN const char *path, IN const UwalCfgElem *elems, IN int cnt, IN const char *ulogPath); + +void ock_uwal_exit(void); + +int ock_uwal_create(IN UwalCreateParam *param, OUT UwalVector *uwals); + +int ock_uwal_delete(IN UwalDeleteParam *param); + +int ock_uwal_append(IN UwalAppendParam *param, OUT uint64_t *offset, OUT void* result); + +int ock_uwal_read(IN UwalReadParam *param, OUT UwalBufferList *bufferList); + +int ock_uwal_truncate(IN const UwalId *uwalId, IN uint64_t offset); + +int ock_uwal_query(IN UwalQueryParam *param, OUT UwalInfo *info); + +int ock_uwal_query_by_user(IN UwalUserType user, IN UwalRouteType route, OUT UwalVector *uwals); + +int ock_uwal_set_rewind_point(IN UwalId *uwalId, IN uint64_t offset); + +int ock_uwal_register_cert_verify_func(int32_t (*certVerify)(void* certStoreCtx, const char *crlPath), + int32_t (*getKeyPass)(char *keyPassBuff, uint32_t keyPassBuffLen, char *keyPassPath)); + +int32_t ock_uwal_notify_nodelist_change(NodeStateList *nodeList, FinishCbFun cb, void *ctx); + +uint32_t ock_uwal_ipv4_inet_to_int(char ipv4[16UL]); + +int ock_uwal_append_copy_free(IN UwalAppendParam *param, OUT uint64_t *offset, OUT void *result); + +void *ock_uwal_alloc_append_mem(IN uint32_t len); + +#endif \ No newline at end of file -- Gitee From ad1d9b2990a89935fa83c484ca5a3f90b9913cda Mon Sep 17 00:00:00 2001 From: luqichao Date: Sat, 18 May 2024 16:53:03 +0800 Subject: [PATCH 4/7] add uwal plugin common source --- plugin/uwal/uwal_source.cc | 263 ++++++++++++++++++++++++++++++ plugin/uwal/uwal_source.h | 324 +++++++++++++++++++++++++++++++++++++ 2 files changed, 587 insertions(+) create mode 100644 plugin/uwal/uwal_source.cc create mode 100644 plugin/uwal/uwal_source.h diff --git a/plugin/uwal/uwal_source.cc b/plugin/uwal/uwal_source.cc new file mode 100644 index 000000000..7f6440244 --- /dev/null +++ b/plugin/uwal/uwal_source.cc @@ -0,0 +1,263 @@ +#include +#include + +#include "my_byteorder.h" +#include "my_compiler.h" +#include "my_systime.h" +#include "sql/mysqld.h" +#include "plugin/uwal/uwal_source.h" + +bool uwal_enabled = false; +unsigned long uwal_disk_size = 0; +unsigned int uwal_id = 0; +unsigned int uwal_port = 9991; +char *uwal_ip = nullptr; +char *uwal_protocol = nullptr; +char *uwal_devices_path = nullptr; +char *uwal_log_path = nullptr; + +unsigned int uwal_buffer_aligned_size = 262144; +bool uwal_sync_append = false; + +unsigned int uwal_replica_id = 1; +char *uwal_replica_ip = nullptr; +char *uwal_replica_protocol = nullptr; + +unsigned int uwal_master_id = 0; +char *uwal_master_ip = nullptr; +char *uwal_master_protocol = nullptr; + +void UwalObject::UwalValueInit() +{ + LogErr(INFORMATION_LEVEL, ER_UWAL_PATH, uwal_devices_path); + LogErr(INFORMATION_LEVEL, ER_UWAL_LOG_PATH, uwal_log_path); + LogErr(INFORMATION_LEVEL, ER_UWAL_DISK_SIZE, uwal_disk_size); + LogErr(INFORMATION_LEVEL, ER_UWAL_ID, uwal_id); + LogErr(INFORMATION_LEVEL, ER_UWAL_IP, uwal_ip); + LogErr(INFORMATION_LEVEL, ER_UWAL_PORT, uwal_port); + LogErr(INFORMATION_LEVEL, ER_UWAL_PROTOCOL, uwal_protocol); + + SetEnabled(uwal_enabled); + SetDiskSize(uwal_disk_size); + SetNodeId(uwal_id); + SetListenPort(uwal_port); + SetNodeIP(uwal_ip); + SetProtocol(uwal_protocol); + SetDevicesPath(uwal_devices_path); + SetLogPath(uwal_log_path); + + enableSyncAppend = uwal_sync_append; + alignedSize = uwal_buffer_aligned_size; +} + +int UwalObject::UwalInit() +{ + UwalValueInit(); + int index = 0; + UwalInitConfig("ock.uwal.ip", node_ip, index++); + + char *listen_port_str = ConvertIntegerToString(listen_port, UWAL_PORT_LEN); + UwalInitConfig("ock.uwal.port", listen_port_str, index++); + UwalInitConfigProtocol(index++); + UwalInitConfig("ock.uwal.disk.poolid", UWAL_DISK_POOLID, index++); + + char *disk_size_str = ConvertIntegerToString(disk_size, UWAL_INT_SIZE_LEN); + UwalInitConfig("ock.uwal.disk.size", disk_size_str, index++); + UwalInitConfig("ock.uwal.disk.min.block", "2147483648", index++); + UwalInitConfig("ock.uwal.disk.max.block", "2147483648", index++); + + UwalInitConfig("ock.uwal.devices.path", devices_path, index++); + if (enableSyncAppend) { + UwalInitConfig("ock.uwal.rpc.worker.thread.num", "1", index++); + } else { + UwalInitConfig("ock.uwal.rpc.worker.thread.num", "4", index++); + } + + UwalInitConfig("ock.uwal.rpc.timeout", "30000", index++); + if (protocol == NET_PROTOCOL_RDMA) { + UwalInitConfig("ock.uwal.rpc.rndv.switch", "true", index++); + } + + UwalInitConfig("ock.uwal.rpc.compression.switch", "false", index++); + UwalInitConfig("ock.uwal.rpc.flowcontrol.switch", "false", index++); + UwalInitConfig("ock.uwal.rpc.flowcontrol.value", "128", index++); + UwalInitConfig("ock.uwal.devices.split.switch", "true", index++); + UwalInitConfig("ock.uwal.devices.split.size", "2147483648", index++); + UwalInitConfig("ock.uwal.devices.split.path", devices_path, index++); + UwalInitConfig("ock.uwal.devices.split.mysql.switch", "true", index++); + UwalInitConfig("ock.uwal.devices.split.mysql.prefix", "mysql-bin", index++); + + int ret = ock_uwal_init(NULL, elem, index, log_path); + + return ret; +} + +int UwalObject::UwalWrite(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos) +{ + UwalBuffer uBuff; + UwalBufferList buffList; + uint64_t offset; + UwalAppendParam params; + + uBuff.buf = buf; + uBuff.len = nBytes; + buffList.buffers = &uBuff; + buffList.cnt = 1; + + params.bufferList = &buffList; + params.uwalId = id; + params.cb = NULL; + + int ret = ock_uwal_append(¶ms, &offset, infos); + return ret; +} + +int UwalObject::UwalWriteCopyFree(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos) +{ + UwalBuffer uBuff; + UwalBufferList buffList; + uint64_t offset; + UwalAppendParam params; + + uBuff.buf = buf; + uBuff.len = nBytes; + buffList.buffers = &uBuff; + buffList.cnt = 1; + + params.bufferList = &buffList; + params.uwalId = id; + params.cb = NULL; + + int ret = ock_uwal_append_copy_free(¶ms, &offset, infos); + return ret; +} + +int UwalObject::UwalWriteAsyncCopyFree(UwalId *id, UwalCallBack *uwalCB, UwalNodeInfo *infos) +{ + UwalBuffer buffers[1] = {{uwalBuffer, bufferOffset}}; + UwalBufferList bufferList = {1, buffers}; + UwalAppendParam appendParam = {id, &bufferList, uwalCB}; + int ret = ock_uwal_append_copy_free(&appendParam, &(((UwalAsyncCbCtx *)uwalCB->cbCtx)->outOffset), infos); + return ret; +} + +int UwalObject::UwalCreate(uint64_t startOffset, uint64_t timeLine, uint64_t uwalSplitSize) +{ + if (hasActiveUwal) { + LogErr(ERROR_LEVEL, ER_UWAL_STILL_IN_USE); + return -1; + } + UwalDurability dura; + dura.azCnt = 1; + dura.originNum = 1; + dura.redundancyNum = 0; + dura.reliabilityType = 1; + + UwalAffinityPolicy affinity; + affinity.partId = 0; + affinity.detail.cnt = 0; + affinity.detail.serverId = NULL; + + UwalDescriptor desc; + desc.user = UWAL_USER_OPENGAUSS; + desc.perfType = UWAL_PERF_TYPE_SSD; + desc.stripe = UWAL_STRIPE_BUTT; + desc.io = UWAL_IO_RANDOM; + desc.dataSize = uwalSplitSize; + desc.startTimeLine = timeLine; + desc.startWriteOffset = startOffset; + desc.durability = dura; + desc.flags = UWAL_CREATE_DEGRADE_LOSSANY; + desc.affinity = affinity; + + UwalCreateParam params = {&desc, 1, NULL}; + + UwalVector *vec = (UwalVector *)malloc(sizeof(UwalVector)); + if (vec == NULL) { + return -1; + } + UwalInfo *uwalInfos = (UwalInfo *)malloc(sizeof(UwalInfo)); + if (uwalInfos == NULL) { + free(vec); + return -1; + } + vec->uwals = uwalInfos; + + int ret; + ret = ock_uwal_create(¶ms, vec); + if (ret == 0 && vec->cnt > 0) { + activeUwalId = (UwalId *)malloc(sizeof(UwalId)); + memcpy(activeUwalId, &vec->uwals->id.id, UWAL_ID_LEN); + hasActiveUwal = true; + } + + free(uwalInfos); + free(vec); + return ret; +} + +int UwalObject::DeleteUselessUwal(UwalId *id) +{ + UwalDeleteParam param; + param.uwalId = id; + param.cb = nullptr; + return ock_uwal_delete(¶m); +} + +void UwalObject::GetLocalStateInfo(NodeStateInfo *nodeStateInfo) +{ + nodeStateInfo->nodeId = GetNodeId(); + nodeStateInfo->state = NODE_STATE_UP; + nodeStateInfo->groupId = 0; + nodeStateInfo->groupLevel = 0; + + NetInfo netInfo; + netInfo.ipv4Addr = ock_uwal_ipv4_inet_to_int(node_ip); + netInfo.port = GetListenPort(); + netInfo.protocol = GetProtocol(); + + NetList netList; + netList.num = 1; + netList.list[0] = netInfo; + nodeStateInfo->netList = netList; +} + +void UwalNotifyCallback(void *ctx, int ret) +{ + CBParams *cbParams = (CBParams *)ctx; + pthread_mutex_lock(&cbParams->mutex); + cbParams->cbResult = true; + cbParams->ret = ret; + pthread_mutex_unlock(&cbParams->mutex); + pthread_cond_signal(&cbParams->cond); +} + +int UwalSyncNotify(NodeStateList *nodeList) +{ + CBParams cbParams = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false, 0}; + pthread_mutex_lock(&cbParams.mutex); + int ret = ock_uwal_notify_nodelist_change(nodeList, UwalNotifyCallback, (void *)&cbParams); + if (ret != 0) { + pthread_mutex_unlock(&cbParams.mutex); + return ret; + } + uint16 count = 0; + while (!cbParams.cbResult) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 3; + ts.tv_nsec = 0; + int condWaitRet = 0; + condWaitRet = pthread_cond_timedwait(&cbParams.cond, &cbParams.mutex, &ts); + if (condWaitRet == ETIMEDOUT) { + ++count; + LogErr(SYSTEM_LEVEL, ER_UWAL_NOTIFY_WAIT, count); + } + } + pthread_mutex_unlock(&cbParams.mutex); + bool success = (cbParams.ret == 0); + if (success) { + LogErr(SYSTEM_LEVEL, ER_UWAL_NOTIFY_SUCCESS); + } + return success ? 0 : -1; +} \ No newline at end of file diff --git a/plugin/uwal/uwal_source.h b/plugin/uwal/uwal_source.h new file mode 100644 index 000000000..e8a1bb74c --- /dev/null +++ b/plugin/uwal/uwal_source.h @@ -0,0 +1,324 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022-2024. All rights reserved. + */ + +#ifndef __UWAL_SOURCE_H__ +#define __UWAL_SOURCE_H__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "mysql/plugin.h" +#include "sql/derror.h" // ER_THD +#include "mysqld_error.h" +#include "plugin/uwal/uwal_adaptor.h" + +#define UWAL_ELEM_NUM 100 + +constexpr uint32_t PAGE_ALIGN = 4096; +constexpr uint32_t UWAL_MEM_SEG_COUNT = 256; +constexpr uint32_t UWAL_MEM_SEG_SIZE = 2097152 + PAGE_ALIGN; +constexpr uint64_t UWAL_SIZE_1KB = 1024; +constexpr uint64_t UWAL_SIZE_1MB = 1024 * UWAL_SIZE_1KB; +constexpr uint64_t UWAL_SIZE_1GB = 1024 * UWAL_SIZE_1MB; +constexpr uint64_t UWAL_SIZE_2GB = 2 * UWAL_SIZE_1GB; + +extern bool uwal_enabled; +extern unsigned long uwal_disk_size; +extern unsigned int uwal_id; +extern unsigned int uwal_port; +extern char *uwal_ip; +extern char *uwal_protocol; +extern char *uwal_devices_path; +extern char *uwal_log_path; + +extern unsigned int uwal_buffer_aligned_size; +extern bool uwal_sync_append; + +extern unsigned int uwal_replica_id; +extern char *uwal_replica_ip; +extern char *uwal_replica_protocol; + +extern unsigned int uwal_master_id; +extern char *uwal_master_ip; +extern char *uwal_master_protocol; + +typedef struct UwalAsyncCbCtx { + uintptr_t cacheAddress; + UwalNodeInfo *info; + uint64_t outOffset; +} UwalAsyncCbCtx; + +typedef struct UwalObjNodeInfo { + uint32_t node_id; + char node_ip[UWAL_IP_LEN]; + NetProtocol protocol; +} UwalObjNodeInfo; + +typedef struct CBParams { + pthread_mutex_t mutex; + pthread_cond_t cond; + bool cbResult; + int ret; +} CBParams; + +class UwalSpinLock { +public: + UwalSpinLock() = default; + ~UwalSpinLock() = default; + + UwalSpinLock(const UwalSpinLock &) = delete; + UwalSpinLock &operator = (const UwalSpinLock &) = delete; + UwalSpinLock(UwalSpinLock &&) = delete; + UwalSpinLock &operator = (UwalSpinLock &&) = delete; + + inline void TryLock() + { + mFlag.test_and_set(std::memory_order_acquire); + } + + inline void Lock() + { + while (mFlag.test_and_set(std::memory_order_acquire)) { + } + } + + inline void Unlock() + { + mFlag.clear(std::memory_order_release); + } + +private: + std::atomic_flag mFlag = ATOMIC_FLAG_INIT; +}; + +class UwalObject { +private: + bool enabled = false; + uint64_t disk_size = 0; + uint32_t node_id = 0; + uint32_t listen_port = 0; + char *devices_path = nullptr; + char *log_path = nullptr; + NetProtocol protocol = NET_PROTOCOL_TCP; + UwalCfgElem elem[UWAL_ELEM_NUM]; + UwalBaseInfo info = {0}; + + bool masterNode = false; +public: + UwalObjNodeInfo replica_infos[MAX_NODE]; + UwalObjNodeInfo master_info; + char node_ip[UWAL_IP_LEN]; + std::set registered_replica; + uint32_t replica_num; + bool enableSyncAppend; + + UwalSpinLock appendFlushLock; + uint64_t appendFlushIndex; + + UwalSpinLock appendSyncLock; + uint64_t appendSyncIndex; + + UwalSpinLock syncPointLock; + uint64_t appendSyncPoint; + + char *uwalBuffer; + bool hasBuffer; + uint32_t bufferOffset; + uint32_t alignedSize; + + bool hasActiveUwal; + UwalSpinLock truncateListLock; + std::list readyTruncateList; + std::list pendingDeleteList; + UwalId *activeUwalId; + +public: + UwalObject() + { + alignedSize = 262144; + for (int i = 0; i < MAX_NODE; i++) { + replica_infos[i].node_id = NODE_ID_INVALID; + replica_infos[i].protocol = NET_PROTOCOL_TCP; + } + enableSyncAppend = false; + + appendFlushIndex = 0; + appendSyncIndex = 0; + appendSyncPoint = 0; + uwalBuffer = nullptr; + bufferOffset = 0; + hasBuffer = false; + replica_num = 0; + + hasActiveUwal = false; + activeUwalId = nullptr; + } + + int UwalInit(); + + int UwalWrite(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos); + + int UwalWriteCopyFree(UwalId *id, int nBytes, char *buf, UwalNodeInfo *infos); + + int UwalWriteAsyncCopyFree(UwalId *id, UwalCallBack *uwalCB, UwalNodeInfo *infos); + + int UwalCreate(uint64_t startOffset, uint64_t timeLine, uint64_t uwalSplitSize); + + int DeleteUselessUwal(UwalId *id); + + inline void RefreshBuffer(bool force) + { + if (force || !hasBuffer) { + uwalBuffer = (char *)ock_uwal_alloc_append_mem(alignedSize); + hasBuffer = true; + bufferOffset = 0; + return; + } + } + + void SetMasterNode(bool value) + { + masterNode = value; + } + + void SetEnabled(bool value) + { + enabled = value; + } + + void SetDiskSize(uint64_t value) + { + disk_size = value; + } + + void SetNodeId(uint32_t value) + { + node_id = value; + } + + uint32_t GetNodeId() + { + return node_id; + } + + void SetListenPort(uint32_t value) + { + listen_port = value; + } + + uint32_t GetListenPort() + { + return listen_port; + } + + void SetNodeIP(char *value) + { + memcpy(node_ip, value, strlen(value) + 1); + } + + void SetDevicesPath(char *value) + { + devices_path = value; + } + + void SetLogPath(char *value) + { + log_path = value; + } + + void SetProtocol(char *value) + { + if (!strcasecmp(UWAL_PROTOCOL_RDMA, value)) { + protocol = NET_PROTOCOL_RDMA; + } + } + + NetProtocol GetProtocol() + { + return protocol; + } + + void SetReplicaInfo() + { + if (uwal_replica_ip == nullptr) { + return; + } + replica_infos[uwal_replica_id].node_id = uwal_replica_id; + memcpy(replica_infos[uwal_replica_id].node_ip, uwal_replica_ip, strlen(uwal_replica_ip) + 1); + if (!strcasecmp(UWAL_PROTOCOL_RDMA, uwal_replica_protocol)) { + replica_infos[uwal_replica_id].protocol = NET_PROTOCOL_RDMA; + } + } + + void SetMasterInfo() + { + if (uwal_master_ip == nullptr) { + return; + } + master_info.node_id = uwal_master_id; + memcpy(master_info.node_ip, uwal_master_ip, strlen(uwal_master_ip) + 1); + if (!strcasecmp(UWAL_PROTOCOL_RDMA, uwal_master_protocol)) { + master_info.protocol = NET_PROTOCOL_RDMA; + } + } + + void UwalInitConfig(char *name, char *value, int index) + { + elem[index].substr = name; + elem[index].value = value; + } + + char *ConvertIntegerToString(int value, int length) + { + char *buffer = new char[length]; + sprintf(buffer, "%d\0", value); + return buffer; + } + + char *ConvertIntegerToString(uint32_t value, int length) + { + char *buffer = new char[length]; + sprintf(buffer, "%lu\0", value); + return buffer; + } + + char *ConvertIntegerToString(uint64_t value, int length) + { + char *buffer = new char[length]; + sprintf(buffer, "%llu\0", value); + return buffer; + } + + void UwalInitConfigProtocol(int index) + { + switch(protocol) { + case NET_PROTOCOL_TCP: + UwalInitConfig("ock.uwal.protocol", UWAL_PROTOCOL_TCP, index); + break; + case NET_PROTOCOL_RDMA: + UwalInitConfig("ock.uwal.protocol", UWAL_PROTOCOL_RDMA, index); + break; + default: + break; + } + } + + void UwalValueInit(); + + void GetLocalStateInfo(NodeStateInfo *nodeStateInfo); +}; + +void UwalNotifyCallback(void *ctx, int ret); + +int UwalSyncNotify(NodeStateList *nodeList); + +#endif \ No newline at end of file -- Gitee From 3ee1e6a044153f01041bf81dfe365a26ace81e87 Mon Sep 17 00:00:00 2001 From: luqichao Date: Sat, 18 May 2024 19:16:54 +0800 Subject: [PATCH 5/7] add uwal master --- plugin/uwal/uwal_source_plugin.cc | 547 ++++++++++++++++++++++++++++++ 1 file changed, 547 insertions(+) create mode 100644 plugin/uwal/uwal_source_plugin.cc diff --git a/plugin/uwal/uwal_source_plugin.cc b/plugin/uwal/uwal_source_plugin.cc new file mode 100644 index 000000000..13656de42 --- /dev/null +++ b/plugin/uwal/uwal_source_plugin.cc @@ -0,0 +1,547 @@ +#include +#include + +#include "my_inttypes.h" +#include "my_macros.h" +#include "my_psi_config.h" +#include "mysql/psi/mysql_memory.h" +#include "mysql/psi/mysql_stage.h" +#include "sql/current_thd.h" +#include "sql/protocol_classic.h" +#include "sql/raii/sentry.h" +#include "sql/sql_class.h" +#include "sql/sql_lex.h" +#include "typelib.h" +#include "plugin/uwal/uwal_source.h" +#include "sql/replication.h" +#include "sql/log_event.h" + +#define PLUGIN_AUTHOR_HUAWEI "Huawei Technologies Co., Ltd." +#define UWAL_PLUGIN_NAME "uwal" + +static SERVICE_TYPE(registry) *reg_srv = nullptr; +SERVICE_TYPE(log_builtins) *log_bi = nullptr; +SERVICE_TYPE(log_builtins_string) *log_bs = nullptr; + +UwalObject *uwalObj = nullptr; +pthread_t uwalBackendThread; +bool stopBackend = false; + +static MYSQL_SYSVAR_BOOL( + enabled, uwal_enabled, PLUGIN_VAR_OPCMDARG, + "Enable uwal (disabled by default).", + nullptr, // check + nullptr, // update + 0); + +static MYSQL_SYSVAR_ULONG( + disk_size, uwal_disk_size, PLUGIN_VAR_OPCMDARG, + "Maximum disk space occupied by uwal.", + nullptr, // check + nullptr, // update + UWAL_MIN_DISK_SIZE, // default value + UWAL_MIN_DISK_SIZE, // minimum value + UWAL_MAX_DISK_SIZE, // maximum value + UWAL_BLOCK_SIZE); + +static MYSQL_SYSVAR_STR( + devices_path, uwal_devices_path, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Storage path for uwal.", + nullptr, // check + nullptr, // update + "./"); + +static MYSQL_SYSVAR_STR( + log_path, uwal_log_path, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Log path for uwal.", + nullptr, // check + nullptr, // update + "./"); + +static MYSQL_SYSVAR_STR( + ip, uwal_ip, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "IP for uwal.", + nullptr, // check + nullptr, // update + "127.0.0.1"); + +static MYSQL_SYSVAR_UINT( + id, uwal_id, PLUGIN_VAR_OPCMDARG, + "Uwal node id.", + nullptr, // check + nullptr, // update + 0, // default value + 0, // minimum value + 7, // maximum value + 0); + +static MYSQL_SYSVAR_UINT( + port, uwal_port, PLUGIN_VAR_OPCMDARG, + "Listen port for uwal RPC.", + nullptr, // check + nullptr, // update + 9991, // default value + 1024, // minimum value + 65535, // maximum value + 0); + +static MYSQL_SYSVAR_STR( + protocol, uwal_protocol, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Protocol used by uwal.", + nullptr, // check + nullptr, // update + "tcp"); + +static MYSQL_SYSVAR_UINT( + buffer_aligned_size, uwal_buffer_aligned_size, PLUGIN_VAR_OPCMDARG, + "Uwal buffer aligned size.", + nullptr, // check + nullptr, // update + 262144, // default value + 4096, // minimum value + 2097152, // maximum value + 4096); + +static MYSQL_SYSVAR_BOOL( + sync_append, uwal_sync_append, PLUGIN_VAR_OPCMDARG, + "Enable uwal sync append (disabled by default).", + nullptr, // check + nullptr, // update + 0); + +static MYSQL_SYSVAR_UINT( + replica_id, uwal_replica_id, PLUGIN_VAR_OPCMDARG, + "Uwal remote node id.", + nullptr, // check + nullptr, // update + 1, // default value + 0, // minimum value + 7, // maximum value + 0); + +static MYSQL_SYSVAR_STR( + replica_ip, uwal_replica_ip, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "IP for remote uwal node.", + nullptr, // check + nullptr, // update + "127.0.0.1"); + +static MYSQL_SYSVAR_STR( + replica_protocol, uwal_replica_protocol, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Protocol used by replica uwal.", + nullptr, // check + nullptr, // update + "tcp"); + + +static SYS_VAR *uwal_master_system_vars[] = { + MYSQL_SYSVAR(enabled), + MYSQL_SYSVAR(disk_size), + MYSQL_SYSVAR(devices_path), + MYSQL_SYSVAR(log_path), + MYSQL_SYSVAR(ip), + MYSQL_SYSVAR(id), + MYSQL_SYSVAR(port), + MYSQL_SYSVAR(protocol), + MYSQL_SYSVAR(buffer_aligned_size), + MYSQL_SYSVAR(sync_append), + MYSQL_SYSVAR(replica_id), + MYSQL_SYSVAR(replica_ip), + MYSQL_SYSVAR(replica_protocol), + nullptr, +}; + +int UwalPrimaryInitNotify() +{ + NodeStateInfo primaryStateInfo; + uwalObj->GetLocalStateInfo(&primaryStateInfo); + + NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo)); + nodeList->localNodeId = uwalObj->GetNodeId(); + nodeList->masterNodeId = uwalObj->GetNodeId(); + nodeList->nodeNum = 1; + nodeList->nodeList[0] = primaryStateInfo; + + int ret = UwalSyncNotify(nodeList); + free(nodeList); + return ret; +} + +static int uwal_binlog_after_open(const char *binlog_name) +{ + if (uwalObj->hasActiveUwal) { + // move curUwal to truncate list + uwalObj->truncateListLock.Lock(); + uwalObj->readyTruncateList.emplace_back(uwalObj->activeUwalId); + uwalObj->truncateListLock.Unlock(); + uwalObj->activeUwalId = nullptr; + uwalObj->hasActiveUwal = false; + } + uwalObj->truncateListLock.Lock(); + if (uwalObj->pendingDeleteList.size() > 3) { + while (!uwalObj->pendingDeleteList.empty()) { + UwalId *id = uwalObj->pendingDeleteList.front(); + uwalObj->DeleteUselessUwal(id); + uwalObj->pendingDeleteList.pop_front(); + free(id); + } + } + uwalObj->truncateListLock.Unlock(); + char *name = (char *)binlog_name; + char *extPos = strrchr(name, FN_EXTCHAR); + uint32_t logIndex = strtoul(extPos + 1, NULL, 10); + + int ret = uwalObj->UwalCreate(0, logIndex, UWAL_SIZE_2GB); + return ret; +} + +static int uwal_write_sync() +{ + UwalNodeInfo *infos = (UwalNodeInfo *)malloc(sizeof(UwalNodeInfo) + MAX_NODE * sizeof(UwalNodeStatus)); + + int ret = uwalObj->UwalWriteCopyFree(uwalObj->activeUwalId, uwalObj->bufferOffset, uwalObj->uwalBuffer, infos); + if (ret != 0) { + LogPluginErr(WARNING_LEVEL, ER_UWAL_SYNC_WRITE_FAILED, ret); + } + uwalObj->RefreshBuffer(true); + free(infos); + return ret; +} + +static int uwal_binlog_write_sync(const unsigned char *buffer, my_off_t length) +{ + int ret = 0; + uwalObj->RefreshBuffer(false); + + if (uwalObj->bufferOffset + length < uwalObj->alignedSize) { + memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, length); + uwalObj->bufferOffset += length; + return 0; + } + + // copy align + uint64_t tmpOffset = uwalObj->alignedSize - uwalObj->bufferOffset; + memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, tmpOffset); + uwalObj->bufferOffset += tmpOffset; + + ret = uwal_write_sync(); + + while ((length - tmpOffset) >= uwalObj->alignedSize) { + memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, uwalObj->alignedSize); + uwalObj->bufferOffset = uwalObj->alignedSize; + tmpOffset += uwalObj->alignedSize; + ret = uwal_write_sync(); + } + if (length - tmpOffset > 0) { + memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, length - tmpOffset); + uwalObj->bufferOffset = length - tmpOffset; + } + + return ret; +} + +void UwalWriteAsyncCallBack(void *cbCtx, int retCode) +{ + UwalAsyncCbCtx *curCbCtx = (UwalAsyncCbCtx *)cbCtx; + uwalObj->appendSyncLock.Lock(); + uwalObj->appendSyncIndex += 1; + uwalObj->appendSyncLock.Unlock(); + + free(curCbCtx->info); + free(curCbCtx); +} + +static int uwal_write_async() +{ + UwalAsyncCbCtx *curCbCtx = (UwalAsyncCbCtx *)malloc(sizeof(UwalAsyncCbCtx)); + UwalNodeInfo *infos = (UwalNodeInfo *)malloc(sizeof(UwalNodeInfo) + MAX_NODE * sizeof(UwalNodeStatus)); + curCbCtx->info = infos; + UwalCallBack uwalCB = {UwalWriteAsyncCallBack, curCbCtx}; + + uwalObj->appendFlushIndex += 1; + int ret = uwalObj->UwalWriteAsyncCopyFree(uwalObj->activeUwalId, &uwalCB, infos); + if (ret != 0) { + LogPluginErr(WARNING_LEVEL, ER_UWAL_SYNC_WRITE_FAILED, ret); + uwalObj->appendFlushIndex -= 1; + } + uwalObj->RefreshBuffer(true); + return ret; +} + +static int uwal_binlog_write_async(const unsigned char *buffer, my_off_t length) +{ + int ret = 0; + uwalObj->RefreshBuffer(false); + + if (uwalObj->bufferOffset + length < uwalObj->alignedSize) { + memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, length); + uwalObj->bufferOffset += length; + return 0; + } + + // copy align + uint64_t tmpOffset = uwalObj->alignedSize - uwalObj->bufferOffset; + memcpy(uwalObj->uwalBuffer + uwalObj->bufferOffset, buffer, tmpOffset); + uwalObj->bufferOffset += tmpOffset; + ret = uwal_write_async(); + + while ((length - tmpOffset) >= uwalObj->alignedSize) { + memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, uwalObj->alignedSize); + uwalObj->bufferOffset = uwalObj->alignedSize; + tmpOffset += uwalObj->alignedSize; + ret = uwal_write_async(); + } + if (length - tmpOffset > 0) { + memcpy(uwalObj->uwalBuffer, buffer + tmpOffset, length - tmpOffset); + uwalObj->bufferOffset = length - tmpOffset; + } + + return ret; +} + +static int uwal_binlog_write(const unsigned char *buffer, my_off_t length, my_off_t position) +{ + if (!uwalObj->hasActiveUwal) { + LogPluginErr(WARNING_LEVEL, ER_UWAL_NOT_SUPPORT); + return 0; + } + + if (uwalObj->enableSyncAppend) { + return uwal_binlog_write_sync(buffer, length); + } + + return uwal_binlog_write_async(buffer, length); +} + +static int uwal_binlog_flush_sync() +{ + int ret = 0; + if (uwalObj->hasBuffer && uwalObj->bufferOffset > 0) { + ret = uwal_write_sync(); + } + + return ret; +} + +static int uwal_binlog_flush_async() +{ + int ret = 0; + if (uwalObj->hasBuffer && uwalObj->bufferOffset > 0) { + ret = uwal_write_async(); + } + + // update sync point + uwalObj->syncPointLock.Lock(); + uwalObj->appendSyncPoint = uwalObj->appendFlushIndex; + uwalObj->syncPointLock.Unlock(); + + return ret; +} + +static int uwal_binlog_flush() +{ + if (!uwalObj->hasActiveUwal) { + // enable uwal in RUN-time, IO cache contains unflushed buffer + LogPluginErr(WARNING_LEVEL, ER_UWAL_NOT_SUPPORT); + return 0; + } + + if (uwalObj->enableSyncAppend) { + return uwal_binlog_flush_sync(); + } + + return uwal_binlog_flush_async(); +} + +static int uwal_binlog_after_register(uint32 server_id) +{ + LogPluginErr(SYSTEM_LEVEL, ER_UWAL_MASTER_START_NOTIFY, server_id - 1); + auto iter = uwalObj->registered_replica.find(server_id - 1); + if (iter != uwalObj->registered_replica.end()) { + LogPluginErr(INFORMATION_LEVEL, ER_UWAL_MASTER_NOTIFY_REPEATEDLY); + return 0; + } + uwalObj->registered_replica.insert(server_id - 1); + NodeStateList *nodeList = (NodeStateList *)malloc( + sizeof(NodeStateList) + sizeof(NodeStateInfo) * (uwalObj->registered_replica.size() + 1)); + nodeList->localNodeId = uwalObj->GetNodeId(); + nodeList->masterNodeId = uwalObj->GetNodeId(); + nodeList->nodeNum = uwalObj->registered_replica.size() + 1; + + int count = 0; + for (const auto& replicaNodeId : uwalObj->registered_replica) { + nodeList->nodeList[count].groupId = 1; + nodeList->nodeList[count].groupLevel = 1; + nodeList->nodeList[count].nodeId = replicaNodeId; + nodeList->nodeList[count].state = NODE_STATE_UP; + nodeList->nodeList[count].netList.num = 1; + nodeList->nodeList[count].netList.list[0].ipv4Addr = + ock_uwal_ipv4_inet_to_int(uwalObj->replica_infos[replicaNodeId].node_ip); + nodeList->nodeList[count].netList.list[0].protocol = NET_PROTOCOL_TCP; + if (uwalObj->replica_infos[replicaNodeId].protocol == NET_PROTOCOL_RDMA + && uwalObj->GetProtocol() == NET_PROTOCOL_RDMA) { + nodeList->nodeList[count].netList.list[0].protocol = NET_PROTOCOL_RDMA; + } + nodeList->nodeList[count].netList.list[0].port = uwalObj->GetListenPort(); + count++; + } + NodeStateInfo primaryStateInfo; + uwalObj->GetLocalStateInfo(&primaryStateInfo); + nodeList->nodeList[count] = primaryStateInfo; + int ret = UwalSyncNotify(nodeList); + uwalObj->replica_num = count; + free(nodeList); + return ret; +} + +static int uwal_binlog_after_sync() +{ + // check uwal status here + + if (uwalObj->enableSyncAppend) { + return 0; + } + + // get current sync point + uwalObj->syncPointLock.Lock(); + uint64_t tmpSyncPoint = uwalObj->appendSyncPoint; + uwalObj->syncPointLock.Unlock(); + + // wait for sync + uwalObj->appendSyncLock.Lock(); + uint64_t tmpSyncIndex = uwalObj->appendSyncIndex; + uwalObj->appendSyncLock.Unlock(); + // overflow? + + while (tmpSyncIndex < tmpSyncPoint) { + usleep(50); + uwalObj->appendSyncLock.Lock(); + tmpSyncIndex = uwalObj->appendSyncIndex; + uwalObj->appendSyncLock.Unlock(); + } + + return 0; +} + +void *UwalBackendThread(void *param) +{ + int count = 1; + bool hit = false; + while (!stopBackend) { + if (hit) { + count = count % 10 + 1; + hit = false; + } + + uwalObj->truncateListLock.Lock(); + if (uwalObj->readyTruncateList.size() > count) { + hit = true; + } + while (uwalObj->readyTruncateList.size() > count) { + uwalObj->pendingDeleteList.emplace_back(uwalObj->readyTruncateList.front()); + uwalObj->readyTruncateList.pop_front(); + } + uwalObj->truncateListLock.Unlock(); + sleep(1); + } +} + +Binlog_uwal_observer uwal_observer = { + sizeof(Binlog_uwal_observer), // len + + uwal_binlog_write, // before_flush + uwal_binlog_flush, // before_sync + uwal_binlog_after_register, // after_register + uwal_binlog_after_sync, + uwal_binlog_after_open, +}; + +static int uwal_master_plugin_init(void *p) +{ + // Initialize error logging service. + if (init_logging_service_for_plugin(®_srv, &log_bi, &log_bs)) return 1; + + uwalObj = new UwalObject(); + + // dlopen uwal here + int ret = uwal_init_symbols(); + if (ret != 0) { + return 1; + } + + ret = uwalObj->UwalInit(); + if (ret != 0) { + return 1; + } + uwalObj->SetMasterNode(true); + uwalObj->SetReplicaInfo(); + + // notify + ret = UwalPrimaryInitNotify(); + if (ret != 0) { + LogPluginErr(ERROR_LEVEL, ER_UWAL_PRIMARY_NOTIFY_FAILED, ret); + return 1; + } + + // register observer + if (register_binlog_uwal_observer(&uwal_observer, p)) { + return 1; + } + + // start backend thread + if (pthread_create(&uwalBackendThread, NULL, UwalBackendThread, NULL) != 0) { + return 1; + } + pthread_setname_np(uwalBackendThread, "uwalbak"); + + return 0; +} + +static int uwal_master_plugin_deinit(void *p) +{ + // the plugin was not initialized, thre is nothing to do here + delete uwalObj; + + // unregister observer + if (unregister_binlog_uwal_observer(&uwal_observer, p)) { + return 1; + } + + deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs); + return 0; +} + +static int uwal_source_plugin_check_uninstall(void *) +{ + return 0; +} + +struct Mysql_replication uwal_master_plugin = { + MYSQL_REPLICATION_INTERFACE_VERSION}; + +/* + Plugin library descriptor +*/ + +mysql_declare_plugin(uwal_master){ + MYSQL_REPLICATION_PLUGIN, + &uwal_master_plugin, + UWAL_PLUGIN_NAME, + PLUGIN_AUTHOR_HUAWEI, + "Source-side uwal replication.", + PLUGIN_LICENSE_GPL, + uwal_master_plugin_init, /* Plugin Init */ + uwal_source_plugin_check_uninstall, /* Plugin Check uninstall */ + uwal_master_plugin_deinit, /* Plugin Deinit */ + 0x0100 /* 1.0 */, + nullptr, /* status variables */ + uwal_master_system_vars, /* system variables */ + nullptr, /* config options */ + 0, /* flags */ +} mysql_declare_plugin_end; \ No newline at end of file -- Gitee From be02011e60dcbcd5d010dea9e876adf56abcc6c3 Mon Sep 17 00:00:00 2001 From: luqichao Date: Sat, 18 May 2024 20:22:18 +0800 Subject: [PATCH 6/7] add uwal replica --- plugin/uwal/uwal_replica_plugin.cc | 381 +++++++++++++++++++++++++++++ 1 file changed, 381 insertions(+) create mode 100644 plugin/uwal/uwal_replica_plugin.cc diff --git a/plugin/uwal/uwal_replica_plugin.cc b/plugin/uwal/uwal_replica_plugin.cc new file mode 100644 index 000000000..15d647c4b --- /dev/null +++ b/plugin/uwal/uwal_replica_plugin.cc @@ -0,0 +1,381 @@ +#include +#include + +#include "my_inttypes.h" +#include "my_macros.h" +#include "my_psi_config.h" +#include "mysql/psi/mysql_memory.h" +#include "mysql/psi/mysql_stage.h" +#include "sql/current_thd.h" +#include "sql/protocol_classic.h" +#include "sql/raii/sentry.h" +#include "sql/sql_class.h" +#include "sql/sql_lex.h" +#include "typelib.h" +#include "plugin/uwal/uwal_source.h" +#include "sql/replication.h" + +#define PLUGIN_AUTHOR_HUAWEI "Huawei Technologies Co., Ltd." +#define UWAL_PLUGIN_NAME "uwal" + +static SERVICE_TYPE(registry) *reg_srv = nullptr; +SERVICE_TYPE(log_builtins) *log_bi = nullptr; +SERVICE_TYPE(log_builtins_string) *log_bs = nullptr; + +UwalObject *uwalReplicaObj = nullptr; + +static MYSQL_SYSVAR_BOOL( + enabled, uwal_enabled, PLUGIN_VAR_OPCMDARG, + "Enable uwal (disabled by default).", + nullptr, // check + nullptr, // update + 0); + +static MYSQL_SYSVAR_ULONG( + disk_size, uwal_disk_size, PLUGIN_VAR_OPCMDARG, + "Maximum disk space occupied by uwal.", + nullptr, // check + nullptr, // update + UWAL_MIN_DISK_SIZE, // default value + UWAL_MIN_DISK_SIZE, // minimum value + UWAL_MAX_DISK_SIZE, // maximum value + UWAL_BLOCK_SIZE); + +static MYSQL_SYSVAR_STR( + devices_path, uwal_devices_path, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Storage path for uwal.", + nullptr, // check + nullptr, // update + "./"); + +static MYSQL_SYSVAR_STR( + log_path, uwal_log_path, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Log path for uwal.", + nullptr, // check + nullptr, // update + "./"); + +static MYSQL_SYSVAR_STR( + ip, uwal_ip, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "IP for uwal.", + nullptr, // check + nullptr, // update + "127.0.0.1"); + +static MYSQL_SYSVAR_UINT( + id, uwal_id, PLUGIN_VAR_OPCMDARG, + "Uwal node id.", + nullptr, // check + nullptr, // update + 0, // default value + 0, // minimum value + 7, // maximum value + 0); + +static MYSQL_SYSVAR_UINT( + port, uwal_port, PLUGIN_VAR_OPCMDARG, + "Listen port for uwal RPC.", + nullptr, // check + nullptr, // update + 9991, // default value + 1024, // minimum value + 65535, // maximum value + 0); + +static MYSQL_SYSVAR_STR( + protocol, uwal_protocol, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Protocol used by uwal.", + nullptr, // check + nullptr, // update + "tcp"); + +static MYSQL_SYSVAR_UINT( + master_id, uwal_master_id, PLUGIN_VAR_OPCMDARG, + "Uwal remote node id.", + nullptr, // check + nullptr, // update + 1, // default value + 0, // minimum value + 7, // maximum value + 0); + +static MYSQL_SYSVAR_STR( + master_ip, uwal_master_ip, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "IP for remote uwal node.", + nullptr, // check + nullptr, // update + "127.0.0.1"); + +static MYSQL_SYSVAR_STR( + master_protocol, uwal_master_protocol, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "Protocol used by master uwal.", + nullptr, // check + nullptr, // update + "tcp"); + +static SYS_VAR *uwal_replica_system_vars[] = { + MYSQL_SYSVAR(enabled), + MYSQL_SYSVAR(disk_size), + MYSQL_SYSVAR(devices_path), + MYSQL_SYSVAR(log_path), + MYSQL_SYSVAR(ip), + MYSQL_SYSVAR(id), + MYSQL_SYSVAR(port), + MYSQL_SYSVAR(protocol), + MYSQL_SYSVAR(master_id), + MYSQL_SYSVAR(master_ip), + MYSQL_SYSVAR(master_protocol), + nullptr, +}; + +int UwalStandbyInitNotify() +{ + NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo)); + nodeList->nodeNum = 1; + nodeList->localNodeId = uwalReplicaObj->GetNodeId(); + nodeList->masterNodeId = NODE_ID_INVALID; + + // local state info + NodeStateInfo localStateInfo; + uwalReplicaObj->GetLocalStateInfo(&localStateInfo); + nodeList->nodeList[0] = localStateInfo; + + int ret = UwalSyncNotify(nodeList); + free(nodeList); + return ret; +} + +static int uwal_binlog_relay_after_register() +{ + LogPluginErr(SYSTEM_LEVEL, ER_UWAL_REPLICA_NOTIFY); + NodeStateList *nodeList = (NodeStateList *)malloc(sizeof(NodeStateList) + sizeof(NodeStateInfo) * 2); + nodeList->nodeNum = 2; + nodeList->localNodeId = uwalReplicaObj->GetNodeId(); + nodeList->masterNodeId = uwalReplicaObj->master_info.node_id; + + NodeStateInfo localStateInfo; + uwalReplicaObj->GetLocalStateInfo(&localStateInfo); + nodeList->nodeList[0] = localStateInfo; + + nodeList->nodeList[1].groupId = 0; + nodeList->nodeList[1].groupLevel = 0; + nodeList->nodeList[1].state = NODE_STATE_UP; + nodeList->nodeList[1].nodeId = uwalReplicaObj->master_info.node_id; + nodeList->nodeList[1].netList.num = 1; + nodeList->nodeList[1].netList.list[0].port = uwalReplicaObj->GetListenPort(); + nodeList->nodeList[1].netList.list[0].ipv4Addr =ock_uwal_ipv4_inet_to_int(uwalReplicaObj->master_info.node_ip); + nodeList->nodeList[1].netList.list[0].protocol = NET_PROTOCOL_TCP; + if (uwalReplicaObj->GetProtocol() == NET_PROTOCOL_RDMA + && uwalReplicaObj->master_info.protocol == NET_PROTOCOL_RDMA) { + nodeList->nodeList[1].netList.list[0].protocol = NET_PROTOCOL_RDMA; + } + + int ret = UwalSyncNotify(nodeList); + free(nodeList); + return ret; +} + +uint64_t uwal_read_offset = 4; +uint64_t uwal_write_offset = 4; +uint64_t uwal_buffer_left_len = 0; +uint64_t uwal_buffer_read_len = 0; +char *uwal_buffer; +UwalId *uwalId; +static int relay_query_uwal() +{ + UwalVector infoV; + UwalInfo uwalInfo; + infoV.uwals = &uwalInfo; + int ret = ock_uwal_query_by_user(UwalUserType::UWAL_USER_OPENGAUSS, UwalRouteType::UWAL_ROUTE_LOCAL, &infoV); + if (ret != 0) { + LogPluginErr(WARNING_LEVEL, ER_UWAL_RELAY_QUERY_FAILED); + return 1; + } + + if (uwal_write_offset == infoV.uwals->info.writeOffset) { + return 2; // the same with last query + } + + uwal_write_offset = infoV.uwals->info.writeOffset; + if (uwalId == nullptr) { + uwalId = static_cast(malloc(sizeof(UwalId))); + memcpy(uwalId, &(infoV.uwals->id), sizeof(UwalId)); + } + return 0; +} + +static int relay_read_uwal() +{ + if (uwal_buffer == nullptr) { + uwal_buffer = static_cast(malloc(sizeof(char) * UWAL_BLOCK_SIZE)); + } + + if (uwal_write_offset = uwal_read_offset < UWAL_BLOCK_SIZE) { + query: + int ret = relay_query_uwal(); + if (ret == 2) { + sleep(1); + goto query; + } else if (ret != 0) { + return 1; + } + } + + uint64_t read_offset = uwal_read_offset - uwal_buffer_left_len; + uwal_buffer_left_len = + read_offset + UWAL_BLOCK_SIZE < uwal_write_offset ? UWAL_BLOCK_SIZE : uwal_write_offset - read_offset; + LogPluginErr(INFORMATION_LEVEL, ER_UWAL_RELAY_READ_INFO, uwal_buffer_read_len, uwal_read_offset, uwal_write_offset); + + UwalDataToReadVec dataV; + UwalDataToRead uwalDataToRead; + uwalDataToRead.length = uwal_buffer_read_len; + uwalDataToRead.offset = read_offset; + dataV.dataToRead = &uwalDataToRead; + dataV.cnt = 1; + + UwalBufferList bufferList; + UwalBuffer uwalBuffer; + uwalBuffer.buf = uwal_buffer; + uwalBuffer.len = uwal_buffer_read_len; + bufferList.buffers = &uwalBuffer; + bufferList.cnt = 1; + + UwalReadParam readParam = {uwalId, UwalRouteType::UWAL_ROUTE_LOCAL, &dataV, nullptr}; + int ret = ock_uwal_read(&readParam, &bufferList); + if (ret != 0) { + LogPluginErr(WARNING_LEVEL, ER_UWAL_RELAY_READ_UWAL_FAILED, ret, read_offset, uwal_buffer_read_len); + return 1; + } + uwal_read_offset += uwal_buffer_read_len - uwal_buffer_left_len; + uwal_buffer_left_len = uwal_buffer_read_len; + return 0; +} + +static int uwal_binlog_relay_read_event(ulong &event_len, const char **event_buffer) +{ + if (uwal_buffer_left_len < 19) {// LOG_EVENT_HEADER_LEN + int ret = relay_read_uwal(); + if (ret != 0) { + LogPluginErr(WARNING_LEVEL, ER_UWAL_RELAY_READ_FAILED, ret); + event_len = packet_error; + return 1; + } + } + + event_len = uint4korr(uwal_buffer + (uwal_buffer_read_len - uwal_buffer_left_len) + 9); + read: + if (uwal_buffer_left_len - event_len < 0) { + // read uwal + int ret = relay_read_uwal(); + if (ret != 0) { + LogPluginErr(WARNING_LEVEL, ER_UWAL_RELAY_READ_FAILED, ret); + event_len = packet_error; + return 1; + } + goto read; + } else { + char *buffer = static_cast(malloc(sizeof(char) * event_len)); + memcpy(buffer, uwal_buffer + (uwal_buffer_read_len - uwal_buffer_left_len), event_len); + *event_buffer = buffer; + uwal_buffer_left_len -= event_len; + return 0; + } +} + +static int uwal_binlog_relay_is_uwal_relay(int &uwal_skip) +{ + uwal_skip = 1; + return 0; +} + +Binlog_relay_uwal_observer uwal_relay_observer = { + sizeof(Binlog_relay_uwal_observer), // len + + uwal_binlog_relay_after_register, // relay_after_register + uwal_binlog_relay_read_event, + uwal_binlog_relay_is_uwal_relay, +}; + +static int uwal_replica_plugin_init(void *p) +{ + // Initialize error logging service. + if (init_logging_service_for_plugin(®_srv, &log_bi, &log_bs)) return 1; + + uwalReplicaObj = new UwalObject(); + + // dlopen uwal here + int ret = uwal_init_symbols(); + if (ret != 0) { + return 1; + } + + ret = uwalReplicaObj->UwalInit(); + if (ret != 0) { + return 1; + } + uwalReplicaObj->SetMasterNode(false); + uwalReplicaObj->SetMasterInfo(); + + // notify + ret = UwalStandbyInitNotify(); + if (ret != 0) { + LogPluginErr(ERROR_LEVEL, ER_UWAL_REPLICA_NOTIFY_FAILED, ret); + return 1; + } + + // register observer + if (register_binlog_relay_uwal_observer(&uwal_relay_observer, p)) { + return 1; + } + + return 0; +} + +static int uwal_replica_plugin_deinit(void *p) +{ + // the plugin was not initialized, thre is nothing to do here + delete uwalReplicaObj; + + // unregister observer + if (unregister_binlog_relay_uwal_observer(&uwal_relay_observer, p)) { + return 1; + } + + deinit_logging_service_for_plugin(®_srv, &log_bi, &log_bs); + return 0; +} + +static int uwal_replica_plugin_check_uninstall(void *) +{ + return 0; +} + +struct Mysql_replication uwal_replica_plugin = { + MYSQL_REPLICATION_INTERFACE_VERSION}; + +/* + Plugin library descriptor +*/ + +mysql_declare_plugin(uwal_replica){ + MYSQL_REPLICATION_PLUGIN, + &uwal_replica_plugin, + UWAL_PLUGIN_NAME, + PLUGIN_AUTHOR_HUAWEI, + "Replica-side uwal replication.", + PLUGIN_LICENSE_GPL, + uwal_replica_plugin_init, /* Plugin Init */ + uwal_replica_plugin_check_uninstall, /* Plugin Check uninstall */ + uwal_replica_plugin_deinit, /* Plugin Deinit */ + 0x0100 /* 1.0 */, + nullptr, /* status variables */ + uwal_replica_system_vars, /* system variables */ + nullptr, /* config options */ + 0, /* flags */ +} mysql_declare_plugin_end; \ No newline at end of file -- Gitee From d888bed98c1b7121db01d7029520487c131c7ad8 Mon Sep 17 00:00:00 2001 From: luqichao Date: Sat, 18 May 2024 20:25:38 +0800 Subject: [PATCH 7/7] add uwal plugin cmake --- plugin/uwal/CMakeLists.txt | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 plugin/uwal/CMakeLists.txt diff --git a/plugin/uwal/CMakeLists.txt b/plugin/uwal/CMakeLists.txt new file mode 100644 index 000000000..537ef3d2a --- /dev/null +++ b/plugin/uwal/CMakeLists.txt @@ -0,0 +1,24 @@ +ADD_DEFINITIONS(-DMYSQL_SERVER) +ADD_DEFINITIONS(-DLOG_SUBSYSTEM_TAG="Repl") +ADD_DEFINITIONS(-DLOG_COMPONENT_TAG="uwal") + +DISABLE_MISSING_PROFILE_WARNING() + +MYSQL_ADD_PLUGIN(uwal_master + uwal_adaptor.cc + uwal_source_plugin.cc + uwal_source.cc + MODULE_ONLY + MODULE_OUTPUT_NAME "uwal_master" + VISIBILITY_HIDDEN + ) + +MYSQL_ADD_PLUGIN(uwal_replica + uwal_adaptor.cc + uwal_replica_plugin.cc + uwal_source.cc + MODULE_ONLY + MODULE_OUTPUT_NAME "uwal_replica" + VISIBILITY_HIDDEN + ) + -- Gitee