diff --git a/plugin/thread_pool/CMakeLists.txt b/plugin/thread_pool/CMakeLists.txt index 94f3caa0f9bb299feebf39475e202093ae3d10e6..71e77f9665457a01f4873aadbcba76f0ee905089 100644 --- a/plugin/thread_pool/CMakeLists.txt +++ b/plugin/thread_pool/CMakeLists.txt @@ -17,11 +17,13 @@ ADD_COMPILE_DEFINITIONS( COMPILE_DEFINITIONS MYSQL_DYNAMIC_PLUGIN) + MYSQL_ADD_PLUGIN(thread_pool + numa_affinity_manager.cc threadpool_common.cc threadpool_unix.cc - numa_affinity_manager.cc + LINK_LIBRARIES + numa MODULE_ONLY MODULE_OUTPUT_NAME "thread_pool" ) - diff --git a/plugin/thread_pool/numa_affinity_manager.cc b/plugin/thread_pool/numa_affinity_manager.cc index 1f0576407bd6735dbff3d8dc17db6aeca7174a0c..de1b1996969637e24c77a31a9376a503a4487b2b 100644 --- a/plugin/thread_pool/numa_affinity_manager.cc +++ b/plugin/thread_pool/numa_affinity_manager.cc @@ -1,389 +1,345 @@ -/* Copyright (C) 2012 Monty Program Ab - Copyright (C) 2023 Huawei Technologies Co., Ltd - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include "numa_affinity_manager.h" -#include "sql/log.h" -#include -#include -#include -#include -#include -#include - -using namespace std; - -struct bitmask *numa_bitmask = NULL; - -static struct bitmask *numa_bitmask_mem = NULL; -static int max_conf_nodes = -1; -static int node_possible = 0; -static int cpu_possible = 0; - -#define BIT_LENGTH_LONG (8 * sizeof(unsigned long)) -#define BIT_LENGTH_LONG_N(n) (((n)+((BIT_LENGTH_LONG)-1))/(BIT_LENGTH_LONG)) - -static const char *system_dir = "/sys/devices/system/node"; -static const char *node_str = "node"; -static const int node_size = 4; -static const char *file_mask_size = "/proc/self/status"; -static const char *node_mask_comment = "Mems_allowed:\t"; -static const int node_possible_max = 4096 * 8; -static const char *mem_total = "MemTotal"; -static const char *mem_free = "MemFree"; -static const int length_init = 4096; -static const int length_max = 1024 * 1024; - -long long numa_mem_size(int node, long long *file); -int numa_sched_getaffinity(pid_t pid, struct bitmask *mask); -void numa_node_bm_free(struct bitmask *bmp); -struct bitmask *numa_bm_alloc(void); - -void numa_end(void) -{ - numa_node_bm_free(numa_bitmask_mem); - numa_node_bm_free(numa_bitmask); -} - -static unsigned int get_bmp_bit(const struct bitmask *b, unsigned int num) -{ - unsigned int ret = 0; - if (b->size > num) { - unsigned long cur_mask = b->mask_cpu[num / BIT_LENGTH_LONG]; - unsigned int cur_bit = num % BIT_LENGTH_LONG; - ret = (cur_mask >> cur_bit) & 1; - } - return ret; -} - -static void set_bmp_bit(struct bitmask *b, unsigned int num, unsigned int v) -{ - if (b->size > num) { - unsigned long &cur_mask = b->mask_cpu[num / BIT_LENGTH_LONG]; - unsigned int cur_bit = num % BIT_LENGTH_LONG; - if (v) - cur_mask |= 1UL << cur_bit; - else - cur_mask &= ~(1UL << cur_bit); - } -} - -struct bitmask *bitmask_alloc(unsigned int num) -{ - if (num < 1) { - errno = EINVAL; - sql_print_error("The number is invalid"); - return nullptr; - } - - struct bitmask *b = (struct bitmask *)malloc(sizeof(*b)); - if (b == 0) { - sql_print_error("Allocation failed, out of memory"); - return nullptr; - } - b->size = num; - b->mask_cpu = (unsigned long *)calloc(BIT_LENGTH_LONG_N(num), sizeof(unsigned long)); - if (b->mask_cpu == 0) { - free(b); - sql_print_error("Allocation failed, out of memory"); - return nullptr; - } - return b; -} - -void numa_node_bm_free(struct bitmask *b) { - if (b != nullptr) { - free(b->mask_cpu); - b->mask_cpu = (unsigned long *)0xdeadcdef; - free(b); - } - return; -} - -static void set_conf_nd(void) -{ - long long fp; - struct dirent *dp; - - numa_bitmask_mem = numa_bm_alloc(); - numa_bitmask = numa_bm_alloc(); - - DIR *dir = opendir(system_dir); - if (dir == nullptr) { - max_conf_nodes = 0; - return; - } - - while ((dp = readdir(dir)) != NULL) { - if (strncmp(dp->d_name, node_str, node_size) == 0) { - int node = strtoul(dp->d_name + node_size, NULL, 0); - set_bmp_bit(numa_bitmask, node, 1); - if (numa_mem_size(node, &fp) > 0) { - set_bmp_bit(numa_bitmask_mem, node, 1); - } - max_conf_nodes = max_conf_nodes < node ? node : max_conf_nodes; - } - } - closedir(dir); -} - -static inline int is_letter_digit(char c) -{ - return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9'); -} - -long __attribute__((weak)) get_mempolicy(int *policy, unsigned long *nmask, - unsigned long maxnode, void *addr, unsigned flags) -{ - return syscall(__NR_get_mempolicy, policy, nmask, maxnode, addr, flags); -} - -static void set_node_possible(void) -{ - unsigned long *m = NULL; - int p; - int len = 0; - size_t buf_size = 0; - FILE *file; - char *buffer = NULL; - char *buffer_tmp = NULL; - - if ((file = fopen(file_mask_size, "r")) == NULL) { - if (node_possible == 0) { - node_possible = 16; - do { - node_possible <<= 1; - if ((m = (unsigned long *)realloc(m, node_possible / 8)) == 0) { - return; - } - } while (get_mempolicy(&p, m, node_possible + 1, 0, 0) < 0 && - node_possible_max > node_possible && errno == EINVAL); - free(m); - } - return; - } - - while (getline(&buffer, &buf_size, file) > 0) { - if (strncmp(buffer, node_mask_comment, strlen(node_mask_comment)) == 0) { - buffer_tmp = buffer; - buffer_tmp += strlen(node_mask_comment); - while (*buffer_tmp != '\n' && *buffer_tmp != '\0') { - if (is_letter_digit(*buffer_tmp)) { - len++; - } - buffer_tmp++; - } - node_possible = len * 4; - } - } - free(buffer); - fclose(file); - return; -} - -struct bitmask *numa_bm_alloc(void) -{ - return bitmask_alloc(node_possible); -} - -void mem_total_free(char *string, char *str, long long &size, int &state, long long *file) { - char *final; - if (strstr(string, mem_total)) { - size = strtoull(str, &final, 0) << 10; - if (final != str) { - state++; - } else { - size = -1; - } - } - if (file && strstr(string, mem_free)) { - *file = strtoull(str, &final, 0) << 10; - if (final != str) { - state++; - } else { - *file = -1; - } - } -} - -long long numa_mem_size(int nd, long long *file) -{ - size_t length = 0; - char *string = nullptr; - long long size = -1; - char fd[64]; - int state = 0; - int req = file ? 2 : 1; - - if (file) { - *file = -1; - } - sprintf(fd, "/sys/devices/system/node/node%d/meminfo", nd); - FILE *f = fopen(fd, "r"); - if (f == nullptr) { - return -1; - } - - while (getdelim(&string, &length, '\n', f) > 0) { - char *str = strcasestr(string, "kB"); - if (!str) { - continue; - } - --str; - while (isspace(*str) && str > string) { - --str; - } - while (isdigit(*str) && str > string) { - --str; - } - - mem_total_free(string, str, size, state, file); - } - if (state != req) { - sql_print_warning("Parse sysfs mem info failed, (%d)", state); - } - free(string); - fclose(f); - - return size; -} - -static void set_cpu_possible(void) -{ - struct bitmask *buf; - int olderr = errno; - int length = length_init; - int num; - - do { - buf = bitmask_alloc(length); - num = numa_sched_getaffinity(0, buf); - if (num < 0) { - if (errno != EINVAL) { - sql_print_warning("Determine max cpu failed (sched_getaffinity: %s)", - strerror(errno)); - num = sizeof(cpu_set_t); - break; - } else { - if (length < length_max) { - length *= 2; - numa_node_bm_free(buf); - } else { - break; - } - } - } - } while (num < 0); - numa_node_bm_free(buf); - errno = olderr; - cpu_possible = num * 8; -} - -int numa_num_configured_nodes(void) -{ - int count_node_mem=0; - - for (int i=0; i <= max_conf_nodes; i++) { - if (get_bmp_bit(numa_bitmask_mem, i)) - count_node_mem++; - } - return count_node_mem; -} - -struct bitmask *numa_allocate_cpumask() -{ - return bitmask_alloc(cpu_possible); -} - -int numa_sched_setaffinity(pid_t pid, struct bitmask *bmp) -{ - return syscall(__NR_sched_setaffinity, pid, BIT_LENGTH_LONG_N(bmp->size) * sizeof(unsigned long), bmp->mask_cpu); -} - -int numa_sched_getaffinity(pid_t pid, struct bitmask *bmp) -{ - return syscall(__NR_sched_getaffinity, pid, BIT_LENGTH_LONG_N(bmp->size) * sizeof(unsigned long), bmp->mask_cpu); -} - -bool numa_affinity_manager::init() { - initok = false; - - if (get_sys_cpu() != get_sys_cpu_only()) { - return false; - } - - set_node_possible(); - set_conf_nd(); - set_cpu_possible(); - - cpu_count = get_sys_cpu(); - numa_count = get_sys_numa(); - if (cpu_count <= 0 || numa_count <= 0 || cpu_count % numa_count != 0) { - return false; - } - - int cpu_per_numa = cpu_count / numa_count; - int start = 0; - numa_cpu_map.clear(); - auto del_cpu_mask = [](bitmask *p) { - if (p != nullptr) { - numa_node_bm_free(p); - } - }; - for (int i = 0; i < numa_count; i++) { - auto msk = numa_allocate_cpumask(); - if (msk == nullptr) { - return false; - } - - for (int j = 0; j < cpu_per_numa; j++) { - set_bmp_bit(msk, start + j, 1); - } - numa_cpu_map.emplace_back(msk, del_cpu_mask); - start += cpu_per_numa; - } - initok = true; - return true; -} - -bool numa_affinity_manager::bind_numa(int group_id) { - if (initok) { - pid_t pid = gettid(); - return (numa_sched_setaffinity( - pid, numa_cpu_map[group_id%numa_cpu_map.size()].get()) == 0); - } - - return false; -} - -void numa_affinity_manager::print_cpumask(const string &name, bitmask *msk) { - cout << name << ": "; - for (unsigned int i = 0; i < msk->size; i++) { - if (get_bmp_bit(msk, i)) { - cout << i << " "; - } - } - cout << endl; -} -void numa_affinity_manager::dump() { - cout << "initok: " << initok << endl; - cout << "cpu_count: " << cpu_count << endl; - cout << "numa_count: " << numa_count << endl; - - for (unsigned int i = 0; i < numa_cpu_map.size(); i++) { - string name = "numa_cpu_map[" + to_string(i) + "]"; - print_cpumask(name, numa_cpu_map[i].get()); - } -} \ No newline at end of file +/* Copyright (C) 2012 Monty Program Ab +Copyright (C) 2023 Huawei Technologies Co., Ltd + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "numa_affinity_manager.h" +#include "sql/log.h" // need remove +#include "threadpool.h" +#include +#include +#include +#include +#include +#include + +using namespace std; + +static const std::string THREAD_NAME_LOG_WRITER = "thread/innodb/log_writer_thread"; +static const std::string THREAD_NAME_LOG_FLUSHER = "thread/innodb/log_flusher_thread"; +static const std::string THREAD_NAME_LOG_WRITER_NTF = "thread/innodb/log_write_notifier_thread"; +static const std::string THREAD_NAME_LOG_FLUSHER_NTF = "thread/innodb/log_flush_notifier_thread"; +static const std::string THREAD_NAME_CHECK_POINTER = "thread/innodb/log_checkpointer_thread"; +static const std::string THREAD_NAME_PURGE = "thread/innodb/srv_purge_thread"; + +numa_affinity_manager::numa_affinity_manager() +{ +} + +numa_affinity_manager::~numa_affinity_manager() +{ +} + +bool numa_affinity_manager::init() { + XLockGuard lk(lock); + initok = false; + if (!init_foreground_cpu_map()) { + return false; + } + bind_background_threads(); + initok = true; + bind_background_ok = false; // no purge threads, need call try bind again later + print_state(); + return true; +} + +bool numa_affinity_manager::init_foreground_cpu_map() { + cpu_count = get_sys_cpu(); + numa_count = get_sys_numa(); + if (cpu_count <= 0 || numa_count <= 0 || + cpu_count % numa_count != 0) { + return false; + } + + int cpu_per_numa = cpu_count / numa_count; + int start = 0; + foreground_cpu_map.clear(); + foreground_cpu_map_opts = parse_cpustring(threadpool_sched_affinity_foreground_thread); + + // 所有的core 要包含在配置的前端线程范围 是否排除后端线程的绑核由前端线程配置决定 + for (int i = 0; i < numa_count; i++) { + auto msk = numa_allocate_cpumask(); + if (msk == nullptr) { + return false; + } + + for (int j = 0; j < cpu_per_numa; j++) { + if (foreground_cpu_map_opts == nullptr || + numa_bitmask_isbitset(foreground_cpu_map_opts.get(), start + j)) { + numa_bitmask_setbit(msk, start + j); + } + } + foreground_cpu_map.emplace_back(msk, free_cpumask_func); + start += cpu_per_numa; + } + + return true; +} + +bool numa_affinity_manager::bind_foreground_thread(int group_id) { + // for bind the purge thread + if (!bind_background_ok) { + XLockGuard lk(lock); + if (!bind_background_ok) { + bind_background_threads(); + bind_background_ok = true; + print_state(); + } + } + + SLockGuard lk(lock); + if (initok) { + my_thread_os_id_t pid = my_thread_os_id(); + bitmask *msk = foreground_cpu_map[group_id%foreground_cpu_map.size()].get(); + return (bind_thread(pid, msk) == 0); + } + return false; +} + +void numa_affinity_manager::update_bind_background_threads() { + XLockGuard lk(lock); + bind_background_threads(); + print_state(); +} + +void numa_affinity_manager::print_state() { +#ifndef NDEBUG + // assert has own lock of s or x + std::cout << "initok: " << initok << std::endl; + std::cout << "cpu_count: " << cpu_count << std::endl; + std::cout << "numa_count: " << numa_count << std::endl; + + for (unsigned int i = 0; i < foreground_cpu_map.size(); i++) { + std::string name = "foreground_cpu_map[" + std::to_string(i) + "]"; + std::cout << name << ": " << cpumask_to_string(foreground_cpu_map[i].get()) << std::endl; + } + + for (auto &pi : background_threads) { + for (auto &it : pi.second) { + std::cout << "back-ground thread " << pi.first << " : " << it << std::endl; + } + } + + std::string log_writer_cpustring = threadpool_sched_affinity_log_writer == nullptr? "" : threadpool_sched_affinity_log_writer; + std::string log_flusher_cpustring = threadpool_sched_affinity_log_flusher == nullptr? "" : threadpool_sched_affinity_log_flusher; + std::string log_write_notifier_cpustring = threadpool_sched_affinity_log_write_notifier == nullptr? "" : threadpool_sched_affinity_log_write_notifier; + std::string log_flush_notifier_cpustring = threadpool_sched_affinity_log_flush_notifier == nullptr? "" : threadpool_sched_affinity_log_flush_notifier; + std::string log_checkpointer_cpustring = threadpool_sched_affinity_log_checkpointer == nullptr? "" : threadpool_sched_affinity_log_checkpointer; + std::string purge_coordinator_cpustring = threadpool_sched_affinity_purge_coordinator == nullptr? "" : threadpool_sched_affinity_purge_coordinator; + + std::cout << "thread_pool_sched_affinity_log_writer: " << log_writer_cpustring << std::endl; + std::cout << "thread_pool_sched_affinity_log_flusher: " << log_flusher_cpustring << std::endl; + std::cout << "thread_pool_sched_affinity_log_write_notifier: " << log_write_notifier_cpustring << std::endl; + std::cout << "thread_pool_sched_affinity_log_flush_notifier: " << log_flush_notifier_cpustring << std::endl; + std::cout << "thread_pool_sched_affinity_log_checkpointer: " << log_checkpointer_cpustring << std::endl; + std::cout << "thread_pool_sched_affinity_purge_coordinator: " << purge_coordinator_cpustring << std::endl; +#endif +} + +void numa_affinity_manager::free_cpumask_func(bitmask *ptr) { + if (ptr != nullptr) { + numa_bitmask_free(ptr); + } +} + +int numa_affinity_manager::get_sys_cpu() { + return sysconf(_SC_NPROCESSORS_CONF); +} + +int numa_affinity_manager::get_sys_cpu_only() { + return sysconf(_SC_NPROCESSORS_ONLN); +} + +int numa_affinity_manager::get_sys_numa() { + return numa_num_configured_nodes(); +} + +bool numa_affinity_manager::bind_thread(my_thread_os_id_t pid, bitmask *msk) { + bool ret = false; + int i = 0; + for (i = 0; i < 10; i++) { + if (numa_sched_setaffinity(pid, msk) == 0) { + ret = true; + break; + } + + std::this_thread::yield(); + } + std::cout << "bind thread " << (ret? "succeed":"failed") << ": " + << pid << " - " << cpumask_to_string(msk) << std::endl; + return ret; +} + +bool numa_affinity_manager::check_cpustring(const char *opt) { + if (opt == nullptr) { + return true; + } + std::string out; + if (!normalize_cpustring(opt, out)) { + return false; + } + std::shared_ptr ret(numa_parse_cpustring(out.c_str())); + return ret.get() != nullptr; +} + +const std::string numa_affinity_manager::cpumask_to_string(bitmask *msk) { + std::string ret; + for (unsigned int i = 0; i < msk->size; i++) { + if (numa_bitmask_isbitset(msk, i)) { + ret += std::to_string(i) + " "; + } + } + return ret; +} + +bool numa_affinity_manager::normalize_cpustring(const std::string &cpu_string, std::string &out) { + std::string normalized_cpu_string; + bool invalid_cpu_string = false; + const int INVALID_CORE_ID = -1; + int core_id = INVALID_CORE_ID; + for (auto c : cpu_string) { + switch (c) { + case ' ': + break; + case '-': + case ',': + if (core_id == INVALID_CORE_ID) { + invalid_cpu_string = true; + } else { + normalized_cpu_string += std::to_string(core_id); + normalized_cpu_string += c; + core_id = INVALID_CORE_ID; + } + break; + case '0' ... '9': + if (core_id == INVALID_CORE_ID) { + core_id = (c - '0'); + } else { + core_id = core_id * 10 + (c - '0'); + } + break; + default: + invalid_cpu_string = true; + break; + } + if (invalid_cpu_string) { + break; + } + } + if (core_id != INVALID_CORE_ID) { + normalized_cpu_string += std::to_string(core_id); + } + if (!normalized_cpu_string.empty() && + (*normalized_cpu_string.rbegin() == '-' || + *normalized_cpu_string.rbegin() == ',')) { + invalid_cpu_string = true; + } + if (invalid_cpu_string) { + out = ""; + return false; + } + out = normalized_cpu_string; + return true; +} + +std::shared_ptr numa_affinity_manager::parse_cpustring(const char *opt) { + std::shared_ptr ret(nullptr, free_cpumask_func); + if (opt == nullptr || (std::string(opt)).empty()) { + return ret; + } + + std::string out; + if (normalize_cpustring(opt, out)) { + ret.reset(numa_parse_cpustring(out.c_str()), free_cpumask_func); + } + return ret; +} + +void numa_affinity_manager::apply_background_options() { + for (auto &pi : background_threads) { + auto cpu_map = background_cpu_map_opts.find(pi.first); + if (cpu_map == background_cpu_map_opts.end() || cpu_map->second == nullptr) { + continue; + } + for (auto it : pi.second) { + bind_thread(it, cpu_map->second.get()); + } + } +} + +void numa_affinity_manager::fetch_background_options() { + background_cpu_map_opts.clear(); + background_cpu_map_opts[THREAD_NAME_LOG_WRITER] = parse_cpustring(threadpool_sched_affinity_log_writer); + background_cpu_map_opts[THREAD_NAME_LOG_FLUSHER] = parse_cpustring(threadpool_sched_affinity_log_flusher); + background_cpu_map_opts[THREAD_NAME_LOG_WRITER_NTF] = parse_cpustring(threadpool_sched_affinity_log_write_notifier); + background_cpu_map_opts[THREAD_NAME_LOG_FLUSHER_NTF] = parse_cpustring(threadpool_sched_affinity_log_flush_notifier); + background_cpu_map_opts[THREAD_NAME_CHECK_POINTER] = parse_cpustring(threadpool_sched_affinity_log_checkpointer); + background_cpu_map_opts[THREAD_NAME_PURGE] = parse_cpustring(threadpool_sched_affinity_purge_coordinator); +} + +void numa_affinity_manager::fetch_background_threads() { + PFS_simple_index pos(0); /** Current position. */ + PFS_simple_index next_pos(0); /** Next position. */ + pfs_optimistic_state lock; + pfs_optimistic_state session_lock; + + background_threads.clear(); + background_threads[THREAD_NAME_LOG_WRITER] = std::vector(); + background_threads[THREAD_NAME_LOG_FLUSHER] = std::vector(); + background_threads[THREAD_NAME_LOG_WRITER_NTF] = std::vector(); + background_threads[THREAD_NAME_LOG_FLUSHER_NTF] = std::vector(); + background_threads[THREAD_NAME_CHECK_POINTER] = std::vector(); + background_threads[THREAD_NAME_PURGE] = std::vector(); + + while (1) { + pos.set_at(&next_pos); + PFS_thread_iterator it = global_thread_container.iterate(pos.m_index); + PFS_thread *pfs = it.scan_next(&pos.m_index); + + if (pfs == nullptr) { + break; + } + + next_pos.set_after(&pos); + + pfs->m_lock.begin_optimistic_lock(&lock); /* Protect this reader against thread termination */ + pfs->m_session_lock.begin_optimistic_lock(&session_lock); /* Protect this reader against session attribute changes */ + + PFS_thread_class *safe_class = sanitize_thread_class(pfs->m_class); + if (unlikely(safe_class == nullptr)) { + pfs->m_session_lock.end_optimistic_lock(&session_lock); + pfs->m_lock.end_optimistic_lock(&lock); + // HA_ERR_RECORD_DELETED + continue; + } + + std::cout << safe_class->m_name << " " << pfs->m_thread_os_id << std::endl; + auto itp = background_threads.find(safe_class->m_name); + if (itp != background_threads.end()) { + itp->second.push_back(pfs->m_thread_os_id); + } + + pfs->m_session_lock.end_optimistic_lock(&session_lock); + pfs->m_lock.end_optimistic_lock(&lock); + } +} + +void numa_affinity_manager::bind_background_threads() +{ + fetch_background_threads(); + fetch_background_options(); + if (threadpool_sched_affinity) { + apply_background_options(); + } +} diff --git a/plugin/thread_pool/numa_affinity_manager.h b/plugin/thread_pool/numa_affinity_manager.h index b1d1033c8b3e78846242f5158acd820c94dbbf8e..c139a641f5684a60d7befd159af769612f05becd 100644 --- a/plugin/thread_pool/numa_affinity_manager.h +++ b/plugin/thread_pool/numa_affinity_manager.h @@ -18,59 +18,52 @@ #include #include -#include #include #include -#include "sys/syscall.h" - -using namespace std; - -extern struct bitmask *numa_bitmask; - -void numa_end(void); -int numa_num_configured_nodes(void); - -struct bitmask { - unsigned long size; - unsigned long *mask_cpu; -}; +#include +#include "storage/perfschema/pfs_buffer_container.h" +#include "storage/perfschema/pfs_engine_table.h" +#include "threadpool_rwlock.h" class numa_affinity_manager { public: - numa_affinity_manager(){} - virtual ~numa_affinity_manager(){} + numa_affinity_manager(); + virtual ~numa_affinity_manager(); bool init(); - bool bind_numa(int group_id); + bool init_foreground_cpu_map(); + bool bind_foreground_thread(int group_id); + void update_bind_background_threads(); + void print_state(); + + static void free_cpumask_func(bitmask *ptr); + static int get_sys_cpu(); + static int get_sys_cpu_only(); + static int get_sys_numa(); + static bool bind_thread(my_thread_os_id_t id, bitmask *msk); + static bool check_cpustring(const char *opt); + static const std::string cpumask_to_string(bitmask *msk); + static bool normalize_cpustring(const std::string &cpu_string, std::string &out); + static std::shared_ptr parse_cpustring(const char *opt); protected: - int get_sys_cpu() { - return sysconf(_SC_NPROCESSORS_CONF); - } + void apply_background_options(); + void fetch_background_options(); + void fetch_background_threads(); + void bind_background_threads(); - int get_sys_cpu_only() { - return sysconf(_SC_NPROCESSORS_ONLN); - } - - int get_sys_numa() { - return numa_num_configured_nodes(); - } - - pid_t gettid() { - return static_cast(syscall(SYS_gettid)); - } - -public: - void print_cpumask(const string &name, bitmask *msk) ; - void dump(); private: + thread_pool_rwlock_t lock; bool initok{false}; int cpu_count{0}; int numa_count{0}; - - vector> numa_cpu_map; + std::vector > foreground_cpu_map; + std::shared_ptr foreground_cpu_map_opts; + std::map > background_cpu_map_opts; + bool bind_background_ok{false}; + std::map > background_threads; }; #endif // NUMA_AFFINITY_MANAGER_H_ diff --git a/plugin/thread_pool/threadpool.h b/plugin/thread_pool/threadpool.h index 979bed0c50c17e5da60241e5346bc7ac7373ff9d..e2c58bf55d21704b9afc5a976c6f9f5b864d6fa0 100644 --- a/plugin/thread_pool/threadpool.h +++ b/plugin/thread_pool/threadpool.h @@ -20,7 +20,7 @@ #include "sql/mysqld_thd_manager.h" #include "sql/conn_handler/connection_handler_manager.h" #include "sql/conn_handler/channel_info.h" -#include "pthread.h" +#include "threadpool_rwlock.h" struct SHOW_VAR; @@ -38,13 +38,21 @@ enum tp_high_prio_mode_t { extern uint threadpool_idle_timeout; /* Shutdown idle worker threads after this timeout */ extern bool threadpool_dedicated_listener; /* Control whether listener be dedicated */ extern uint threadpool_size; /* Number of parallel executing threads */ -extern bool threadpool_sched_affinity; /* Control whether thread group scheduling affinity */ extern uint threadpool_max_threads; extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/ extern uint threadpool_oversubscribe; /* Maximum active threads in group */ extern uint threadpool_toobusy; /* Maximum active and waiting threads in group */ extern bool threadpool_connection_balance; /* Control whether conncetions migrating to another thread group so that they are evenly distributed */ +extern bool threadpool_sched_affinity; /* Control whether thread group scheduling affinity */ +extern char *threadpool_sched_affinity_foreground_thread; +extern char *threadpool_sched_affinity_log_writer; +extern char *threadpool_sched_affinity_log_flusher; +extern char *threadpool_sched_affinity_log_write_notifier; +extern char *threadpool_sched_affinity_log_flush_notifier; +extern char *threadpool_sched_affinity_log_checkpointer; +extern char *threadpool_sched_affinity_purge_coordinator; + /* Possible values for thread_pool_high_prio_mode */ extern const char *threadpool_high_prio_mode_names[]; @@ -87,86 +95,6 @@ extern void tp_set_threadpool_stall_limit(uint val) noexcept; extern uint tp_get_thdvar_high_prio_tickets(THD *thd); extern uint tp_get_thdvar_high_prio_mode(THD *thd); -class thread_pool_rwlock_t { -public: - thread_pool_rwlock_t() - { - pthread_rwlock_init(&lk, NULL); - } - - ~thread_pool_rwlock_t() - { - pthread_rwlock_destroy(&lk); - } - - void slock() - { - while (pthread_rwlock_rdlock(&lk) != 0) { - std::this_thread::yield(); - } - } - - void unslock() - { - while (pthread_rwlock_unlock(&lk) != 0) { - std::this_thread::yield(); - } - } - - void xlock() - { - while (pthread_rwlock_wrlock(&lk) != 0) { - std::this_thread::yield(); - } - } - - void unxlock() - { - while (pthread_rwlock_unlock(&lk) != 0) { - std::this_thread::yield(); - } - } - - void lock() - { - xlock(); - } - - void unlock() - { - unxlock(); - } - -private: - pthread_rwlock_t lk; -}; - -class SLockGuard { -public: - SLockGuard(thread_pool_rwlock_t &lk): lck(lk) { - lck.slock(); - } - - ~SLockGuard() { - lck.unslock(); - } - - thread_pool_rwlock_t &lck; -}; - -class XLockGuard { -public: - XLockGuard(thread_pool_rwlock_t &lk): lck(lk) { - lck.xlock(); - } - - ~XLockGuard() { - lck.unxlock(); - } - - thread_pool_rwlock_t &lck; -}; - extern st_plugin_int *gPluginPtr; extern thread_pool_rwlock_t gPluginLock; extern bool gPluginUninstalling; diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc index 1e59f5c8472f7b4be032129af30ad59beef5831d..9084b2a3d478f190813c1aaa9f03a8872a624610 100644 --- a/plugin/thread_pool/threadpool_common.cc +++ b/plugin/thread_pool/threadpool_common.cc @@ -40,13 +40,21 @@ uint threadpool_idle_timeout; bool threadpool_dedicated_listener; uint threadpool_size; -bool threadpool_sched_affinity; uint threadpool_stall_limit; uint threadpool_max_threads; uint threadpool_oversubscribe; uint threadpool_toobusy; bool threadpool_connection_balance; +bool threadpool_sched_affinity = false; +char *threadpool_sched_affinity_foreground_thread = nullptr; +char *threadpool_sched_affinity_log_writer = nullptr; +char *threadpool_sched_affinity_log_flusher = nullptr; +char *threadpool_sched_affinity_log_write_notifier = nullptr; +char *threadpool_sched_affinity_log_flush_notifier = nullptr; +char *threadpool_sched_affinity_log_checkpointer = nullptr; +char *threadpool_sched_affinity_purge_coordinator = nullptr; + extern thread_pool_rwlock_t change_group_rwlock; /* Stats */ @@ -344,11 +352,6 @@ static MYSQL_SYSVAR_UINT(size, threadpool_size, "executing threads (threads in a waiting state do not count as executing).", NULL, fix_threadpool_size, (uint)my_getncpus(), 1, MAX_THREAD_GROUPS, 1); -static MYSQL_SYSVAR_BOOL(sched_affinity, threadpool_sched_affinity, - PLUGIN_VAR_RQCMDARG, - "Control whether thread group scheduling affinity.", nullptr, - nullptr, false); - static MYSQL_SYSVAR_UINT(stall_limit, threadpool_stall_limit, PLUGIN_VAR_RQCMDARG, "Maximum query execution time in milliseconds," @@ -368,6 +371,98 @@ PLUGIN_VAR_RQCMDARG, "so that they are evenly distributed.", nullptr, fix_threadpool_connection_balance, false); +static int check_fix_sched_affinity_cpustring(MYSQL_THD thd, SYS_VAR *, void *save, + struct st_mysql_value *value) { + std::string tmp(1024, '\0'); + char *buff = const_cast(tmp.data()); + const char *str = nullptr; + (*(const char **)save) = nullptr; + int length = tmp.size(); + if ((str = value->val_str(value, buff, &length))) { + str = thd->strmake(str, length); + } + + // char *c = var->save_result.string_value.str; + if (str != nullptr && !numa_affinity_manager::check_cpustring(str)) { + // my_error("Invalid cpu string %s.", MYF(0), c); + return 1; + } + + *(const char **)save = str; + return 0; +} + +static void fix_sched_affinity_cpustring(MYSQL_THD, SYS_VAR *, void *var_ptr, const void *save) { + const char *new_option_val = *static_cast(save); + *static_cast(var_ptr) = new_option_val; + group_affinity.update_bind_background_threads(); +} + +static void fix_sched_affinity(MYSQL_THD, SYS_VAR *, void *var_ptr, const void *save) { + bool newval = *static_cast(save); + *static_cast(var_ptr) = newval; + if (threadpool_sched_affinity) { + group_affinity.init(); + } +} + +static MYSQL_SYSVAR_BOOL(sched_affinity, threadpool_sched_affinity, + PLUGIN_VAR_RQCMDARG, + "Control whether thread group scheduling affinity.", + nullptr, // check func + fix_sched_affinity, // update func + false); + +static MYSQL_SYSVAR_STR(sched_affinity_foreground_thread, threadpool_sched_affinity_foreground_thread, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "The set of cpus which foreground threads will run on.", + check_fix_sched_affinity_cpustring, // check func + fix_sched_affinity_cpustring, // update func + nullptr); // default value + +static MYSQL_SYSVAR_STR(sched_affinity_log_writer, threadpool_sched_affinity_log_writer, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "log_writer", + check_fix_sched_affinity_cpustring, // check func + fix_sched_affinity_cpustring, // update func + nullptr); // default value + +static MYSQL_SYSVAR_STR(sched_affinity_log_flusher, threadpool_sched_affinity_log_flusher, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "log_flusher", + check_fix_sched_affinity_cpustring, // check func + fix_sched_affinity_cpustring, // update func + nullptr); // default value + +static MYSQL_SYSVAR_STR(sched_affinity_log_write_notifier, threadpool_sched_affinity_log_write_notifier, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "log_write_notifier", + check_fix_sched_affinity_cpustring, // check func + fix_sched_affinity_cpustring, // update func + nullptr); // default value + +static MYSQL_SYSVAR_STR(sched_affinity_log_flush_notifier, threadpool_sched_affinity_log_flush_notifier, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "log_flush_notifier", + check_fix_sched_affinity_cpustring, // check func + fix_sched_affinity_cpustring, // update func + nullptr); // default value + +static MYSQL_SYSVAR_STR(sched_affinity_log_checkpointer, threadpool_sched_affinity_log_checkpointer, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "log_checkpointer", + check_fix_sched_affinity_cpustring, // check func + fix_sched_affinity_cpustring, // update func + nullptr); // default value + +static MYSQL_SYSVAR_STR(sched_affinity_purge_coordinator, threadpool_sched_affinity_purge_coordinator, + PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC, + "purge_coordinator", + check_fix_sched_affinity_cpustring, // check func + fix_sched_affinity_cpustring, // update func + nullptr); // default value + + static bool tptarget(void) { unsigned long long cpuId; @@ -448,18 +543,25 @@ static MYSQL_THDVAR_ENUM(high_prio_mode, static uint &idle_timeout = threadpool_idle_timeout; static bool &dedicated_listener = threadpool_dedicated_listener; static uint &size = threadpool_size; -static bool &sched_affinity = threadpool_sched_affinity; static uint &stall_limit = threadpool_stall_limit; static uint &max_threads = threadpool_max_threads; static uint &oversubscribe = threadpool_oversubscribe; static uint &toobusy = threadpool_toobusy; static bool &connection_balance = threadpool_connection_balance; +static auto &sched_affinity = threadpool_sched_affinity; +static auto &sched_affinity_foreground_thread = threadpool_sched_affinity_foreground_thread; +static auto &sched_affinity_log_writer = threadpool_sched_affinity_log_writer; +static auto &sched_affinity_log_flusher = threadpool_sched_affinity_log_flusher; +static auto &sched_affinity_log_write_notifier = threadpool_sched_affinity_log_write_notifier; +static auto &sched_affinity_log_flush_notifier = threadpool_sched_affinity_log_flush_notifier; +static auto &sched_affinity_log_checkpointer = threadpool_sched_affinity_log_checkpointer; +static auto &sched_affinity_purge_coordinator = threadpool_sched_affinity_purge_coordinator; + SYS_VAR *system_variables[] = { MYSQL_SYSVAR(idle_timeout), MYSQL_SYSVAR(dedicated_listener), MYSQL_SYSVAR(size), - MYSQL_SYSVAR(sched_affinity), MYSQL_SYSVAR(max_threads), MYSQL_SYSVAR(stall_limit), MYSQL_SYSVAR(oversubscribe), @@ -467,6 +569,14 @@ SYS_VAR *system_variables[] = { MYSQL_SYSVAR(high_prio_tickets), MYSQL_SYSVAR(connection_balance), MYSQL_SYSVAR(high_prio_mode), + MYSQL_SYSVAR(sched_affinity), + MYSQL_SYSVAR(sched_affinity_foreground_thread), + MYSQL_SYSVAR(sched_affinity_log_writer), + MYSQL_SYSVAR(sched_affinity_log_flusher), + MYSQL_SYSVAR(sched_affinity_log_write_notifier), + MYSQL_SYSVAR(sched_affinity_log_flush_notifier), + MYSQL_SYSVAR(sched_affinity_log_checkpointer), + MYSQL_SYSVAR(sched_affinity_purge_coordinator), NULL }; diff --git a/plugin/thread_pool/threadpool_rwlock.h b/plugin/thread_pool/threadpool_rwlock.h new file mode 100644 index 0000000000000000000000000000000000000000..e07b37983b2ecca88d24c5479c5b347b4e5bc449 --- /dev/null +++ b/plugin/thread_pool/threadpool_rwlock.h @@ -0,0 +1,101 @@ +/* Copyright (C) 2012 Monty Program Ab + Copyright (C) 2022 Huawei Technologies Co., Ltd + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifndef THREADPOOL_RWLOCK_H_ +#define THREADPOOL_RWLOCK_H_ + +#include "pthread.h" + +class thread_pool_rwlock_t { +public: + thread_pool_rwlock_t() + { + pthread_rwlock_init(&lk, NULL); + } + + ~thread_pool_rwlock_t() + { + pthread_rwlock_destroy(&lk); + } + + void slock() + { + while (pthread_rwlock_rdlock(&lk) != 0) { + std::this_thread::yield(); + } + } + + void unslock() + { + while (pthread_rwlock_unlock(&lk) != 0) { + std::this_thread::yield(); + } + } + + void xlock() + { + while (pthread_rwlock_wrlock(&lk) != 0) { + std::this_thread::yield(); + } + } + + void unxlock() + { + while (pthread_rwlock_unlock(&lk) != 0) { + std::this_thread::yield(); + } + } + + void lock() + { + xlock(); + } + + void unlock() + { + unxlock(); + } + +private: + pthread_rwlock_t lk; +}; + +class SLockGuard { +public: + SLockGuard(thread_pool_rwlock_t &lk): lck(lk) { + lck.slock(); + } + + ~SLockGuard() { + lck.unslock(); + } + + thread_pool_rwlock_t &lck; +}; + +class XLockGuard { +public: + XLockGuard(thread_pool_rwlock_t &lk): lck(lk) { + lck.xlock(); + } + + ~XLockGuard() { + lck.unxlock(); + } + + thread_pool_rwlock_t &lck; +}; + +#endif //THREADPOOL_RWLOCK_H_ diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 80795b511ac11e2bd44201943ce577d4603b0d34..236013823563f498bc238bfbda356a2c19d39952 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -396,7 +396,7 @@ int change_group(connection_t *c, thread_group_t *group, thread_group_t *to_grou mysql_mutex_unlock(&to_group->mutex); if (threadpool_sched_affinity) { - group_affinity.bind_numa(to_group - all_groups); + group_affinity.bind_foreground_thread(to_group - all_groups); } return ret; @@ -1628,7 +1628,7 @@ static void *worker_main(void *param) { assert(thread_group != nullptr); if (threadpool_sched_affinity) { - group_affinity.bind_numa(thread_group - all_groups); + group_affinity.bind_foreground_thread(thread_group - all_groups); } /* Init per-thread structure */ @@ -1736,7 +1736,6 @@ void tp_end() { } else { tp_end_thread = new std::thread(tp_end_func); } - numa_end(); DBUG_VOID_RETURN; }