From 66731c292e83002e944f304f6b9c44c3b07099d3 Mon Sep 17 00:00:00 2001 From: liangwenjia <1757640220@qq.com> Date: Mon, 11 Dec 2023 09:39:12 +0800 Subject: [PATCH] add time thread join, fix uninstall plugin bug --- plugin/thread_pool/threadpool.h | 34 ++++++- plugin/thread_pool/threadpool_common.cc | 61 +++++++++++- plugin/thread_pool/threadpool_unix.cc | 125 ++++++++---------------- 3 files changed, 131 insertions(+), 89 deletions(-) diff --git a/plugin/thread_pool/threadpool.h b/plugin/thread_pool/threadpool.h index ce92934d3..979bed0c5 100644 --- a/plugin/thread_pool/threadpool.h +++ b/plugin/thread_pool/threadpool.h @@ -64,12 +64,12 @@ extern void tp_wait_end(THD *); extern void tp_post_kill_notification(THD *thd) noexcept; extern bool tp_add_connection(Channel_info *); extern void tp_end(void); -extern void tp_fake_end(void); -extern void threadpool_remove_connection(THD *thd); extern bool thread_attach(THD *thd); +extern void tp_deinit(); extern THD_event_functions tp_event_functions; + /* Threadpool statistics */ @@ -141,5 +141,35 @@ private: pthread_rwlock_t lk; }; +class SLockGuard { +public: + SLockGuard(thread_pool_rwlock_t &lk): lck(lk) { + lck.slock(); + } + + ~SLockGuard() { + lck.unslock(); + } + + thread_pool_rwlock_t &lck; +}; + +class XLockGuard { +public: + XLockGuard(thread_pool_rwlock_t &lk): lck(lk) { + lck.xlock(); + } + + ~XLockGuard() { + lck.unxlock(); + } + + thread_pool_rwlock_t &lck; +}; + +extern st_plugin_int *gPluginPtr; +extern thread_pool_rwlock_t gPluginLock; +extern bool gPluginUninstalling; + #endif // THREADPOOL_H_ diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc index 4d0c302d7..1e59f5c84 100644 --- a/plugin/thread_pool/threadpool_common.cc +++ b/plugin/thread_pool/threadpool_common.cc @@ -368,19 +368,54 @@ PLUGIN_VAR_RQCMDARG, "so that they are evenly distributed.", nullptr, fix_threadpool_connection_balance, false); -static int threadpool_plugin_init(void *) +static bool tptarget(void) +{ + unsigned long long cpuId; + __asm__ volatile("mrs %0, MIDR_EL1":"=r"(cpuId)); + + unsigned long long vendor = (cpuId >> 0x18) & 0xFF; + unsigned long long partId = (cpuId >> 0x4) & 0xFFF; + return ((vendor == 0x48) && partId == 0xD01); +} + +thread_pool_rwlock_t gPluginLock; +st_plugin_int *gPluginPtr = nullptr; +bool gPluginUninstalling = false; + +static int threadpool_plugin_init(void *plugin) { DBUG_ENTER("threadpool_plugin_init"); + if (!tptarget()) { + DBUG_RETURN(-1); + } + + gPluginPtr = static_cast(plugin); + gPluginUninstalling = false; tp_init(); my_connection_handler_set(&tp_chf, &tp_event_functions); DBUG_RETURN(0); } +static int threadpool_plugin_check_uninstall(void *) { + DBUG_ENTER("threadpool_plugin_check_uninstall"); + XLockGuard lk(gPluginLock); + if (!gPluginUninstalling) { + gPluginUninstalling = true; + my_connection_handler_reset(); + } + DBUG_RETURN(0); +} + static int threadpool_plugin_deinit(void *) { DBUG_ENTER("threadpool_plugin_deinit"); - my_connection_handler_reset(); + XLockGuard lk(gPluginLock); + if (!gPluginUninstalling) { + gPluginUninstalling = true; + my_connection_handler_reset(); + } + tp_deinit(); DBUG_RETURN(0); } @@ -497,6 +532,10 @@ static int groups_fill_table(THD* thd, TABLE_LIST* tables, Item*) static int groups_init(void* p) { + if (!tptarget()) { + return -1; + } + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; schema->fields_info = Show::groups_fields_info; schema->fill_table = groups_fill_table; @@ -572,6 +611,10 @@ static int queues_fill_table(THD* thd, TABLE_LIST* tables, Item*) static int queues_init(void* p) { + if (!tptarget()) { + return -1; + } + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; schema->fields_info = Show::queues_field_info; schema->fill_table = queues_fill_table; @@ -631,6 +674,10 @@ static int stats_fill_table(THD* thd, TABLE_LIST* tables, Item*) static int stats_init(void* p) { + if (!tptarget()) { + return -1; + } + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; schema->fields_info = Show::stats_fields_info; schema->fill_table = stats_fill_table; @@ -685,6 +732,10 @@ static int waits_fill_table(THD* thd, TABLE_LIST* tables, Item*) static int waits_init(void* p) { + if (!tptarget()) { + return -1; + } + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; schema->fields_info = Show::waits_fields_info; schema->fill_table = waits_fill_table; @@ -705,9 +756,9 @@ mysql_declare_plugin(thread_pool) "TEST_TEST", "thread pool plugin extracted from percona server", PLUGIN_LICENSE_GPL, - threadpool_plugin_init, /* Plugin Init */ - nullptr, /* Plugin Check uninstall */ - threadpool_plugin_deinit, /* Plugin Deinit */ + threadpool_plugin_init, /* Plugin Init */ + threadpool_plugin_check_uninstall, /* Plugin Check uninstall */ + threadpool_plugin_deinit, /* Plugin Deinit */ 0x0100 /* 1.0 */, nullptr, /* status variables */ system_variables, /* system variables */ diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 09d0c703f..707fbd54c 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -18,6 +18,7 @@ #include "sql/debug_sync.h" #include "sql/log.h" #include "sql/protocol_classic.h" +#include "sql/set_var.h" #include "my_sys.h" #include "my_systime.h" #include "mysql/thread_pool_priv.h" // thd_is_transaction_active() @@ -109,13 +110,24 @@ static int wake_or_create_thread(thread_group_t *thread_group, static int create_worker(thread_group_t *thread_group, bool due_to_stall) noexcept; static void *admin_port_worker_main(void *param); static void *worker_main(void *param); -static void *connection_detach_worker(void *param); static void check_stall(thread_group_t *thread_group); static void connection_abort(connection_t *connection); static void set_next_timeout_check(ulonglong abstime); static void print_pool_blocked_message(bool) noexcept; -THD *thd_to_detach = nullptr; +static void tp_inc_ref_count() { + lock_plugin_mutex(); + assert(gPluginPtr != nullptr); + gPluginPtr->ref_count++; + unlock_plugin_mutex(); +} + +static void tp_dec_ref_count() { + lock_plugin_mutex(); + assert(gPluginPtr != nullptr); + gPluginPtr->ref_count--; + unlock_plugin_mutex(); +} class ThreadPoolConnSet { public: @@ -130,23 +142,8 @@ public: return ret; } - void killConns() { - mtx.lock(); - for (auto &it: conns) { - THD *thd = it->thd; - if (current_thd != thd && thd->killed != THD::KILL_CONNECTION) { - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->killed = THD::KILL_CONNECTION; - tp_post_kill_notification(thd); - mysql_mutex_unlock(&thd->LOCK_thd_data); - } else if (current_thd == thd) { - thd_to_detach = thd; - } - } - mtx.unlock(); - } - void insert(connection_t *c) { + tp_inc_ref_count(); mtx.lock(); conns.insert(c); mtx.unlock(); @@ -156,6 +153,7 @@ public: mtx.lock(); conns.erase(c); mtx.unlock(); + tp_dec_ref_count(); } public: @@ -707,13 +705,14 @@ static void check_stall(thread_group_t *thread_group) { mysql_mutex_unlock(&thread_group->mutex); } +my_thread_handle timer_thread_id; + static void start_timer(pool_timer_t *timer) noexcept { - my_thread_handle thread_id; DBUG_ENTER("start_timer"); mysql_mutex_init(key_timer_mutex, &timer->mutex, nullptr); mysql_cond_init(key_timer_cond, &timer->cond); timer->shutdown = false; - mysql_thread_create(key_timer_thread, &thread_id, nullptr, timer_thread, timer); + mysql_thread_create(key_timer_thread, &timer_thread_id, nullptr, timer_thread, timer); DBUG_VOID_RETURN; } @@ -723,6 +722,7 @@ static void stop_timer(pool_timer_t *timer) noexcept { timer->shutdown = true; mysql_cond_signal(&timer->cond); mysql_mutex_unlock(&timer->mutex); + my_thread_join(&timer_thread_id, nullptr); DBUG_VOID_RETURN; } @@ -1333,6 +1333,11 @@ static connection_t *alloc_connection(THD *thd) noexcept { bool tp_add_connection( Channel_info *channel_info) { DBUG_ENTER("Thread_pool_connection_handler::add_connection"); + SLockGuard lk(gPluginLock); + if (gPluginUninstalling) { + channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false); + DBUG_RETURN(true); + } THD *const thd = channel_info->create_thd(); @@ -1415,57 +1420,6 @@ static void connection_abort(connection_t *connection) { DBUG_VOID_RETURN; } -/** - Detach connection. -*/ -static void connection_detach(connection_t *connection) { - DBUG_ENTER("connection_detach"); - threadpool_thds.erase(connection); - - thread_group_t *group = connection->thread_group; - bool is_admin_port = connection->thd->is_admin_connection(); - Vio *const vio = connection->thd->get_protocol_classic()->get_vio(); - const int fd = mysql_socket_getfd(vio->mysql_socket); - mysql_mutex_lock(&group->mutex); - io_poll_disassociate_fd(group->pollfd, fd); - connection->bound_to_poll_descriptor = false; - mysql_mutex_unlock(&group->mutex); - - if (!is_admin_port) { - change_group_rwlock.xlock(); - group->connection_count--; - change_group_rwlock.unxlock(); - } - - my_thread_handle thread_id; - - if (mysql_thread_create(key_worker_thread, &thread_id, group->pthread_attr, - connection_detach_worker, connection->thd)) { - threadpool_remove_connection(connection->thd); - } - - my_free(connection); - DBUG_VOID_RETURN; -} - - -static void *connection_detach_worker(void *param) { - my_thread_init(); - DBUG_ENTER("connection_detach_worker"); - THD *thd = static_cast(param); - assert(thd != nullptr); - thread_attach(thd); - - while (1) { - if (threadpool_process_request(thd)) { - break; - } - } - - threadpool_remove_connection(thd); - return nullptr; -} - /** MySQL scheduler callback : kill connection */ @@ -1609,11 +1563,6 @@ static void handle_event(connection_t *connection) { if (err) { goto end; } - - if (connection->thd == thd_to_detach) { - connection_detach(connection); - goto end_return; - } set_wait_timeout(connection); @@ -1631,7 +1580,6 @@ end: connection_abort(connection); } -end_return: DBUG_VOID_RETURN; } @@ -1742,7 +1690,18 @@ bool tp_init() { DBUG_RETURN(false); } -void tp_end_thread() { +std::thread *tp_end_thread = nullptr; + +void tp_deinit() { + if (tp_end_thread != nullptr) { + assert(threadpool_thds.empty()); + tp_end_thread->join(); + delete tp_end_thread; + tp_end_thread = nullptr; + } +} + +void tp_end_func() { if (!threadpool_started) { return; } @@ -1762,10 +1721,12 @@ void tp_end_thread() { void tp_end() { DBUG_ENTER("tp_end"); - threadpool_thds.killConns(); - - std::thread exit_tp(tp_end_thread); - exit_tp.detach(); + if (threadpool_thds.empty()) { + assert(tp_end_thread == nullptr); + tp_end_func(); + } else { + tp_end_thread = new std::thread(tp_end_func); + } DBUG_VOID_RETURN; } -- Gitee