From 7ad3f0631d05c0934fe273782ec2f1be805bfbff Mon Sep 17 00:00:00 2001 From: liuwei Date: Thu, 2 Nov 2023 16:16:40 +0800 Subject: [PATCH] add thread-pool for mysql8.0.30 --- plugin/thread_pool/threadpool_unix.cc | 965 ++++++++++++++++++++++++++ 1 file changed, 965 insertions(+) diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 82112ae44..cbe30d1d8 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -776,3 +776,968 @@ static void add_thread_count(thread_group_t *thread_group, thread_group->active_thread_count += count; tp_stats.num_worker_threads.fetch_add(count, std::memory_order_relaxed); } + +/** + Creates a new worker thread. + thread_mutex must be held when calling this function + + NOTE: in rare cases, the number of threads can exceed + threadpool_max_threads, because we need at least 2 threads + per group to prevent deadlocks (one listener + one worker) +*/ +static int create_worker(thread_group_t *thread_group, + bool due_to_stall) noexcept { + my_thread_handle thread_id; + bool max_threads_reached = false; + int err; + + DBUG_ENTER("create_worker"); + if (tp_stats.num_worker_threads.load(std::memory_order_relaxed) >= + (int)threadpool_max_threads && + thread_group->thread_count >= 2) { + err = 1; + max_threads_reached = true; + goto end; + } + + err = mysql_thread_create(key_worker_thread, &thread_id, + thread_group->pthread_attr, worker_main, + thread_group); + if (!err) { + thread_group->last_thread_creation_time = my_microsecond_getsystime(); + Global_THD_manager::get_instance()->inc_thread_created(); + add_thread_count(thread_group, 1); + TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations); + + if (due_to_stall) { + TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall); + } + } else { + set_my_errno(errno); + } + +end: + if (err) { + print_pool_blocked_message(max_threads_reached); + } else { + pool_block_start = 0; /* Reset pool blocked timer, if it was set */ + } + + DBUG_RETURN(err); +} + +/** + Calculate microseconds throttling delay for thread creation. + + The value depends on how many threads are already in the group: + small number of threads means no delay, the more threads the larger + the delay. + + The actual values were not calculated using any scientific methods. + They just look right, and behave well in practice. + + TODO: Should throttling depend on thread_pool_stall_limit? +*/ +static ulonglong microsecond_throttling_interval( + const thread_group_t &thread_group) noexcept { + const int count = thread_group.thread_count; + + if (count < 4) return 0; + + if (count < 8) return 50 * 1000; + + if (count < 16) return 100 * 1000; + + return 200 * 1000; +} + +/** + Wakes a worker thread, or creates a new one. + + Worker creation is throttled, so we avoid too many threads + to be created during the short time. +*/ +static int wake_or_create_thread(thread_group_t *thread_group, + bool due_to_stall) { + DBUG_ENTER("wake_or_create_thread"); + + if (thread_group->shutdown) DBUG_RETURN(0); + + if (wake_thread(thread_group, due_to_stall) == 0) DBUG_RETURN(0); + + if (thread_group->thread_count > thread_group->connection_count) + DBUG_RETURN(-1); + + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if (thread_group->active_thread_count < + (1 + (int)threadpool_oversubscribe)) { + /* + We're better off creating a new thread here with no delay, either there + are not enough active workers, or they all are all blocking and there was no + idle thread to wakeup. Smells like a potential deadlock or very slowly + executing requests, e.g sleeps or user locks. + */ + DBUG_RETURN(create_worker(thread_group, due_to_stall)); + } + + const ulonglong now = my_microsecond_getsystime(); + const ulonglong time_since_last_thread_created = + (now - thread_group->last_thread_creation_time); + + /* Throttle thread creation. */ + if (time_since_last_thread_created > + microsecond_throttling_interval(*thread_group)) { + DBUG_RETURN(create_worker(thread_group, due_to_stall)); + } + + TP_INCREMENT_GROUP_COUNTER(thread_group, throttles); + DBUG_RETURN(-1); +} + +static int thread_group_init(thread_group_t *thread_group, + pthread_attr_t *thread_attr) noexcept { + DBUG_ENTER("thread_group_init"); + thread_group->pthread_attr = thread_attr; + mysql_mutex_init(key_group_mutex, &thread_group->mutex, nullptr); + thread_group->pollfd = -1; + thread_group->shutdown_pipe[0] = -1; + thread_group->shutdown_pipe[1] = -1; + thread_group->thread_count = 0; + thread_group->admin_port_thread_count = 0; + thread_group->dump_thread_count = 0; + thread_group->active_thread_count = 0; + thread_group->connection_count = 0; + thread_group->waiting_thread_count = 0; + thread_group->io_event_count = 0; + thread_group->queue_event_count = 0; + thread_group->shutdown = false; + thread_group->stalled = false; + DBUG_RETURN(0); +} + +static void thread_group_destroy(thread_group_t *thread_group) noexcept { + mysql_mutex_destroy(&thread_group->mutex); + if (thread_group->pollfd != -1) { + close(thread_group->pollfd); + thread_group->pollfd = -1; + } + for (int i = 0; i < 2; i++) { + if (thread_group->shutdown_pipe[i] != -1) { + close(thread_group->shutdown_pipe[i]); + thread_group->shutdown_pipe[i] = -1; + } + } +} + +/** + Wake sleeping thread from waiting list +*/ +static int wake_thread(thread_group_t *thread_group, bool due_to_stall) noexcept { + DBUG_ENTER("wake_thread"); + worker_thread_t *thread = thread_group->waiting_threads.front(); + if (thread) { + thread->woken = true; + thread_group->waiting_threads.remove(thread); + mysql_cond_signal(&thread->cond); + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes); + if (due_to_stall) { + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall); + } + DBUG_RETURN(0); + } + DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ +} + +/** + Shutdown for thread group +*/ +static void thread_group_close(thread_group_t *thread_group) noexcept { + DBUG_ENTER("thread_group_close"); + + mysql_mutex_lock(&thread_group->mutex); + if (thread_group->thread_count == 0) { + mysql_mutex_unlock(&thread_group->mutex); + thread_group_destroy(thread_group); + DBUG_VOID_RETURN; + } + + thread_group->shutdown = true; + thread_group->listener = nullptr; + + if (pipe(thread_group->shutdown_pipe)) { + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; + } + + /* Wake listener */ + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], nullptr)) { + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; + } + char c = 0; + if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) { + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; + } + + /* Wake all workers. */ + while (wake_thread(thread_group, false) == 0) { + } + + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/* + Add work to the queue. Maybe wake a worker if they all sleep. + + Currently, this function is only used when new connections need to + perform login (this is done in worker threads). +*/ +static void queue_put(thread_group_t *thread_group, connection_t *connection) { + DBUG_ENTER("queue_put"); + + mysql_mutex_lock(&thread_group->mutex); + connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd); + connection->enqueue_time = pool_timer.current_microtime; + + queue_push(thread_group, connection); + + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if (thread_group->active_thread_count < + 1 + (int)threadpool_oversubscribe) { + wake_or_create_thread(thread_group, false); + } + + mysql_mutex_unlock(&thread_group->mutex); + + DBUG_VOID_RETURN; +} + +/** + Retrieve a connection with pending event. + + Pending event in our case means that there is either a pending login request + (if connection is not yet logged in), or there are unread bytes on the socket. + + If there are no pending events currently, thread will wait. + If timeout specified in abstime parameter passes, the function returns nullptr. + + @param current_thread - current worker thread + @param thread_group - current thread group + @param abstime - absolute wait timeout + + @return + connection with pending event. + nullptr is returned if timeout has expired,or on shutdown. +*/ +static connection_t *get_event(worker_thread_t *current_thread, + thread_group_t *thread_group, + struct timespec *abstime) { + DBUG_ENTER("get_event"); + connection_t *connection = nullptr; + int err = 0; + + mysql_mutex_lock(&thread_group->mutex); + assert(thread_group->active_thread_count >= 0); + + for (;;) { + const bool oversubscribed = too_many_active_threads(*thread_group); + if (thread_group->shutdown) break; + + /* Check if queue is not empty */ + if (!oversubscribed) { + connection = queue_get(thread_group, WORKER); + if (connection) break; + } + + /* If there is currently no listener in the group, become one. */ + if (!thread_group->listener) { + thread_group->listener = current_thread; + thread_group->active_thread_count--; + mysql_mutex_unlock(&thread_group->mutex); + + connection = listener(thread_group); + + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count++; + /* There is no listener anymore, it just returned. */ + thread_group->listener = nullptr; + break; + } + + /* + Last thing we try before going to sleep is to + pick a single event via epoll, without waiting (timeout 0) + */ + if (!oversubscribed) { + native_event nev; + if (io_poll_wait(thread_group->pollfd, &nev, 1, 0) == 1) { + thread_group->io_event_count++; + TP_INCREMENT_GROUP_COUNTER(thread_group, polls[WORKER]); + connection = (connection_t *)native_event_get_userdata(&nev); + + /* + Since we are going to perform an out-of-order event processing for the + connection, first check whether it is eligible for high priority + processing. We can get here even if there are queued events, so it + must either have a high priority ticket, or there must be not too many + busy threads (as if it was coming from a low priority queue). + */ + if (connection_is_high_prio(*connection)) + connection->tickets--; + else if (too_many_busy_threads(*thread_group)) { + /* + Not eligible for high priority processing. Restore tickets and put + it into the low priority queue. + */ + connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd); + thread_group->queue.push_back(connection); + connection = nullptr; + } + + if (connection) { + TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[WORKER]); + thread_group->queue_event_count++; + break; + } + } + } + + /* And now, finally sleep */ + current_thread->woken = false; /* wake() sets this to true */ + + /* + Add current thread to the head of the waiting list and wait. + It is important to add thread to the head rather than tail + as it ensures LIFO wakeup order (hot caches, working inactivity timeout) + */ + thread_group->waiting_threads.push_front(current_thread); + + thread_group->active_thread_count--; + if (abstime) { + err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, + abstime); + } else { + err = mysql_cond_wait(¤t_thread->cond, &thread_group->mutex); + } + thread_group->active_thread_count++; + + if (!current_thread->woken) { + /* + Thread was not signalled by wake(), it might be a spurious wakeup or + a timeout. Anyhow, we need to remove ourselves from the list now. + If thread was explicitly woken, than caller removed us from the list. + */ + thread_group->waiting_threads.remove(current_thread); + } + + if (err) break; + } + + thread_group->stalled = false; + mysql_mutex_unlock(&thread_group->mutex); + + DBUG_RETURN(connection); +} + +/** + Tells the pool that worker starts waiting on IO, lock, condition, + sleep() or similar. +*/ + +static void wait_begin(thread_group_t *thread_group) noexcept { + DBUG_ENTER("wait_begin"); + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count--; + thread_group->waiting_thread_count++; + + assert(thread_group->active_thread_count >= 0); + assert(thread_group->connection_count > 0); + +#ifdef THREADPOOL_CREATE_THREADS_ON_WAIT + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if ((thread_group->active_thread_count < (1 + (int)threadpool_oversubscribe)) && + (!queues_are_empty(*thread_group) || !thread_group->listener)) { + /* + Group might stall while this thread waits, thus wake + or create a worker to prevent stall. + */ + wake_or_create_thread(thread_group); + } +#endif + + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/** + Tells the pool has finished waiting. +*/ +static void wait_end(thread_group_t *thread_group) noexcept { + DBUG_ENTER("wait_end"); + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count++; + thread_group->waiting_thread_count--; + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/** + Allocate/initialize a new connection structure. +*/ + +static connection_t *alloc_connection(THD *thd) noexcept { + DBUG_ENTER("alloc_connection"); + DBUG_EXECUTE_IF("simulate_tp_alloc_connection_oom", DBUG_RETURN(nullptr);); + + connection_t *connection = (connection_t *)my_malloc( + PSI_NOT_INSTRUMENTED /*key_memory_thread_pool_connection*/, + sizeof(connection_t), 0); + if (connection) { + connection->thd = thd; + connection->waiting = false; + connection->logged_in = false; + connection->bound_to_poll_descriptor = false; + connection->abs_wait_timeout = ULLONG_MAX; + connection->tickets = 0; + } + DBUG_RETURN(connection); +} + +/** + Add a new connection to thread pool.. +*/ + +bool tp_add_connection( + Channel_info *channel_info) { + DBUG_ENTER("Thread_pool_connection_handler::add_connection"); + + THD *const thd = channel_info->create_thd(); + + if (unlikely(!thd)) { + channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false); + DBUG_RETURN(true); + } + + connection_t *const connection = alloc_connection(thd); + + if (unlikely(!connection)) { + thd->get_protocol_classic()->end_net(); + delete thd; + // channel will be closed by send_error_and_close_channel() + channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false); + DBUG_RETURN(true); + } + + delete channel_info; + + thd->set_new_thread_id(); + thd->start_utime = my_micro_time(); + + threadpool_thds.insert(connection); + Global_THD_manager::get_instance()->add_thd(thd); + + thd->scheduler.data = connection; + + /* Assign connection to a group. */ + thread_group_t *group = &all_groups[thd->thread_id() % group_count]; + + connection->thread_group = group; + + if (thd->is_admin_connection()) { + my_thread_handle thread_id; + mysql_mutex_lock(&group->mutex); + int err = mysql_thread_create(key_worker_thread, &thread_id, + group->pthread_attr, admin_port_worker_main, connection); + + if (err) { + set_my_errno(errno); + print_pool_blocked_message(false); + } else { + group->admin_port_thread_count++; + } + mysql_mutex_unlock(&group->mutex); + } else { + mysql_mutex_lock(&group->mutex); + group->connection_count++; + mysql_mutex_unlock(&group->mutex); + + /* + Add connection to the work queue. Actual login + will be done by a worker thread. + */ + queue_put(group, connection); + } + + DBUG_RETURN(false); +} + +/** + Terminate connection. +*/ +static void connection_abort(connection_t *connection) { + DBUG_ENTER("connection_abort"); + threadpool_thds.erase(connection); + + thread_group_t *group = connection->thread_group; + bool is_admin_port = connection->thd->is_admin_connection(); + threadpool_remove_connection(connection->thd); + + if (!is_admin_port) { + mysql_mutex_lock(&group->mutex); + group->connection_count--; + mysql_mutex_unlock(&group->mutex); + } + + my_free(connection); + DBUG_VOID_RETURN; +} + +/** + Detach connection. +*/ +static void connection_detach(connection_t *connection) { + DBUG_ENTER("connection_detach"); + threadpool_thds.erase(connection); + + thread_group_t *group = connection->thread_group; + bool is_admin_port = connection->thd->is_admin_connection(); + Vio *const vio = connection->thd->get_protocol_classic()->get_vio(); + const int fd = mysql_socket_getfd(vio->mysql_socket); + mysql_mutex_lock(&group->mutex); + io_poll_disassociate_fd(group->pollfd, fd); + connection->bound_to_poll_descriptor = false; + mysql_mutex_unlock(&group->mutex); + + if (!is_admin_port) { + mysql_mutex_lock(&group->mutex); + group->connection_count--; + mysql_mutex_unlock(&group->mutex); + } + + my_thread_handle thread_id; + + if (mysql_thread_create(key_worker_thread, &thread_id, group->pthread_attr, + connection_detach_worker, connection->thd)) { + threadpool_remove_connection(connection->thd); + } + + my_free(connection); + DBUG_VOID_RETURN; +} + + +static void *connection_detach_worker(void *param) { + my_thread_init(); + DBUG_ENTER("connection_detach_worker"); + THD *thd = static_cast(param); + assert(thd != nullptr); + thread_attach(thd); + + while (1) { + if (threadpool_process_request(thd)) { + break; + } + } + + threadpool_remove_connection(thd); + return nullptr; +} + +/** + MySQL scheduler callback : kill connection +*/ + +void tp_post_kill_notification(THD *thd) noexcept { + DBUG_ENTER("tp_post_kill_notification"); + if (current_thd == thd || thd->system_thread) { + DBUG_VOID_RETURN; + } + + Vio *vio = thd->get_protocol_classic()->get_vio(); + if (vio) vio_cancel(vio, SHUT_RD); + DBUG_VOID_RETURN; +} + +alignas(CPU_LEVEL1_DCACHE_LINESIZE) std::atomic tp_waits[THD_WAIT_LAST]; + +/** + MySQL scheduler callback: wait begin +*/ +void tp_wait_begin(THD *thd, int type MY_ATTRIBUTE((unused))) { + DBUG_ENTER("tp_wait_begin"); + + if (thd == nullptr) { + DBUG_VOID_RETURN; + } + + connection_t *connection = (connection_t *)thd->scheduler.data; + + if (connection && connection->thd && + !connection->thd->is_admin_connection()) { + assert(!connection->waiting); + connection->waiting = true; + assert(type > 0 && type < THD_WAIT_LAST); + tp_waits[type]++; + wait_begin(connection->thread_group); + } + DBUG_VOID_RETURN; +} + +/** + MySQL scheduler callback: wait end +*/ + +void tp_wait_end(THD *thd) { + DBUG_ENTER("tp_wait_end"); + + if (thd == nullptr) { + DBUG_VOID_RETURN; + } + connection_t *connection = (connection_t *)thd->scheduler.data; + + if (connection && connection->thd && + !connection->thd->is_admin_connection()) { + assert(connection->waiting); + connection->waiting = false; + wait_end(connection->thread_group); + } + DBUG_VOID_RETURN; +} + +static void set_next_timeout_check(ulonglong abstime) { + DBUG_ENTER("set_next_timeout_check"); + while (abstime < pool_timer.next_timeout_check.load()) { + uint64 old = pool_timer.next_timeout_check.load(); + pool_timer.next_timeout_check.compare_exchange_weak(old, abstime); + } + DBUG_VOID_RETURN; +} + + + + inline ulong get_wait_timeout(THD *thd) noexcept { + return thd->variables.net_wait_timeout; + } + +/** + Set wait timeout for connection. +*/ + +static void set_wait_timeout(connection_t *c) noexcept { + DBUG_ENTER("set_wait_timeout"); + /* + Calculate wait deadline for this connection. + Instead of using my_microsecond_getsystime() which has a syscall + overhead, use pool_timer.current_microtime and take + into account that its value could be off by at most + one tick interval. + */ + + c->abs_wait_timeout = + pool_timer.current_microtime.load(std::memory_order_relaxed) + + 1000LL * pool_timer.tick_interval + + 1000000LL * get_wait_timeout(c->thd); + + set_next_timeout_check(c->abs_wait_timeout); + DBUG_VOID_RETURN; +} + +/** + Handle a (rare) special case,where connection needs to + migrate to a different group because group_count has changed + after thread_pool_size setting. +*/ + +static int change_group(connection_t *c, thread_group_t *old_group, + thread_group_t *new_group) { + assert(c->thread_group == old_group); + + /* Remove connection from the old group. */ + if (c->bound_to_poll_descriptor) { + Vio *const vio = c->thd->get_protocol_classic()->get_vio(); + const int fd = mysql_socket_getfd(vio->mysql_socket); + mysql_mutex_lock(&old_group->mutex); + io_poll_disassociate_fd(old_group->pollfd, fd); + c->bound_to_poll_descriptor = false; + } else { + mysql_mutex_lock(&old_group->mutex); + } + c->thread_group->connection_count--; + mysql_mutex_unlock(&old_group->mutex); + + /* Add connection to the new group. */ + mysql_mutex_lock(&new_group->mutex); + c->thread_group = new_group; + new_group->connection_count++; + /* Ensure that there is a listener in the new group. */ + int ret = 0; + if (!new_group->thread_count) ret = create_worker(new_group, false); + mysql_mutex_unlock(&new_group->mutex); + return ret; +} + +static int start_io(connection_t *connection) { + /* + Usually, connection will stay in the same group for the entire + connection's life. However, we do allow group_count to + change at runtime, which means in rare cases when it changes is + connection should need to migrate to another group, this ensures + to ensure equal load between groups. + + So we recalculate in which group the connection should be, based + on thread_id and current group count, and migrate if necessary. + */ + thread_group_t *const group = + &all_groups[connection->thd->thread_id() % group_count]; + + if (group != connection->thread_group) { + if (change_group(connection, connection->thread_group, group)) return -1; + } + + /* + Bind to poll descriptor if not yet done. + */ + Vio *vio = connection->thd->get_protocol_classic()->get_vio(); + int fd = mysql_socket_getfd(vio->mysql_socket); + if (!connection->bound_to_poll_descriptor) { + connection->bound_to_poll_descriptor = true; + return io_poll_associate_fd(group->pollfd, fd, connection); + } + + return io_poll_start_read(group->pollfd, fd, connection); +} + +static void handle_event(connection_t *connection) { + DBUG_ENTER("handle_event"); + int err = 0; + + while (1) { + if (!connection->logged_in) { + err = threadpool_add_connection(connection->thd); + connection->logged_in = true; + } else { + err = threadpool_process_request(connection->thd); + } + + if (err) { + goto end; + } + + if (connection->thd == thd_to_detach) { + connection_detach(connection); + goto end_return; + } + + set_wait_timeout(connection); + + if (!connection_is_worker_continue(*connection)) { + break; + } + } + + if (!connection->thd->is_admin_connection()) { + err = start_io(connection); + } + +end: + if (err || connection->thd->is_admin_connection()) { + connection_abort(connection); + } + +end_return: + DBUG_VOID_RETURN; +} + +static void *admin_port_worker_main(void *param) { + my_thread_init(); + DBUG_ENTER("admin_port_worker_main"); + +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_account) + (nullptr, 0, nullptr, 0); +#endif + + connection_t *connection = static_cast(param); + assert(connection != nullptr); + assert(connection->thread_group != nullptr); + thread_group_t *group = connection->thread_group; + + handle_event(connection); + + mysql_mutex_lock(&group->mutex); + group->admin_port_thread_count--; + mysql_mutex_unlock(&group->mutex); + + my_thread_end(); + return nullptr; +} + +/** + Worker thread's main +*/ +static void *worker_main(void *param) { + my_thread_init(); + + DBUG_ENTER("worker_main"); + + thread_group_t *thread_group = static_cast(param); + assert(thread_group != nullptr); + + if (threadpool_sched_affinity) { + group_affinity.bind_numa((thread_group - all_groups) / sizeof(thread_group_t)); + } + + /* Init per-thread structure */ + worker_thread_t this_thread; + mysql_cond_init(key_worker_cond, &this_thread.cond); + this_thread.thread_group = thread_group; + this_thread.event_count = 0; + +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_account) + (nullptr, 0, nullptr, 0); +#endif + + /* Run event loop */ + for (;;) { + struct timespec ts; + set_timespec(&ts, threadpool_idle_timeout); + connection_t *connection = get_event(&this_thread, thread_group, &ts); + + if (!connection) { + break; + } + + this_thread.event_count++; + handle_event(connection); + } + + /* Thread shutdown: cleanup per-worker-thread structure. */ + mysql_cond_destroy(&this_thread.cond); + + bool last_thread = false; /* last thread in group exits */ + mysql_mutex_lock(&thread_group->mutex); + add_thread_count(thread_group, -1); + last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown); + mysql_mutex_unlock(&thread_group->mutex); + + /* Last thread in group exits and pool is terminating, destroy group.*/ + if (last_thread) { + thread_group_destroy(thread_group); + } + + my_thread_end(); + return nullptr; +} + +bool tp_init() { + DBUG_ENTER("tp_init"); + threadpool_started = true; + group_affinity.init(); + + for (uint i = 0; i < array_elements(all_groups); i++) { + thread_group_init(&all_groups[i], get_connection_attrib()); + } + tp_set_threadpool_size(threadpool_size); + if (group_count == 0) { + /* Something went wrong */ + sql_print_error("Can't set threadpool size to %d", threadpool_size); + DBUG_RETURN(true); + } +#ifdef HAVE_PSI_INTERFACE + mysql_mutex_register("threadpool", mutex_list, array_elements(mutex_list)); + mysql_cond_register("threadpool", cond_list, array_elements(cond_list)); + mysql_thread_register("threadpool", thread_list, array_elements(thread_list)); +#endif + + pool_timer.tick_interval = threadpool_stall_limit; + start_timer(&pool_timer); + DBUG_RETURN(false); +} + +void tp_end_thread() { + if (!threadpool_started) { + return; + } + + while (!threadpool_thds.empty()) { + my_sleep(10000); + } + + stop_timer(&pool_timer); + + for (uint i = 0; i < array_elements(all_groups); i++) { + thread_group_close(&all_groups[i]); + } + + threadpool_started = false; +} + +void tp_end() { + DBUG_ENTER("tp_end"); + threadpool_thds.killConns(); + + std::thread exit_tp(tp_end_thread); + exit_tp.detach(); + DBUG_VOID_RETURN; +} + +/** Ensure that poll descriptors are created when threadpool_size changes */ +void tp_set_threadpool_size(uint size) noexcept { + if (!threadpool_started) return; + + bool success = true; + for (uint i = 0; i < size; i++) { + thread_group_t *group = &all_groups[i]; + mysql_mutex_lock(&group->mutex); + if (group->pollfd == -1) { + group->pollfd = io_poll_create(); + success = (group->pollfd >= 0); + if (!success) { + sql_print_error("io_poll_create() failed, errno=%d\n", errno); + break; + } + } + mysql_mutex_unlock(&all_groups[i].mutex); + if (!success) { + group_count = i; + return; + } + } + group_count = size; +} + +void tp_set_threadpool_stall_limit(uint limit) noexcept { + if (!threadpool_started) { + return; + } + + mysql_mutex_lock(&(pool_timer.mutex)); + pool_timer.tick_interval = limit; + mysql_mutex_unlock(&(pool_timer.mutex)); + mysql_cond_signal(&(pool_timer.cond)); +} + +/** + Calculate number of idle/waiting threads in the pool. + + Sum idle threads over all groups. + Don't do any locking, it is not required for stats. +*/ +int tp_get_idle_thread_count() noexcept { + int sum = 0; + for (uint i = 0; + i < array_elements(all_groups) && (all_groups[i].pollfd >= 0); i++) { + sum += (all_groups[i].thread_count - all_groups[i].active_thread_count); + } + return sum; +} -- Gitee