From 639bd901f63d9afdc96e68487b1db4cb1dc0e891 Mon Sep 17 00:00:00 2001 From: liuwei Date: Fri, 10 May 2024 20:24:55 +0800 Subject: [PATCH] add threadpool code for mysql --- plugin/thread_pool/CMakeLists.txt | 26 + plugin/thread_pool/numa_affinity_manager.h | 117 ++++ plugin/thread_pool/threadpool_unix.cc | 613 ++++++++++++++++++++- 3 files changed, 755 insertions(+), 1 deletion(-) create mode 100644 plugin/thread_pool/CMakeLists.txt create mode 100644 plugin/thread_pool/numa_affinity_manager.h diff --git a/plugin/thread_pool/CMakeLists.txt b/plugin/thread_pool/CMakeLists.txt new file mode 100644 index 000000000..35cbdff51 --- /dev/null +++ b/plugin/thread_pool/CMakeLists.txt @@ -0,0 +1,26 @@ +# Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 Huawei Technologies Co., Ltd. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +ADD_COMPILE_DEFINITIONS( + COMPILE_DEFINITIONS MYSQL_DYNAMIC_PLUGIN) + +MYSQL_ADD_PLUGIN(thread_pool + threadpool_common.cc + threadpool_unix.cc + MODULE_ONLY + MODULE_OUTPUT_NAME "thread_pool" + ) + diff --git a/plugin/thread_pool/numa_affinity_manager.h b/plugin/thread_pool/numa_affinity_manager.h new file mode 100644 index 000000000..3471d3287 --- /dev/null +++ b/plugin/thread_pool/numa_affinity_manager.h @@ -0,0 +1,117 @@ +/* Copyright (C) 2012 Monty Program Ab + Copyright (C) 2022 Huawei Technologies Co., Ltd + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifndef NUMA_AFFINITY_MANAGER_H_ +#define NUMA_AFFINITY_MANAGER_H_ + +#include +#include +#include +#include +#include + +using namespace std; + +class numa_affinity_manager +{ +public: + numa_affinity_manager(){}; + virtual ~numa_affinity_manager(){}; + + bool init() { + initok = false; + cpu_count = get_sys_cpu(); + numa_count = get_sys_numa(); + if (cpu_count <= 0 || numa_count <= 0 || + cpu_count % numa_count != 0) { + return false; + } + + int cpu_per_numa = cpu_count / numa_count; + int start = 0; + numa_cpu_map.clear(); + auto delete_cpumask = [](bitmask *ptr) { + if (ptr != nullptr) { + numa_free_cpumask(ptr); + } + }; + for (int i = 0; i < numa_count; i++) { + auto msk = numa_allocate_cpumask(); + if (msk == nullptr) { + return false; + } + + for (int j = 0; j < cpu_per_numa; j++) { + numa_bitmask_setbit(msk, start + j); + } + numa_cpu_map.emplace_back(msk, delete_cpumask); + start += cpu_per_numa; + } + initok = true; + return true; + } + + bool bind_numa(int group_id) { + if (initok) { + pid_t pid = gettid(); + return (numa_sched_setaffinity( + pid, numa_cpu_map[group_id%numa_cpu_map.size()].get()) == 0); + } + + return false; + } + +protected: + int get_sys_cpu() { + return numa_num_configured_cpus(); + } + + int get_sys_numa() { + return numa_num_configured_nodes(); + } + + pid_t gettid() { + return static_cast(syscall(SYS_gettid)); + } + +public: + void print_cpumask(const string &name, bitmask *msk) { + cout << name << ": "; + for (unsigned int i = 0; i < msk->size; i++) { + if (numa_bitmask_isbitset(msk, i)) { + cout << i << " "; + } + } + cout << endl; + } + void dump() { + cout << "initok: " << initok << endl; + cout << "cpu_count: " << cpu_count << endl; + cout << "numa_count: " << numa_count << endl; + + for (unsigned int i = 0; i < numa_cpu_map.size(); i++) { + string name = "numa_cpu_map[" + to_string(i) + "]"; + print_cpumask(name, numa_cpu_map[i].get()); + } + } + +private: + bool initok{false}; + int cpu_count{0}; + int numa_count{0}; + vector> numa_cpu_map; +}; + +#endif // NUMA_AFFINITY_MANAGER_H_ diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index a34ea4366..7c150e9db 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -1,5 +1,5 @@ /* Copyright (C) 2012 Monty Program Ab - Copyright (C) 2022 Huawei Technologies Co., Ltd + Copyright (C) 2024 Huawei Technologies Co., Ltd This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -1244,3 +1244,614 @@ static connection_t *get_event(worker_thread_t *current_thread, DBUG_RETURN(connection); } + +** + Tells the pool that worker starts waiting on IO, lock, condition, + sleep() or similar. +*/ + +static void wait_begin(thread_group_t *thread_group) noexcept { + DBUG_ENTER("wait_begin"); + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count--; + thread_group->waiting_thread_count++; + + assert(thread_group->active_thread_count >= 0); + assert(thread_group->connection_count > 0); + +#ifdef THREADPOOL_CREATE_THREADS_ON_WAIT + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if ((thread_group->active_thread_count < (1 + (int)threadpool_oversubscribe)) && + (!queues_are_empty(*thread_group) || !thread_group->listener)) { + /* + Group might stall while this thread waits, thus wake + or create a worker to prevent stall. + */ + wake_or_create_thread(thread_group); + } +#endif + + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/** + Tells the pool has finished waiting. +*/ +static void wait_end(thread_group_t *thread_group) noexcept { + DBUG_ENTER("wait_end"); + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count++; + thread_group->waiting_thread_count--; + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/** + Allocate/initialize a new connection structure. +*/ + +static connection_t *alloc_connection(THD *thd) noexcept { + DBUG_ENTER("alloc_connection"); + DBUG_EXECUTE_IF("simulate_tp_alloc_connection_oom", DBUG_RETURN(nullptr);); + + connection_t *connection = (connection_t *)my_malloc( + PSI_NOT_INSTRUMENTED /*key_memory_thread_pool_connection*/, + sizeof(connection_t), 0); + if (connection) { + connection->thd = thd; + connection->waiting = false; + connection->logged_in = false; + connection->bound_to_poll_descriptor = false; + connection->abs_wait_timeout = ULLONG_MAX; + connection->tickets = 0; + } + DBUG_RETURN(connection); +} + +/** + Add a new connection to thread pool.. +*/ + +bool tp_add_connection( + Channel_info *channel_info) { + DBUG_ENTER("Thread_pool_connection_handler::add_connection"); + + THD *const thd = channel_info->create_thd(); + + if (unlikely(!thd)) { + channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false); + DBUG_RETURN(true); + } + + connection_t *const connection = alloc_connection(thd); + + if (unlikely(!connection)) { + thd->get_protocol_classic()->end_net(); + delete thd; + // channel will be closed by send_error_and_close_channel() + channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false); + DBUG_RETURN(true); + } + + delete channel_info; + + thd->set_new_thread_id(); + thd->start_utime = my_micro_time(); + + threadpool_thds.insert(connection); + Global_THD_manager::get_instance()->add_thd(thd); + + thd->scheduler.data = connection; + + /* Assign connection to a group. */ + thread_group_t *group = &all_groups[thd->thread_id() % group_count]; + + connection->thread_group = group; + + if (thd->is_admin_connection()) { + my_thread_handle thread_id; + mysql_mutex_lock(&group->mutex); + int err = mysql_thread_create(key_worker_thread, &thread_id, + group->pthread_attr, admin_port_worker_main, connection); + + if (err) { + set_my_errno(errno); + print_pool_blocked_message(false); + } else { + group->admin_port_thread_count++; + } + mysql_mutex_unlock(&group->mutex); + } else { + change_group_rwlock.xlock(); + group->connection_count++; + change_group_rwlock.unxlock(); + + /* + Add connection to the work queue. Actual login + will be done by a worker thread. + */ + queue_put(group, connection); + } + + DBUG_RETURN(false); +} + +/** + Terminate connection. +*/ +static void connection_abort(connection_t *connection) { + DBUG_ENTER("connection_abort"); + threadpool_thds.erase(connection); + + thread_group_t *group = connection->thread_group; + bool is_admin_port = connection->thd->is_admin_connection(); + threadpool_remove_connection(connection->thd); + + if (!is_admin_port) { + change_group_rwlock.xlock(); + group->connection_count--; + change_group_rwlock.unxlock(); + } + + my_free(connection); + DBUG_VOID_RETURN; +} + +/** + Detach connection. +*/ +static void connection_detach(connection_t *connection) { + DBUG_ENTER("connection_detach"); + threadpool_thds.erase(connection); + + thread_group_t *group = connection->thread_group; + bool is_admin_port = connection->thd->is_admin_connection(); + Vio *const vio = connection->thd->get_protocol_classic()->get_vio(); + const int fd = mysql_socket_getfd(vio->mysql_socket); + mysql_mutex_lock(&group->mutex); + io_poll_disassociate_fd(group->pollfd, fd); + connection->bound_to_poll_descriptor = false; + mysql_mutex_unlock(&group->mutex); + + if (!is_admin_port) { + change_group_rwlock.xlock(); + group->connection_count--; + change_group_rwlock.unxlock(); + } + + my_thread_handle thread_id; + + if (mysql_thread_create(key_worker_thread, &thread_id, group->pthread_attr, + connection_detach_worker, connection->thd)) { + threadpool_remove_connection(connection->thd); + } + + my_free(connection); + DBUG_VOID_RETURN; +} + + +static void *connection_detach_worker(void *param) { + my_thread_init(); + DBUG_ENTER("connection_detach_worker"); + THD *thd = static_cast(param); + assert(thd != nullptr); + thread_attach(thd); + + while (1) { + if (threadpool_process_request(thd)) { + break; + } + } + + threadpool_remove_connection(thd); + return nullptr; +} + +/** + MySQL scheduler callback : kill connection +*/ + +void tp_post_kill_notification(THD *thd) noexcept { + DBUG_ENTER("tp_post_kill_notification"); + if (current_thd == thd || thd->system_thread) { + DBUG_VOID_RETURN; + } + + Vio *vio = thd->get_protocol_classic()->get_vio(); + if (vio) vio_cancel(vio, SHUT_RD); + DBUG_VOID_RETURN; +} + +alignas(CPU_LEVEL1_DCACHE_LINESIZE) std::atomic tp_waits[THD_WAIT_LAST]; + +/** + MySQL scheduler callback: wait begin +*/ +void tp_wait_begin(THD *thd, int type MY_ATTRIBUTE((unused))) { + DBUG_ENTER("tp_wait_begin"); + + if (thd == nullptr) { + DBUG_VOID_RETURN; + } + + connection_t *connection = (connection_t *)thd->scheduler.data; + + if (connection && connection->thd && + !connection->thd->is_admin_connection()) { + assert(!connection->waiting); + connection->waiting = true; + assert(type > 0 && type < THD_WAIT_LAST); + tp_waits[type]++; + wait_begin(connection->thread_group); + } + DBUG_VOID_RETURN; +} + +/** + MySQL scheduler callback: wait end +*/ + +void tp_wait_end(THD *thd) { + DBUG_ENTER("tp_wait_end"); + + if (thd == nullptr) { + DBUG_VOID_RETURN; + } + connection_t *connection = (connection_t *)thd->scheduler.data; + + if (connection && connection->thd && + !connection->thd->is_admin_connection()) { + assert(connection->waiting); + connection->waiting = false; + wait_end(connection->thread_group); + } + DBUG_VOID_RETURN; +} + +static void set_next_timeout_check(ulonglong abstime) { + DBUG_ENTER("set_next_timeout_check"); + while (abstime < pool_timer.next_timeout_check.load()) { + uint64 old = pool_timer.next_timeout_check.load(); + pool_timer.next_timeout_check.compare_exchange_weak(old, abstime); + } + DBUG_VOID_RETURN; +} + + + + inline ulong get_wait_timeout(THD *thd) noexcept { + return thd->variables.net_wait_timeout; + } + +/** + Set wait timeout for connection. +*/ + +static void set_wait_timeout(connection_t *c) noexcept { + DBUG_ENTER("set_wait_timeout"); + /* + Calculate wait deadline for this connection. + Instead of using my_microsecond_getsystime() which has a syscall + overhead, use pool_timer.current_microtime and take + into account that its value could be off by at most + one tick interval. + */ + + c->abs_wait_timeout = + pool_timer.current_microtime.load(std::memory_order_relaxed) + + 1000LL * pool_timer.tick_interval + + 1000000LL * get_wait_timeout(c->thd); + + set_next_timeout_check(c->abs_wait_timeout); + DBUG_VOID_RETURN; +} + +static int start_io(connection_t *connection) { + /* + Usually, connection will stay in the same group for the entire + connection's life. However, we do allow group_count to change + at runtime, which means in rare cases when it decreases + connection whose thread group id exceeds group_count need to + migrate to another group. We also support connection number + balancing between groups, which means when variables + threadpool_connection_balance is set to on, connection would + migrate to the thread group with fewer connections. + */ + if (check_change_group(connection)) { + change_group(connection); + } + thread_group_t *group = connection->thread_group; + + /* + Bind to poll descriptor if not yet done. + */ + Vio *vio = connection->thd->get_protocol_classic()->get_vio(); + int fd = mysql_socket_getfd(vio->mysql_socket); + if (!connection->bound_to_poll_descriptor) { + connection->bound_to_poll_descriptor = true; + return io_poll_associate_fd(group->pollfd, fd, connection); + } + + return io_poll_start_read(group->pollfd, fd, connection); +} + +static void handle_event(connection_t *connection) { + DBUG_ENTER("handle_event"); + int err = 0; + + while (1) { + if (!connection->logged_in) { + err = threadpool_add_connection(connection->thd); + connection->logged_in = true; + } else { + err = threadpool_process_request(connection->thd); + } + + if (err) { + goto end; + } + + if (connection->thd == thd_to_detach) { + connection_detach(connection); + goto end_return; + } + + set_wait_timeout(connection); + + if (!connection_is_worker_continue(*connection)) { + break; + } + } + + if (!connection->thd->is_admin_connection()) { + err = start_io(connection); + } + +end: + if (err || connection->thd->is_admin_connection()) { + connection_abort(connection); + } + +end_return: + DBUG_VOID_RETURN; +} + +static void *admin_port_worker_main(void *param) { + my_thread_init(); + DBUG_ENTER("admin_port_worker_main"); + +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_account) + (nullptr, 0, nullptr, 0); +#endif + + connection_t *connection = static_cast(param); + assert(connection != nullptr); + assert(connection->thread_group != nullptr); + thread_group_t *group = connection->thread_group; + + handle_event(connection); + + mysql_mutex_lock(&group->mutex); + group->admin_port_thread_count--; + mysql_mutex_unlock(&group->mutex); + + my_thread_end(); + return nullptr; +} + +/** + Worker thread's main +*/ +static void *worker_main(void *param) { + my_thread_init(); + + DBUG_ENTER("worker_main"); + + thread_group_t *thread_group = static_cast(param); + assert(thread_group != nullptr); + + if (threadpool_sched_affinity) { + group_affinity.bind_numa((thread_group - all_groups)); + } + + /* Init per-thread structure */ + worker_thread_t this_thread; + mysql_cond_init(key_worker_cond, &this_thread.cond); + this_thread.thread_group = thread_group; + this_thread.event_count = 0; + +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_account) + (nullptr, 0, nullptr, 0); +#endif + + /* Run event loop */ + for (;;) { + struct timespec ts; + set_timespec(&ts, threadpool_idle_timeout); + connection_t *connection = get_event(&this_thread, thread_group, &ts); + + if (!connection) { + break; + } + + this_thread.event_count++; + handle_event(connection); + } + + /* Thread shutdown: cleanup per-worker-thread structure. */ + mysql_cond_destroy(&this_thread.cond); + + bool last_thread = false; /* last thread in group exits */ + mysql_mutex_lock(&thread_group->mutex); + add_thread_count(thread_group, -1); + last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown); + mysql_mutex_unlock(&thread_group->mutex); + + /* Last thread in group exits and pool is terminating, destroy group.*/ + if (last_thread) { + thread_group_destroy(thread_group); + } + + my_thread_end(); + return nullptr; +} + +bool tp_init() { + DBUG_ENTER("tp_init"); + threadpool_started = true; + group_affinity.init(); + + for (uint i = 0; i < array_elements(all_groups); i++) { + thread_group_init(&all_groups[i], get_connection_attrib()); + } + tp_set_threadpool_size(threadpool_size); + if (group_count == 0) { + /* Something went wrong */ + sql_print_error("Can't set threadpool size to %d", threadpool_size); + DBUG_RETURN(true); + } +#ifdef HAVE_PSI_INTERFACE + mysql_mutex_register("threadpool", mutex_list, array_elements(mutex_list)); + mysql_cond_register("threadpool", cond_list, array_elements(cond_list)); + mysql_thread_register("threadpool", thread_list, array_elements(thread_list)); +#endif + + pool_timer.tick_interval = threadpool_stall_limit; + start_timer(&pool_timer); + DBUG_RETURN(false); +} + +void tp_end_thread() { + if (!threadpool_started) { + return; + } + + while (!threadpool_thds.empty()) { + my_sleep(10000); + } + + stop_timer(&pool_timer); + + for (uint i = 0; i < array_elements(all_groups); i++) { + thread_group_close(&all_groups[i]); + } + + threadpool_started = false; +} + +void tp_end() { + DBUG_ENTER("tp_end"); + threadpool_thds.killConns(); + + std::thread exit_tp(tp_end_thread); + exit_tp.detach(); + DBUG_VOID_RETURN; +} + +/** Ensure that poll descriptors are created when threadpool_size changes */ +void tp_set_threadpool_size(uint size) noexcept { + if (!threadpool_started) return; + + bool success = true; + for (uint i = 0; i < size; i++) { + thread_group_t *group = &all_groups[i]; + mysql_mutex_lock(&group->mutex); + if (group->pollfd == -1) { + group->pollfd = io_poll_create(); + success = (group->pollfd >= 0); + if (!success) { + sql_print_error("io_poll_create() failed, errno=%d\n", errno); + break; + } + } + mysql_mutex_unlock(&all_groups[i].mutex); + if (!success) { + group_count = i; + return; + } + } + group_count = size; +} + +void tp_set_threadpool_stall_limit(uint limit) noexcept { + if (!threadpool_started) { + return; + } + + mysql_mutex_lock(&(pool_timer.mutex)); + pool_timer.tick_interval = limit; + mysql_mutex_unlock(&(pool_timer.mutex)); + mysql_cond_signal(&(pool_timer.cond)); +} + +/** + Calculate number of idle/waiting threads in the pool. + + Sum idle threads over all groups. + Don't do any locking, it is not required for stats. +*/ +int tp_get_idle_thread_count() noexcept { + int sum = 0; + for (uint i = 0; + i < array_elements(all_groups) && (all_groups[i].pollfd >= 0); i++) { + sum += (all_groups[i].thread_count - all_groups[i].active_thread_count); + } + return sum; +} + +/* Report threadpool problems */ + +/** + Delay in microseconds, after which "pool blocked" message is printed. + (30 sec == 30 Mio usec) +*/ +#define BLOCK_MSG_DELAY 30 * 1000000 + +#define MAX_THREADS_REACHED_MSG \ + "Threadpool could not create additional thread to handle queries, because the \ +number of allowed threads was reached. Increasing 'thread_pool_max_threads' \ +parameter can help in this situation.\n \ +If 'admin_port' parameter is set, you can still connect to the database with \ +superuser account (it must be TCP connection using admin_port as TCP port) \ +and troubleshoot the situation. \ +A likely cause of pool blocks are clients that lock resources for long time. \ +'show processlist' or 'show engine innodb status' can give additional hints." + +#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)." + +/** + Write a message when blocking situation in threadpool occurs. + The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds. + It will be just a single message for each blocking situation (to prevent + log flood). +*/ +static void print_pool_blocked_message(bool max_threads_reached) noexcept { + ulonglong now = my_microsecond_getsystime(); + static bool msg_written = false; + + if (pool_block_start == 0) { + pool_block_start = now; + msg_written = false; + } + + if (!msg_written && ((now > pool_block_start + BLOCK_MSG_DELAY) || + (now == pool_block_start))) { + if (max_threads_reached) + sql_print_error(MAX_THREADS_REACHED_MSG); + else + sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno); + + if (now > pool_block_start) { + sql_print_information("Threadpool has been blocked for %u seconds\n", + (uint)((now - pool_block_start) / 1000000)); + } + /* avoid reperated messages for the same blocking situation */ + msg_written = true; + } +} -- Gitee