From c266ecff5bdd663f711fcd3133ed86c64a98442d Mon Sep 17 00:00:00 2001 From: liangwenjia <1757640220@qq.com> Date: Wed, 29 Nov 2023 11:47:15 +0800 Subject: [PATCH] add version adaption 8.0.25 and later --- plugin/thread_pool/threadpool_common.cc | 15 +++++++++- plugin/thread_pool/threadpool_unix.cc | 38 ++++++++++++++++++------- plugin/thread_pool/threadpool_unix.h | 1 + 3 files changed, 43 insertions(+), 11 deletions(-) diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc index b8c421ce8..4d0c302d7 100644 --- a/plugin/thread_pool/threadpool_common.cc +++ b/plugin/thread_pool/threadpool_common.cc @@ -151,8 +151,13 @@ int threadpool_add_connection(THD *thd) { /* Create new PSI thread for use with the THD. */ #ifdef HAVE_PSI_THREAD_INTERFACE +#if (MYSQL_VERSION_ID>80026) + thd->set_psi(PSI_THREAD_CALL(new_thread)(key_thread_one_connection, 0, thd, + thd->thread_id())); +#else thd->set_psi(PSI_THREAD_CALL(new_thread)(key_thread_one_connection, thd, thd->thread_id())); +#endif #endif /* Login. */ @@ -296,6 +301,13 @@ static void fix_threadpool_stall_limit(THD*, struct SYS_VAR *, void*, const void tp_set_threadpool_stall_limit(threadpool_stall_limit); } +static void fix_threadpool_connection_balance(THD*, struct SYS_VAR *, void*, const void* value) +{ + change_group_rwlock.xlock(); + threadpool_connection_balance = *static_cast(value); + change_group_rwlock.unlock(); +} + static inline int my_getncpus() noexcept { #ifdef _SC_NPROCESSORS_ONLN return sysconf(_SC_NPROCESSORS_ONLN); @@ -354,7 +366,7 @@ static MYSQL_SYSVAR_BOOL(connection_balance, threadpool_connection_balance, PLUGIN_VAR_RQCMDARG, "Control whether thread group migrating connections" "so that they are evenly distributed.", nullptr, -nullptr, false); +fix_threadpool_connection_balance, false); static int threadpool_plugin_init(void *) { @@ -406,6 +418,7 @@ static uint &stall_limit = threadpool_stall_limit; static uint &max_threads = threadpool_max_threads; static uint &oversubscribe = threadpool_oversubscribe; static uint &toobusy = threadpool_toobusy; +static bool &connection_balance = threadpool_connection_balance; SYS_VAR *system_variables[] = { MYSQL_SYSVAR(idle_timeout), diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 5d81f7c78..09d0c703f 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -14,6 +14,7 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "threadpool_unix.h" +#include "numa_affinity_manager.h" #include "sql/debug_sync.h" #include "sql/log.h" #include "sql/protocol_classic.h" @@ -61,10 +62,19 @@ static PSI_cond_info cond_list[] = { static PSI_thread_key key_worker_thread; static PSI_thread_key key_timer_thread; + +#if (MYSQL_VERSION_ID>80026) +static PSI_thread_info thread_list[] = { + {&key_worker_thread, "worker_thread", "worker_th", 0, 0, PSI_DOCUMENT_ME}, + {&key_timer_thread, "timer_thread", "timer_th", PSI_FLAG_SINGLETON, 0, + PSI_DOCUMENT_ME}}; +#else static PSI_thread_info thread_list[] = { {&key_worker_thread, "worker_thread", 0, 0, PSI_DOCUMENT_ME}, {&key_timer_thread, "timer_thread", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}}; +#endif + #endif // HAVE_PSI_INTERFACE thread_group_t all_groups[MAX_THREAD_GROUPS]; @@ -378,6 +388,10 @@ int change_group(connection_t *c, thread_group_t *group, thread_group_t *to_grou if (!to_group->thread_count) ret = create_worker(to_group, false); mysql_mutex_unlock(&to_group->mutex); + if (threadpool_sched_affinity) { + group_affinity.bind_numa(to_group - all_groups); + } + return ret; } @@ -403,8 +417,7 @@ thread_group_t *get_change_group_to(connection_t *connection) { if (group == &all_groups[i]) { continue; } - if (all_groups[i].connection_count < avg_conn_cnt || - (connection->thread_group - all_groups >= group_count && i == group_count - 1)) { + if (all_groups[i].connection_count < avg_conn_cnt || i == group_count - 1) { return &all_groups[i]; } } @@ -421,10 +434,12 @@ int get_min_conn_cnt() { } bool check_change_group_low(connection_t *connection) { - return connection->thread_group - all_groups >= group_count || - (threadpool_connection_balance && - (connection->thread_group->connection_count > get_avg_conn_cnt() || - connection->thread_group->connection_count - get_min_conn_cnt() >= 2)); + return (!threadpool_connection_balance && + connection->thread_group - all_groups != connection->thd->thread_id() % group_count) || + (threadpool_connection_balance && + ((connection->thread_group->connection_count > get_avg_conn_cnt() || + connection->thread_group->connection_count - get_min_conn_cnt() >= 2) || + (connection->thread_group - all_groups >= group_count))); } int change_group(connection_t *connection) { @@ -526,9 +541,9 @@ class Thd_timeout_checker : public Do_THD_Impl { public: Thd_timeout_checker(pool_timer_t *timer) noexcept : m_timer(timer) {} - virtual ~Thd_timeout_checker() {} + ~Thd_timeout_checker() override {} - virtual void operator()(THD *thd) noexcept { + void operator()(THD *thd) noexcept override{ if (thd_get_net_read_write(thd) != 1) return; connection_t *connection = (connection_t *)thd->scheduler.data; @@ -1656,7 +1671,7 @@ static void *worker_main(void *param) { assert(thread_group != nullptr); if (threadpool_sched_affinity) { - group_affinity.bind_numa((thread_group - all_groups)); + group_affinity.bind_numa(thread_group - all_groups); } /* Init per-thread structure */ @@ -1767,16 +1782,19 @@ void tp_set_threadpool_size(uint size) noexcept { success = (group->pollfd >= 0); if (!success) { sql_print_error("io_poll_create() failed, errno=%d\n", errno); - break; } } mysql_mutex_unlock(&all_groups[i].mutex); if (!success) { + change_group_rwlock.xlock(); group_count = i; + change_group_rwlock.unlock(); return; } } + change_group_rwlock.xlock(); group_count = size; + change_group_rwlock.unlock(); } void tp_set_threadpool_stall_limit(uint limit) noexcept { diff --git a/plugin/thread_pool/threadpool_unix.h b/plugin/thread_pool/threadpool_unix.h index 3c561f2da..68b9eb942 100644 --- a/plugin/thread_pool/threadpool_unix.h +++ b/plugin/thread_pool/threadpool_unix.h @@ -24,6 +24,7 @@ #include "threadpool.h" #include "violite.h" #include "numa_affinity_manager.h" +#include "mysql_version.h" #ifdef __linux__ #include -- Gitee