diff --git a/plugin/thread_pool/CMakeLists.txt b/plugin/thread_pool/CMakeLists.txt index 35cbdff51401d4000f09c5e0ba59261cc4f7c33d..94f3caa0f9bb299feebf39475e202093ae3d10e6 100644 --- a/plugin/thread_pool/CMakeLists.txt +++ b/plugin/thread_pool/CMakeLists.txt @@ -20,6 +20,7 @@ ADD_COMPILE_DEFINITIONS( MYSQL_ADD_PLUGIN(thread_pool threadpool_common.cc threadpool_unix.cc + numa_affinity_manager.cc MODULE_ONLY MODULE_OUTPUT_NAME "thread_pool" ) diff --git a/plugin/thread_pool/numa_affinity_manager.cc b/plugin/thread_pool/numa_affinity_manager.cc new file mode 100644 index 0000000000000000000000000000000000000000..1f0576407bd6735dbff3d8dc17db6aeca7174a0c --- /dev/null +++ b/plugin/thread_pool/numa_affinity_manager.cc @@ -0,0 +1,389 @@ +/* Copyright (C) 2012 Monty Program Ab + Copyright (C) 2023 Huawei Technologies Co., Ltd + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "numa_affinity_manager.h" +#include "sql/log.h" +#include +#include +#include +#include +#include +#include + +using namespace std; + +struct bitmask *numa_bitmask = NULL; + +static struct bitmask *numa_bitmask_mem = NULL; +static int max_conf_nodes = -1; +static int node_possible = 0; +static int cpu_possible = 0; + +#define BIT_LENGTH_LONG (8 * sizeof(unsigned long)) +#define BIT_LENGTH_LONG_N(n) (((n)+((BIT_LENGTH_LONG)-1))/(BIT_LENGTH_LONG)) + +static const char *system_dir = "/sys/devices/system/node"; +static const char *node_str = "node"; +static const int node_size = 4; +static const char *file_mask_size = "/proc/self/status"; +static const char *node_mask_comment = "Mems_allowed:\t"; +static const int node_possible_max = 4096 * 8; +static const char *mem_total = "MemTotal"; +static const char *mem_free = "MemFree"; +static const int length_init = 4096; +static const int length_max = 1024 * 1024; + +long long numa_mem_size(int node, long long *file); +int numa_sched_getaffinity(pid_t pid, struct bitmask *mask); +void numa_node_bm_free(struct bitmask *bmp); +struct bitmask *numa_bm_alloc(void); + +void numa_end(void) +{ + numa_node_bm_free(numa_bitmask_mem); + numa_node_bm_free(numa_bitmask); +} + +static unsigned int get_bmp_bit(const struct bitmask *b, unsigned int num) +{ + unsigned int ret = 0; + if (b->size > num) { + unsigned long cur_mask = b->mask_cpu[num / BIT_LENGTH_LONG]; + unsigned int cur_bit = num % BIT_LENGTH_LONG; + ret = (cur_mask >> cur_bit) & 1; + } + return ret; +} + +static void set_bmp_bit(struct bitmask *b, unsigned int num, unsigned int v) +{ + if (b->size > num) { + unsigned long &cur_mask = b->mask_cpu[num / BIT_LENGTH_LONG]; + unsigned int cur_bit = num % BIT_LENGTH_LONG; + if (v) + cur_mask |= 1UL << cur_bit; + else + cur_mask &= ~(1UL << cur_bit); + } +} + +struct bitmask *bitmask_alloc(unsigned int num) +{ + if (num < 1) { + errno = EINVAL; + sql_print_error("The number is invalid"); + return nullptr; + } + + struct bitmask *b = (struct bitmask *)malloc(sizeof(*b)); + if (b == 0) { + sql_print_error("Allocation failed, out of memory"); + return nullptr; + } + b->size = num; + b->mask_cpu = (unsigned long *)calloc(BIT_LENGTH_LONG_N(num), sizeof(unsigned long)); + if (b->mask_cpu == 0) { + free(b); + sql_print_error("Allocation failed, out of memory"); + return nullptr; + } + return b; +} + +void numa_node_bm_free(struct bitmask *b) { + if (b != nullptr) { + free(b->mask_cpu); + b->mask_cpu = (unsigned long *)0xdeadcdef; + free(b); + } + return; +} + +static void set_conf_nd(void) +{ + long long fp; + struct dirent *dp; + + numa_bitmask_mem = numa_bm_alloc(); + numa_bitmask = numa_bm_alloc(); + + DIR *dir = opendir(system_dir); + if (dir == nullptr) { + max_conf_nodes = 0; + return; + } + + while ((dp = readdir(dir)) != NULL) { + if (strncmp(dp->d_name, node_str, node_size) == 0) { + int node = strtoul(dp->d_name + node_size, NULL, 0); + set_bmp_bit(numa_bitmask, node, 1); + if (numa_mem_size(node, &fp) > 0) { + set_bmp_bit(numa_bitmask_mem, node, 1); + } + max_conf_nodes = max_conf_nodes < node ? node : max_conf_nodes; + } + } + closedir(dir); +} + +static inline int is_letter_digit(char c) +{ + return (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9'); +} + +long __attribute__((weak)) get_mempolicy(int *policy, unsigned long *nmask, + unsigned long maxnode, void *addr, unsigned flags) +{ + return syscall(__NR_get_mempolicy, policy, nmask, maxnode, addr, flags); +} + +static void set_node_possible(void) +{ + unsigned long *m = NULL; + int p; + int len = 0; + size_t buf_size = 0; + FILE *file; + char *buffer = NULL; + char *buffer_tmp = NULL; + + if ((file = fopen(file_mask_size, "r")) == NULL) { + if (node_possible == 0) { + node_possible = 16; + do { + node_possible <<= 1; + if ((m = (unsigned long *)realloc(m, node_possible / 8)) == 0) { + return; + } + } while (get_mempolicy(&p, m, node_possible + 1, 0, 0) < 0 && + node_possible_max > node_possible && errno == EINVAL); + free(m); + } + return; + } + + while (getline(&buffer, &buf_size, file) > 0) { + if (strncmp(buffer, node_mask_comment, strlen(node_mask_comment)) == 0) { + buffer_tmp = buffer; + buffer_tmp += strlen(node_mask_comment); + while (*buffer_tmp != '\n' && *buffer_tmp != '\0') { + if (is_letter_digit(*buffer_tmp)) { + len++; + } + buffer_tmp++; + } + node_possible = len * 4; + } + } + free(buffer); + fclose(file); + return; +} + +struct bitmask *numa_bm_alloc(void) +{ + return bitmask_alloc(node_possible); +} + +void mem_total_free(char *string, char *str, long long &size, int &state, long long *file) { + char *final; + if (strstr(string, mem_total)) { + size = strtoull(str, &final, 0) << 10; + if (final != str) { + state++; + } else { + size = -1; + } + } + if (file && strstr(string, mem_free)) { + *file = strtoull(str, &final, 0) << 10; + if (final != str) { + state++; + } else { + *file = -1; + } + } +} + +long long numa_mem_size(int nd, long long *file) +{ + size_t length = 0; + char *string = nullptr; + long long size = -1; + char fd[64]; + int state = 0; + int req = file ? 2 : 1; + + if (file) { + *file = -1; + } + sprintf(fd, "/sys/devices/system/node/node%d/meminfo", nd); + FILE *f = fopen(fd, "r"); + if (f == nullptr) { + return -1; + } + + while (getdelim(&string, &length, '\n', f) > 0) { + char *str = strcasestr(string, "kB"); + if (!str) { + continue; + } + --str; + while (isspace(*str) && str > string) { + --str; + } + while (isdigit(*str) && str > string) { + --str; + } + + mem_total_free(string, str, size, state, file); + } + if (state != req) { + sql_print_warning("Parse sysfs mem info failed, (%d)", state); + } + free(string); + fclose(f); + + return size; +} + +static void set_cpu_possible(void) +{ + struct bitmask *buf; + int olderr = errno; + int length = length_init; + int num; + + do { + buf = bitmask_alloc(length); + num = numa_sched_getaffinity(0, buf); + if (num < 0) { + if (errno != EINVAL) { + sql_print_warning("Determine max cpu failed (sched_getaffinity: %s)", + strerror(errno)); + num = sizeof(cpu_set_t); + break; + } else { + if (length < length_max) { + length *= 2; + numa_node_bm_free(buf); + } else { + break; + } + } + } + } while (num < 0); + numa_node_bm_free(buf); + errno = olderr; + cpu_possible = num * 8; +} + +int numa_num_configured_nodes(void) +{ + int count_node_mem=0; + + for (int i=0; i <= max_conf_nodes; i++) { + if (get_bmp_bit(numa_bitmask_mem, i)) + count_node_mem++; + } + return count_node_mem; +} + +struct bitmask *numa_allocate_cpumask() +{ + return bitmask_alloc(cpu_possible); +} + +int numa_sched_setaffinity(pid_t pid, struct bitmask *bmp) +{ + return syscall(__NR_sched_setaffinity, pid, BIT_LENGTH_LONG_N(bmp->size) * sizeof(unsigned long), bmp->mask_cpu); +} + +int numa_sched_getaffinity(pid_t pid, struct bitmask *bmp) +{ + return syscall(__NR_sched_getaffinity, pid, BIT_LENGTH_LONG_N(bmp->size) * sizeof(unsigned long), bmp->mask_cpu); +} + +bool numa_affinity_manager::init() { + initok = false; + + if (get_sys_cpu() != get_sys_cpu_only()) { + return false; + } + + set_node_possible(); + set_conf_nd(); + set_cpu_possible(); + + cpu_count = get_sys_cpu(); + numa_count = get_sys_numa(); + if (cpu_count <= 0 || numa_count <= 0 || cpu_count % numa_count != 0) { + return false; + } + + int cpu_per_numa = cpu_count / numa_count; + int start = 0; + numa_cpu_map.clear(); + auto del_cpu_mask = [](bitmask *p) { + if (p != nullptr) { + numa_node_bm_free(p); + } + }; + for (int i = 0; i < numa_count; i++) { + auto msk = numa_allocate_cpumask(); + if (msk == nullptr) { + return false; + } + + for (int j = 0; j < cpu_per_numa; j++) { + set_bmp_bit(msk, start + j, 1); + } + numa_cpu_map.emplace_back(msk, del_cpu_mask); + start += cpu_per_numa; + } + initok = true; + return true; +} + +bool numa_affinity_manager::bind_numa(int group_id) { + if (initok) { + pid_t pid = gettid(); + return (numa_sched_setaffinity( + pid, numa_cpu_map[group_id%numa_cpu_map.size()].get()) == 0); + } + + return false; +} + +void numa_affinity_manager::print_cpumask(const string &name, bitmask *msk) { + cout << name << ": "; + for (unsigned int i = 0; i < msk->size; i++) { + if (get_bmp_bit(msk, i)) { + cout << i << " "; + } + } + cout << endl; +} +void numa_affinity_manager::dump() { + cout << "initok: " << initok << endl; + cout << "cpu_count: " << cpu_count << endl; + cout << "numa_count: " << numa_count << endl; + + for (unsigned int i = 0; i < numa_cpu_map.size(); i++) { + string name = "numa_cpu_map[" + to_string(i) + "]"; + print_cpumask(name, numa_cpu_map[i].get()); + } +} \ No newline at end of file diff --git a/plugin/thread_pool/numa_affinity_manager.h b/plugin/thread_pool/numa_affinity_manager.h index 3471d3287369b6f8900544c9a30485570101c915..b1d1033c8b3e78846242f5158acd820c94dbbf8e 100644 --- a/plugin/thread_pool/numa_affinity_manager.h +++ b/plugin/thread_pool/numa_affinity_manager.h @@ -20,62 +20,37 @@ #include #include #include -#include +#include +#include "sys/syscall.h" using namespace std; +extern struct bitmask *numa_bitmask; + +void numa_end(void); +int numa_num_configured_nodes(void); + +struct bitmask { + unsigned long size; + unsigned long *mask_cpu; +}; + class numa_affinity_manager { public: - numa_affinity_manager(){}; - virtual ~numa_affinity_manager(){}; - - bool init() { - initok = false; - cpu_count = get_sys_cpu(); - numa_count = get_sys_numa(); - if (cpu_count <= 0 || numa_count <= 0 || - cpu_count % numa_count != 0) { - return false; - } - - int cpu_per_numa = cpu_count / numa_count; - int start = 0; - numa_cpu_map.clear(); - auto delete_cpumask = [](bitmask *ptr) { - if (ptr != nullptr) { - numa_free_cpumask(ptr); - } - }; - for (int i = 0; i < numa_count; i++) { - auto msk = numa_allocate_cpumask(); - if (msk == nullptr) { - return false; - } - - for (int j = 0; j < cpu_per_numa; j++) { - numa_bitmask_setbit(msk, start + j); - } - numa_cpu_map.emplace_back(msk, delete_cpumask); - start += cpu_per_numa; - } - initok = true; - return true; - } - - bool bind_numa(int group_id) { - if (initok) { - pid_t pid = gettid(); - return (numa_sched_setaffinity( - pid, numa_cpu_map[group_id%numa_cpu_map.size()].get()) == 0); - } + numa_affinity_manager(){} + virtual ~numa_affinity_manager(){} - return false; - } + bool init(); + bool bind_numa(int group_id); protected: int get_sys_cpu() { - return numa_num_configured_cpus(); + return sysconf(_SC_NPROCESSORS_CONF); + } + + int get_sys_cpu_only() { + return sysconf(_SC_NPROCESSORS_ONLN); } int get_sys_numa() { @@ -87,30 +62,14 @@ protected: } public: - void print_cpumask(const string &name, bitmask *msk) { - cout << name << ": "; - for (unsigned int i = 0; i < msk->size; i++) { - if (numa_bitmask_isbitset(msk, i)) { - cout << i << " "; - } - } - cout << endl; - } - void dump() { - cout << "initok: " << initok << endl; - cout << "cpu_count: " << cpu_count << endl; - cout << "numa_count: " << numa_count << endl; - - for (unsigned int i = 0; i < numa_cpu_map.size(); i++) { - string name = "numa_cpu_map[" + to_string(i) + "]"; - print_cpumask(name, numa_cpu_map[i].get()); - } - } + void print_cpumask(const string &name, bitmask *msk) ; + void dump(); private: bool initok{false}; int cpu_count{0}; int numa_count{0}; + vector> numa_cpu_map; }; diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 8e608c80fd0680bf3febb68479ae1e9327ee8c41..80795b511ac11e2bd44201943ce577d4603b0d34 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -139,8 +139,8 @@ static void tp_dec_ref_count() { class ThreadPoolConnSet { public: - ThreadPoolConnSet() {}; - virtual ~ThreadPoolConnSet() {}; + ThreadPoolConnSet() {} + virtual ~ThreadPoolConnSet() {} bool empty() { bool ret = false; @@ -391,6 +391,7 @@ int change_group(connection_t *c, thread_group_t *group, thread_group_t *to_grou to_group->connection_count++; /* Ensure that there is a listener in the new group. */ int ret = 0; + mysql_mutex_lock(&to_group->mutex); if (!to_group->thread_count) ret = create_worker(to_group, false); mysql_mutex_unlock(&to_group->mutex); @@ -1735,6 +1736,7 @@ void tp_end() { } else { tp_end_thread = new std::thread(tp_end_func); } + numa_end(); DBUG_VOID_RETURN; }