From e36532a2b4d1f6082a3c8721465c9fe1f926a520 Mon Sep 17 00:00:00 2001
From: liangwenjia <1757640220@qq.com>
Date: Fri, 10 Nov 2023 18:59:09 +0800
Subject: [PATCH] add connection balancing

---
 plugin/thread_pool/threadpool.h         |  56 ++++++++
 plugin/thread_pool/threadpool_common.cc |  15 +-
 plugin/thread_pool/threadpool_unix.cc   | 175 ++++++++++++++++--------
 3 files changed, 189 insertions(+), 57 deletions(-)

diff --git a/plugin/thread_pool/threadpool.h b/plugin/thread_pool/threadpool.h
index f4dd68dc8..ce92934d3 100644
--- a/plugin/thread_pool/threadpool.h
+++ b/plugin/thread_pool/threadpool.h
@@ -20,6 +20,7 @@
 #include "sql/mysqld_thd_manager.h"
 #include "sql/conn_handler/connection_handler_manager.h"
 #include "sql/conn_handler/channel_info.h"
+#include "pthread.h"
 
 struct SHOW_VAR;
 
@@ -42,6 +43,7 @@ extern uint threadpool_max_threads;
 extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/
 extern uint threadpool_oversubscribe; /* Maximum active threads in group */
 extern uint threadpool_toobusy; /* Maximum active and waiting threads in group */
+extern bool threadpool_connection_balance; /* Control whether connections migrate to another thread group so that they are evenly distributed */
 
 /* Possible values for thread_pool_high_prio_mode */
 extern const char *threadpool_high_prio_mode_names[];
@@ -85,5 +87,59 @@ extern void tp_set_threadpool_stall_limit(uint val) noexcept;
 extern uint tp_get_thdvar_high_prio_tickets(THD *thd);
 extern uint tp_get_thdvar_high_prio_mode(THD *thd);
 
+class thread_pool_rwlock_t {
+public:
+  thread_pool_rwlock_t()
+  {
+    pthread_rwlock_init(&lk, NULL);
+  }
+
+  ~thread_pool_rwlock_t()
+  {
+    pthread_rwlock_destroy(&lk);
+  }
+
+  void slock()
+  {
+    while (pthread_rwlock_rdlock(&lk) != 0) {
+      std::this_thread::yield();
+    }
+  }
+
+  void unslock()
+  {
+    while (pthread_rwlock_unlock(&lk) != 0) {
+      std::this_thread::yield();
+    }
+  }
+
+  void xlock()
+  {
+    while (pthread_rwlock_wrlock(&lk) != 0) {
+      std::this_thread::yield();
+    }
+  }
+
+  void unxlock()
+  {
+    while (pthread_rwlock_unlock(&lk) != 0) {
+      std::this_thread::yield();
+    }
+  }
+
+  void lock()
+  {
+    xlock();
+  }
+
+  void unlock()
+  {
+    unxlock();
+  }
+
+private:
+  pthread_rwlock_t lk;
+};
+
 #endif // THREADPOOL_H_
diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc
index 00595fc4b..e7b64d001 100644
--- a/plugin/thread_pool/threadpool_common.cc
+++ b/plugin/thread_pool/threadpool_common.cc
@@ -45,6 +45,9 @@ uint threadpool_stall_limit;
 uint threadpool_max_threads;
 uint threadpool_oversubscribe;
 uint threadpool_toobusy;
+bool threadpool_connection_balance;
+
+extern thread_pool_rwlock_t change_group_rwlock;
 
 /* Stats */
 TP_STATISTICS tp_stats;
@@ -148,7 +151,8 @@ int threadpool_add_connection(THD *thd) {
 
 /* Create new PSI thread for use with the THD. */
 #ifdef HAVE_PSI_THREAD_INTERFACE
-  thd->set_psi(PSI_THREAD_CALL(new_thread)(key_thread_one_connection, thd,
+  uint seqnum = 0;
+  thd->set_psi(PSI_THREAD_CALL(new_thread)(key_thread_one_connection, seqnum++, thd,
                                            thd->thread_id()));
 #endif
 
@@ -347,6 +351,12 @@ static MYSQL_SYSVAR_UINT(max_threads, threadpool_max_threads,
   "Maximum allowed number of worker threads in the thread pool",
   NULL, NULL, MAX_CONNECTIONS, 1, MAX_CONNECTIONS, 1);
 
+static MYSQL_SYSVAR_BOOL(connection_balance, threadpool_connection_balance,
+PLUGIN_VAR_RQCMDARG,
+"Control whether thread groups migrate connections "
+"so that they are evenly distributed.", nullptr,
+nullptr, false);
+
 static int threadpool_plugin_init(void *) {
   DBUG_ENTER("threadpool_plugin_init");
 
@@ -408,6 +418,7 @@ SYS_VAR *system_variables[] = {
     MYSQL_SYSVAR(oversubscribe),
     MYSQL_SYSVAR(toobusy),
     MYSQL_SYSVAR(high_prio_tickets),
+    MYSQL_SYSVAR(connection_balance),
     MYSQL_SYSVAR(high_prio_mode),
     NULL
 };
@@ -444,8 +455,10 @@ static int groups_fill_table(THD* thd, TABLE_LIST* tables, Item*)
 
     /* ID */
     table->field[0]->store(i, true);
+    change_group_rwlock.slock();
     /*
CONNECTION_COUNT */
     table->field[1]->store(group->connection_count, true);
+    change_group_rwlock.unslock();
     /* THREAD_COUNT */
     table->field[2]->store(group->thread_count, true);
     /* ACTIVE_THREAD_COUNT */
diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc
index a9fdf3dbf..291ced899 100644
--- a/plugin/thread_pool/threadpool_unix.cc
+++ b/plugin/thread_pool/threadpool_unix.cc
@@ -24,6 +24,7 @@
 #include "threadpool.h"
 #include
 #include
+#include <math.h>
 
 #define MYSQL_SERVER 1
 
@@ -37,6 +38,8 @@ for stall detection to kick in */
 /** Indicates that threadpool was initialized*/
 static bool threadpool_started = false;
 
+thread_pool_rwlock_t change_group_rwlock;
+
 /*
   Define PSI Keys for performance schema. We have
   a mutex per group, worker threads, condition per worker thread,
@@ -59,8 +62,8 @@ static PSI_cond_info cond_list[] = {
 static PSI_thread_key key_worker_thread;
 static PSI_thread_key key_timer_thread;
 static PSI_thread_info thread_list[] = {
-    {&key_worker_thread, "worker_thread", 0, 0, PSI_DOCUMENT_ME},
-    {&key_timer_thread, "timer_thread", PSI_FLAG_SINGLETON, 0,
+    {&key_worker_thread, "worker_thread", "thread_pool_worker", 0, 0, PSI_DOCUMENT_ME},
+    {&key_timer_thread, "timer_thread", "thread_pool_timer", PSI_FLAG_SINGLETON, 0,
      PSI_DOCUMENT_ME}};
 #endif // HAVE_PSI_INTERFACE
 
@@ -353,12 +356,108 @@ inline bool connection_is_high_prio(const connection_t &c) noexcept {
           c.thd->mdl_context.has_locks(MDL_key::LOCKING_SERVICE)));
 }
 
+int change_group(connection_t *c, thread_group_t *group, thread_group_t *to_group) {
+  assert(c->thread_group == group);
+
+  /* Remove connection from the old group.
+  */
+  if (c->bound_to_poll_descriptor) {
+    Vio *const vio = c->thd->get_protocol_classic()->get_vio();
+    const int fd = mysql_socket_getfd(vio->mysql_socket);
+    mysql_mutex_lock(&group->mutex);
+    io_poll_disassociate_fd(group->pollfd, fd);
+    c->bound_to_poll_descriptor = false;
+    mysql_mutex_unlock(&group->mutex);
+  }
+  c->thread_group->connection_count--;
+
+  /* Add connection to the new group. */
+  mysql_mutex_lock(&to_group->mutex);
+  c->thread_group = to_group;
+  to_group->connection_count++;
+  /* Ensure that there is a listener in the new group. */
+  int ret = 0;
+  if (!to_group->thread_count) ret = create_worker(to_group, false);
+  mysql_mutex_unlock(&to_group->mutex);
+  return ret;
+}
+
+int get_avg_conn_cnt() {
+  int total_conn_cnt = 0;
+
+  for (uint i = 0; i < group_count; i++) {
+    total_conn_cnt += all_groups[i].connection_count;
+  }
+  return ceil((total_conn_cnt + 0.0) / group_count);
+}
+
+thread_group_t *get_change_group_to(connection_t *connection) {
+  int avg_conn_cnt = get_avg_conn_cnt();
+  thread_group_t *group = connection->thread_group;
+
+  thread_group_t *to_group = &all_groups[(connection->thd->thread_id()) % group_count];
+  if (to_group->connection_count <= avg_conn_cnt || !threadpool_connection_balance) {
+    return to_group;
+  }
+
+  for (uint i = 0; i < group_count; i++) {
+    if (group == &all_groups[i]) {
+      continue;
+    }
+    if (all_groups[i].connection_count < avg_conn_cnt ||
+        (connection->thread_group - all_groups >= group_count && i == group_count - 1)) {
+      return &all_groups[i];
+    }
+  }
+  return &all_groups[group_count - 1];
+}
+
+int get_min_conn_cnt() {
+  int min_conn_cnt = INT_MAX;
+  for (uint i = 0; i < group_count; i++) {
+    min_conn_cnt = all_groups[i].connection_count < min_conn_cnt ?
+        all_groups[i].connection_count : min_conn_cnt;
+  }
+  return min_conn_cnt;
+}
+
+bool check_change_group_low(connection_t *connection) {
+  return connection->thread_group - all_groups >= group_count ||
+         (threadpool_connection_balance &&
+          (connection->thread_group->connection_count > get_avg_conn_cnt() ||
+           connection->thread_group->connection_count - get_min_conn_cnt() >= 2));
+}
+
+int change_group(connection_t *connection) {
+  int ret = -1;
+  change_group_rwlock.xlock();
+  if (check_change_group_low(connection)) {
+    thread_group_t *to_group = get_change_group_to(connection);
+    ret = change_group(connection, connection->thread_group, to_group);
+  }
+  change_group_rwlock.unxlock();
+  return ret;
+}
+
+/**
+  Check if connection needs to migrate to a different group
+  because group_count has changed after thread_pool_size
+  setting or connection_count in each thread group is not
+  evenly distributed.
+*/
+bool check_change_group(connection_t *connection) {
+  bool ret = false;
+  change_group_rwlock.slock();
+  ret = check_change_group_low(connection);
+  change_group_rwlock.unslock();
+  return ret;
+}
+
 inline bool connection_is_worker_continue(const connection_t &c) noexcept {
   if (c.thd->is_admin_connection()) {
     return true;
   }
 
-  if (c.thread_group != &all_groups[c.thd->thread_id() % group_count]) {
+  if (check_change_group(const_cast<connection_t *>(&c))) {
     return false;
   }
 
@@ -1266,9 +1365,9 @@ bool tp_add_connection(
     }
     mysql_mutex_unlock(&group->mutex);
   } else {
-    mysql_mutex_lock(&group->mutex);
+    change_group_rwlock.xlock();
     group->connection_count++;
-    mysql_mutex_unlock(&group->mutex);
+    change_group_rwlock.unxlock();
 
     /*
       Add connection to the work queue.
Actual login @@ -1292,9 +1391,9 @@ static void connection_abort(connection_t *connection) { threadpool_remove_connection(connection->thd); if (!is_admin_port) { - mysql_mutex_lock(&group->mutex); + change_group_rwlock.xlock(); group->connection_count--; - mysql_mutex_unlock(&group->mutex); + change_group_rwlock.unxlock(); } my_free(connection); @@ -1318,9 +1417,9 @@ static void connection_detach(connection_t *connection) { mysql_mutex_unlock(&group->mutex); if (!is_admin_port) { - mysql_mutex_lock(&group->mutex); + change_group_rwlock.xlock(); group->connection_count--; - mysql_mutex_unlock(&group->mutex); + change_group_rwlock.unxlock(); } my_thread_handle thread_id; @@ -1451,57 +1550,21 @@ static void set_wait_timeout(connection_t *c) noexcept { DBUG_VOID_RETURN; } -/** - Handle a (rare) special case,where connection needs to - migrate to a different group because group_count has changed - after thread_pool_size setting. -*/ - -static int change_group(connection_t *c, thread_group_t *old_group, - thread_group_t *new_group) { - assert(c->thread_group == old_group); - - /* Remove connection from the old group. */ - if (c->bound_to_poll_descriptor) { - Vio *const vio = c->thd->get_protocol_classic()->get_vio(); - const int fd = mysql_socket_getfd(vio->mysql_socket); - mysql_mutex_lock(&old_group->mutex); - io_poll_disassociate_fd(old_group->pollfd, fd); - c->bound_to_poll_descriptor = false; - } else { - mysql_mutex_lock(&old_group->mutex); - } - c->thread_group->connection_count--; - mysql_mutex_unlock(&old_group->mutex); - - /* Add connection to the new group. */ - mysql_mutex_lock(&new_group->mutex); - c->thread_group = new_group; - new_group->connection_count++; - /* Ensure that there is a listener in the new group. 
-  */
-  int ret = 0;
-  if (!new_group->thread_count) ret = create_worker(new_group, false);
-  mysql_mutex_unlock(&new_group->mutex);
-  return ret;
-}
-
 static int start_io(connection_t *connection) {
   /* Usually, connection will stay in the same group for the entire
-    connection's life. However, we do allow group_count to
-    change at runtime, which means in rare cases when it changes is
-    connection should need to migrate to another group, this ensures
-    to ensure equal load between groups.
-
-    So we recalculate in which group the connection should be, based
-    on thread_id and current group count, and migrate if necessary.
+    connection's life. However, we do allow group_count to change
+    at runtime, which means in rare cases when it decreases a
+    connection whose thread group id exceeds group_count needs to
+    migrate to another group. We also support connection count
+    balancing between groups: when the variable
+    threadpool_connection_balance is set to ON, connections may
+    migrate to a thread group with fewer connections.
   */
-  thread_group_t *const group =
-      &all_groups[connection->thd->thread_id() % group_count];
-
-  if (group != connection->thread_group) {
-    if (change_group(connection, connection->thread_group, group)) return -1;
+  if (check_change_group(connection)) {
+    change_group(connection);
   }
+  thread_group_t *group = connection->thread_group;
 
   /*
     Bind to poll descriptor if not yet done.
@@ -1593,7 +1656,7 @@ static void *worker_main(void *param) {
   assert(thread_group != nullptr);
 
   if (threadpool_sched_affinity) {
-    group_affinity.bind_numa((thread_group - all_groups) / sizeof(thread_group_t));
+    group_affinity.bind_numa((thread_group - all_groups));
   }
 
   /* Init per-thread structure */
-- 
Gitee