diff --git a/mysql-test/suite/thread_pool/r/threadpool_pool_size.result b/mysql-test/suite/thread_pool/r/threadpool_pool_size.result index 5baad219e38613ec0a379846a33121ced70a1361..48ef7a5e2e4f2f7e4c55b954a5d164402dd7cc5b 100644 --- a/mysql-test/suite/thread_pool/r/threadpool_pool_size.result +++ b/mysql-test/suite/thread_pool/r/threadpool_pool_size.result @@ -3,13 +3,13 @@ SELECT benchmark(9999999999, md5('very long command 1')); SELECT benchmark(9999999999, md5('very long command 2')); SELECT COUNT(Id) FROM INFORMATION_SCHEMA.processlist where Info like 'SELECT benchmark(9999999999, md5(\'very long command%'; COUNT(Id) -1 +2 SELECT SUM(CONNECTIONS) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS; SUM(CONNECTIONS) 3 SELECT SUM(ACTIVE_THREADS) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS; SUM(ACTIVE_THREADS) -1 +3 SELECT SUM(STANDBY_THREADS) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS; SUM(STANDBY_THREADS) 0 diff --git a/mysql-test/suite/thread_pool/r/threadpool_too_busy_active.result b/mysql-test/suite/thread_pool/r/threadpool_too_busy_active.result index 8236aceefb0b93c43442a5844b3169794ef9ea60..1eb619bd164bb4223d1dde1b28f2924d736659b4 100644 --- a/mysql-test/suite/thread_pool/r/threadpool_too_busy_active.result +++ b/mysql-test/suite/thread_pool/r/threadpool_too_busy_active.result @@ -24,4 +24,4 @@ COUNT(*) 1 SELECT SUM(THREAD_CREATIONS_DUE_TO_STALL) FROM INFORMATION_SCHEMA.THREAD_POOL_STATS; SUM(THREAD_CREATIONS_DUE_TO_STALL) -2 +1 diff --git a/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait-debug.result b/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait-debug.result index 0ab38cc29e788a3627d26be09b0ed39393339362..fc137b59193c27bc1f6d2b0c050b2b8b11365a46 100644 --- a/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait-debug.result +++ b/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait-debug.result @@ -34,7 +34,7 @@ SUM(CONNECTIONS) 6 SELECT SUM(ACTIVE_THREADS) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS; SUM(ACTIVE_THREADS) -0 +1 SELECT 
SUM(STANDBY_THREADS) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS; SUM(STANDBY_THREADS) 2 diff --git a/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait.result b/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait.result index db380758435c87caf14e83ef76febccb47ba644c..1c4daa9faf9a1f9f453068ee1c78f254296e350c 100644 --- a/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait.result +++ b/mysql-test/suite/thread_pool/r/threadpool_too_busy_wait.result @@ -24,7 +24,7 @@ SUM(CONNECTIONS) 5 SELECT SUM(ACTIVE_THREADS) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS; SUM(ACTIVE_THREADS) -0 +1 SELECT SUM(STANDBY_THREADS) FROM INFORMATION_SCHEMA.THREAD_POOL_GROUPS; SUM(STANDBY_THREADS) 2 diff --git a/mysql-test/suite/thread_pool/t/threadpool_pool_size.test b/mysql-test/suite/thread_pool/t/threadpool_pool_size.test index ce57634df04500acd33c5f01e17690928662fc53..30aff1b3879563c2bb6322ac50af4cb9e917a508 100644 --- a/mysql-test/suite/thread_pool/t/threadpool_pool_size.test +++ b/mysql-test/suite/thread_pool/t/threadpool_pool_size.test @@ -1,6 +1,6 @@ # Restart with plugin-load-add=thread_pool.so -# Test that each thread group will have only one active thread if neither any stall or wait happens, no matter what's the number of cores available underneath. +# Test that each thread group will have one or more active threads if neither any stall or wait happens, no matter what's the number of cores available underneath. # Here thread_pool_stall_limit is set to a large value, to make those "very long command" treated as not stalled command. 
# Calculate value for admin port
diff --git a/plugin/thread_pool/numa_affinity_manager.h b/plugin/thread_pool/numa_affinity_manager.h
new file mode 100644
index 0000000000000000000000000000000000000000..3471d3287369b6f8900544c9a30485570101c915
--- /dev/null
+++ b/plugin/thread_pool/numa_affinity_manager.h
@@ -0,0 +1,164 @@
+/* Copyright (C) 2012 Monty Program Ab
+   Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#ifndef NUMA_AFFINITY_MANAGER_H_
+#define NUMA_AFFINITY_MANAGER_H_
+
+#include <numa.h>
+#include <sys/syscall.h>
+#include <unistd.h>
+
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+/**
+  Binds thread pool worker threads to the CPUs of a NUMA node.
+
+  init() builds one CPU mask per configured NUMA node; bind_numa() then
+  applies the mask selected by a thread group index to the calling thread.
+*/
+class numa_affinity_manager
+{
+public:
+  numa_affinity_manager() = default;
+  virtual ~numa_affinity_manager() = default;
+
+  /**
+    Build one CPU mask per configured NUMA node.
+
+    CPUs are split evenly and by consecutive number: node i is given CPUs
+    [i * cpu_per_numa, (i + 1) * cpu_per_numa).
+
+    @return true if the masks were built; false if libnuma is unusable, the
+            CPU count is not a positive multiple of the node count, or a
+            mask could not be allocated.
+  */
+  bool init() {
+    initok = false;
+    numa_cpu_map.clear();
+
+    /* numa(3): all other libnuma calls are undefined if numa_available()
+       returns -1, so it has to be checked first. */
+    if (numa_available() < 0) {
+      return false;
+    }
+
+    cpu_count = get_sys_cpu();
+    numa_count = get_sys_numa();
+    if (cpu_count <= 0 || numa_count <= 0 ||
+        cpu_count % numa_count != 0) {
+      return false;
+    }
+
+    const int cpu_per_numa = cpu_count / numa_count;
+    auto delete_cpumask = [](bitmask *ptr) {
+      if (ptr != nullptr) {
+        numa_free_cpumask(ptr);
+      }
+    };
+
+    /* Fill a local vector first so that a failed allocation leaves
+       numa_cpu_map empty instead of partially populated. */
+    std::vector<std::shared_ptr<bitmask>> masks;
+    int start = 0;
+    for (int i = 0; i < numa_count; i++) {
+      bitmask *msk = numa_allocate_cpumask();
+      if (msk == nullptr) {
+        return false;
+      }
+      masks.emplace_back(msk, delete_cpumask);
+
+      for (int j = 0; j < cpu_per_numa; j++) {
+        numa_bitmask_setbit(msk, start + j);
+      }
+      start += cpu_per_numa;
+    }
+
+    numa_cpu_map.swap(masks);
+    initok = true;
+    return true;
+  }
+
+  /**
+    Restrict the calling thread to the CPUs of one NUMA node.
+
+    The node is chosen as group_id modulo the number of masks built by
+    init().
+
+    @param group_id  zero-based index of the caller's thread group
+    @return true if the affinity was set; false if init() has not
+            succeeded, group_id is negative, or the call failed.
+  */
+  bool bind_numa(int group_id) {
+    if (!initok || group_id < 0 || numa_cpu_map.empty()) {
+      return false;
+    }
+
+    const size_t node = static_cast<size_t>(group_id) % numa_cpu_map.size();
+    return numa_sched_setaffinity(gettid(), numa_cpu_map[node].get()) == 0;
+  }
+
+protected:
+  /** @return the value of numa_num_configured_cpus(). */
+  int get_sys_cpu() {
+    return numa_num_configured_cpus();
+  }
+
+  /** @return the value of numa_num_configured_nodes(). */
+  int get_sys_numa() {
+    return numa_num_configured_nodes();
+  }
+
+  /** @return the result of the SYS_gettid system call for the caller. */
+  pid_t gettid() {
+    return static_cast<pid_t>(syscall(SYS_gettid));
+  }
+
+public:
+  /** Print the indexes of the bits set in msk to stdout, prefixed by name. */
+  void print_cpumask(const std::string &name, bitmask *msk) {
+    std::cout << name << ": ";
+    if (msk != nullptr) {
+      for (unsigned int i = 0; i < msk->size; i++) {
+        if (numa_bitmask_isbitset(msk, i)) {
+          std::cout << i << " ";
+        }
+      }
+    }
+    std::cout << std::endl;
+  }
+
+  /** Print the manager state and every per-node CPU mask to stdout. */
+  void dump() {
+    std::cout << "initok: " << initok << std::endl;
+    std::cout << "cpu_count: " << cpu_count << std::endl;
+    std::cout << "numa_count: " << numa_count << std::endl;
+
+    for (unsigned int i = 0; i < numa_cpu_map.size(); i++) {
+      std::string name = "numa_cpu_map[" + std::to_string(i) + "]";
+      print_cpumask(name, numa_cpu_map[i].get());
+    }
+  }
+
+private:
+  bool initok{false};
+  int cpu_count{0};
+  int numa_count{0};
+  std::vector<std::shared_ptr<bitmask>> numa_cpu_map;
+};
+
+#endif // NUMA_AFFINITY_MANAGER_H_
diff --git a/plugin/thread_pool/threadpool.h b/plugin/thread_pool/threadpool.h index 38fdee21deec1126d777abedde1e1374106cc145..f4dd68dc8a9528d14b4b549d3a79b80fd9555b2f 100644 --- a/plugin/thread_pool/threadpool.h +++ b/plugin/thread_pool/threadpool.h @@ -23,7 +23,7 @@ struct SHOW_VAR; -#define MAX_THREAD_GROUPS 128 +#define MAX_THREAD_GROUPS 1024 #define MAX_CONNECTIONS 100000 @@ -35,7 +35,9 @@ enum tp_high_prio_mode_t { /* Threadpool parameters */ extern uint threadpool_idle_timeout; /* Shutdown idle worker threads after this timeout */ +extern bool threadpool_dedicated_listener; /* Control whether listener be dedicated */ extern uint threadpool_size; /* Number of parallel executing threads */ +extern bool threadpool_sched_affinity; /* Control whether thread group scheduling affinity */ extern uint threadpool_max_threads; extern uint 
threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/ extern uint threadpool_oversubscribe; /* Maximum active threads in group */ diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc index a86fcad243fa1bb11348021b3d29ee374f3ad22a..00595fc4b3f1e02d2f92ed4a3f7c33cd29d5527b 100644 --- a/plugin/thread_pool/threadpool_common.cc +++ b/plugin/thread_pool/threadpool_common.cc @@ -38,7 +38,9 @@ /* Threadpool parameters */ uint threadpool_idle_timeout; +bool threadpool_dedicated_listener; uint threadpool_size; +bool threadpool_sched_affinity; uint threadpool_stall_limit; uint threadpool_max_threads; uint threadpool_oversubscribe; @@ -315,6 +317,11 @@ static MYSQL_SYSVAR_UINT(toobusy, threadpool_toobusy, "How many additional active and waiting worker threads in a group are allowed.", NULL, NULL, 13, 1, 1000, 1); +static MYSQL_SYSVAR_BOOL(dedicated_listener, threadpool_dedicated_listener, + PLUGIN_VAR_RQCMDARG, + "Control whether listener be dedicated", nullptr, + nullptr, false); + static MYSQL_SYSVAR_UINT(size, threadpool_size, PLUGIN_VAR_RQCMDARG, "Number of thread groups in the pool. 
" @@ -322,6 +329,11 @@ static MYSQL_SYSVAR_UINT(size, threadpool_size, "executing threads (threads in a waiting state do not count as executing).", NULL, fix_threadpool_size, (uint)my_getncpus(), 1, MAX_THREAD_GROUPS, 1); +static MYSQL_SYSVAR_BOOL(sched_affinity, threadpool_sched_affinity, + PLUGIN_VAR_RQCMDARG, + "Control whether thread group scheduling affinity.", nullptr, + nullptr, false); + static MYSQL_SYSVAR_UINT(stall_limit, threadpool_stall_limit, PLUGIN_VAR_RQCMDARG, "Maximum query execution time in milliseconds," @@ -378,7 +390,9 @@ static MYSQL_THDVAR_ENUM(high_prio_mode, NULL, NULL, TP_HIGH_PRIO_MODE_TRANSACTIONS, &threadpool_high_prio_mode_typelib); static uint &idle_timeout = threadpool_idle_timeout; +static bool &dedicated_listener = threadpool_dedicated_listener; static uint &size = threadpool_size; +static bool &sched_affinity = threadpool_sched_affinity; static uint &stall_limit = threadpool_stall_limit; static uint &max_threads = threadpool_max_threads; static uint &oversubscribe = threadpool_oversubscribe; @@ -386,7 +400,9 @@ static uint &toobusy = threadpool_toobusy; SYS_VAR *system_variables[] = { MYSQL_SYSVAR(idle_timeout), + MYSQL_SYSVAR(dedicated_listener), MYSQL_SYSVAR(size), + MYSQL_SYSVAR(sched_affinity), MYSQL_SYSVAR(max_threads), MYSQL_SYSVAR(stall_limit), MYSQL_SYSVAR(oversubscribe), diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index b0a2a61a70b269c6bad511c6e3ddcef7831c9a84..a9fdf3dbfcd8978abf65402d72b585922d602ded 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -65,6 +65,8 @@ static PSI_thread_info thread_list[] = { #endif // HAVE_PSI_INTERFACE thread_group_t all_groups[MAX_THREAD_GROUPS]; +numa_affinity_manager group_affinity; + static uint group_count; /** @@ -328,6 +330,10 @@ inline bool too_many_busy_threads(const thread_group_t &thread_group) noexcept { 1 + (int)threadpool_toobusy); } +inline bool too_many_connection(const 
thread_group_t &thread_group) noexcept { /* True when the group's connection_count is above threadpool_toobusy - 1. */ + return (thread_group.connection_count > (int)threadpool_toobusy - 1); +} + /* Checks if a given connection is eligible to enter the high priority queue based on its current thread_pool_high_prio_mode value, available high @@ -347,6 +353,31 @@ inline bool connection_is_high_prio(const connection_t &c) noexcept { c.thd->mdl_context.has_locks(MDL_key::LOCKING_SERVICE))); } +inline bool connection_is_worker_continue(const connection_t &c) noexcept { /* Returns true if handle_event() should go on with connection c instead of breaking out. */ + if (c.thd->is_admin_connection()) { /* Admin connections always continue. */ + return true; + } + + if (c.thread_group != &all_groups[c.thd->thread_id() % group_count]) { /* c is not in the group selected by thread_id % group_count. */ + return false; + } + + if (!too_many_connection(*(c.thread_group))) { /* The group is not over its connection limit. */ + return true; + } + + const ulong mode = tp_get_thdvar_high_prio_mode(c.thd); /* Over the limit: continue only in TRANSACTIONS mode with tickets left and an active transaction or one of the lock states below. */ + bool ret = (mode == TP_HIGH_PRIO_MODE_TRANSACTIONS && c.tickets > 0 && + (thd_is_transaction_active(c.thd) || + c.thd->variables.option_bits & OPTION_TABLE_LOCK || + c.thd->locked_tables_mode != LTM_NONE || + c.thd->mdl_context.has_locks() || + c.thd->global_read_lock.is_acquired() || + c.thd->mdl_context.has_locks(MDL_key::USER_LEVEL_LOCK) || + c.thd->mdl_context.has_locks(MDL_key::LOCKING_SERVICE))); + return ret; +} + } // namespace /* Dequeue element from a workqueue */ @@ -666,9 +697,8 @@ static connection_t *listener(thread_group_t *thread_group) { more workers. */ - const bool listener_picks_event = - thread_group->high_prio_queue.is_empty() && - thread_group->queue.is_empty(); + const bool listener_picks_event = threadpool_dedicated_listener? 
false : + (thread_group->high_prio_queue.is_empty() && thread_group->queue.is_empty()); /* If listener_picks_event is set, listener thread will handle first event, @@ -694,31 +724,40 @@ static connection_t *listener(thread_group_t *thread_group) { break; } - if (thread_group->active_thread_count == 0) { + /* The remaining threads can be created at most */ + int workers_in_need = (int)threadpool_toobusy - + thread_group->active_thread_count - thread_group->waiting_thread_count; + + /* There are no remaining threads and the thread group is stalled */ + if (workers_in_need <= 0 && thread_group->active_thread_count == 0) { + workers_in_need = 1; + } + + /* The number of threads that can be created and + the number of threads that are really needed, whichever is smaller */ + workers_in_need = workers_in_need > cnt ? cnt : workers_in_need; + + /* Wake up or create the required threads */ + for (int i = 0; i < workers_in_need; i++) { /* We added some work items to queue, now wake a worker. */ if (wake_thread(thread_group, false)) { /* Wake failed, hence groups has no idle threads. Now check if there are any threads in the group except listener. + In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. + The queue is not empty, and listener is not going to handle + events. In order to drain the queue, we create a worker here. + Alternatively, we could just rely on timer to detect stall, and + create thread, but waiting for timer would be an inefficient and + pointless delay. */ - if (thread_group->thread_count == 1) { - /* - Currently there is no worker thread in the group, as indicated by - thread_count == 1 (this means listener is the only one thread in - the group). - The queue is not empty, and listener is not going to handle - events. In order to drain the queue, we create a worker here. 
- Alternatively, we could just rely on timer to detect stall, and - create thread, but waiting for timer would be an inefficient and - pointless delay. - */ - create_worker(thread_group, false); - } + create_worker(thread_group, false); } } mysql_mutex_unlock(&thread_group->mutex); } - DBUG_RETURN(retval); } @@ -829,10 +868,14 @@ static int wake_or_create_thread(thread_group_t *thread_group, if (thread_group->thread_count > thread_group->connection_count) DBUG_RETURN(-1); - if (thread_group->active_thread_count == 0) { + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if (thread_group->active_thread_count < + (1 + (int)threadpool_oversubscribe)) { /* - We're better off creating a new thread here with no delay, either there - are no workers at all, or they all are all blocking and there was no + We're better off creating a new thread here with no delay, either there + are not enough active workers, or they all are all blocking and there was no idle thread to wakeup. Smells like a potential deadlock or very slowly executing requests, e.g sleeps or user locks. */ @@ -963,7 +1006,11 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection) { queue_push(thread_group, connection); - if (thread_group->active_thread_count == 0) { + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. 
*/ + if (thread_group->active_thread_count < + 1 + (int)threadpool_oversubscribe) { wake_or_create_thread(thread_group, false); } @@ -1114,7 +1161,10 @@ static void wait_begin(thread_group_t *thread_group) noexcept { assert(thread_group->connection_count > 0); #ifdef THREADPOOL_CREATE_THREADS_ON_WAIT - if ((thread_group->active_thread_count == 0) && + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if ((thread_group->active_thread_count < (1 + (int)threadpool_oversubscribe)) && (!queues_are_empty(*thread_group) || !thread_group->listener)) { /* Group might stall while this thread waits, thus wake @@ -1489,7 +1539,7 @@ static void handle_event(connection_t *connection) { set_wait_timeout(connection); - if (!connection->thd->is_admin_connection()) { + if (!connection_is_worker_continue(*connection)) { break; } } @@ -1542,6 +1592,11 @@ static void *worker_main(void *param) { thread_group_t *thread_group = static_cast<thread_group_t *>(param); assert(thread_group != nullptr); + if (threadpool_sched_affinity) { + /* Pointer subtraction already yields the group index. */ + group_affinity.bind_numa(static_cast<int>(thread_group - all_groups)); + } + /* Init per-thread structure */ worker_thread_t this_thread; mysql_cond_init(key_worker_cond, &this_thread.cond); @@ -1588,6 +1643,7 @@ static void *worker_main(void *param) { bool tp_init() { DBUG_ENTER("tp_init"); threadpool_started = true; + group_affinity.init(); for (uint i = 0; i < array_elements(all_groups); i++) { thread_group_init(&all_groups[i], get_connection_attrib()); diff --git a/plugin/thread_pool/threadpool_unix.h b/plugin/thread_pool/threadpool_unix.h index e9a27baa04a9c4775daa83f82c5acc952d7ee48e..3c561f2da75484170dd86029185be7e786973c93 100644 --- a/plugin/thread_pool/threadpool_unix.h +++ b/plugin/thread_pool/threadpool_unix.h @@ -23,6 +23,7 @@ #include "sql/mysqld.h" #include "threadpool.h" #include "violite.h" +#include "numa_affinity_manager.h" #ifdef __linux__ 
#include @@ -128,6 +129,7 @@ static_assert(sizeof(thread_group_t) == 512, #define TP_INCREMENT_GROUP_COUNTER(group, var) do {group->counters.var++;}while(0) extern thread_group_t all_groups[MAX_THREAD_GROUPS]; +extern numa_affinity_manager group_affinity; #endif // THREADPOOL_UNIX_H_