From 17989073dd49ea3057cf261d361ccc59acba71c1 Mon Sep 17 00:00:00 2001 From: liuwei Date: Wed, 8 May 2024 16:02:23 +0800 Subject: [PATCH] add threadpool code for mysql --- plugin/thread_pool/threadpool_unix.cc | 748 +++++++++++++++++++++++++- 1 file changed, 747 insertions(+), 1 deletion(-) diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index d0e2cad75..a34ea4366 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -497,4 +497,750 @@ static connection_t *queue_get(thread_group_t *thread_group) noexcept { thread_group->queue.remove(c); } DBUG_RETURN(c); -} \ No newline at end of file +} + +static connection_t *queue_get(thread_group_t *group, operation_origin origin) { + connection_t *ret = queue_get(group); + if (ret != nullptr) { + TP_INCREMENT_GROUP_COUNTER(group, dequeues[(int)origin]); + } + return ret; +} + +static inline void queue_push(thread_group_t *thread_group, connection_t *connection) +{ + connection->enqueue_time= pool_timer.current_microtime; + thread_group->queue.push_back(connection); +} + +static inline void high_prio_queue_push(thread_group_t *thread_group, connection_t *connection) +{ + connection->enqueue_time= pool_timer.current_microtime; + thread_group->high_prio_queue.push_back(connection); +} + +class Thd_timeout_checker : public Do_THD_Impl { + private: + pool_timer_t *const m_timer; + + public: + Thd_timeout_checker(pool_timer_t *timer) noexcept : m_timer(timer) {} + + virtual ~Thd_timeout_checker() {} + + virtual void operator()(THD *thd) noexcept { + if (thd_get_net_read_write(thd) != 1) return; + + connection_t *connection = (connection_t *)thd->scheduler.data; + if (!connection) return; + + if (connection->abs_wait_timeout < + m_timer->current_microtime.load(std::memory_order_relaxed)) { + /* Wait timeout exceeded, kill connection. */ + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->killed = THD::KILL_CONNECTION; + tp_post_kill_notification(thd); + mysql_mutex_unlock(&thd->LOCK_thd_data); + } else { + set_next_timeout_check(connection->abs_wait_timeout); + } + } +}; + +/* + Handle wait timeout : + Find connections that have been idle for too long and kill them. + Also, recalculate time when next timeout check should run. +*/ +static void timeout_check(pool_timer_t *timer) { + DBUG_ENTER("timeout_check"); + + /* Reset next timeout check, it will be recalculated in the loop below */ + timer->next_timeout_check.store(ULLONG_MAX, std::memory_order_relaxed); + + Thd_timeout_checker thd_timeout_checker(timer); + Global_THD_manager::get_instance()->do_for_all_thd_copy(&thd_timeout_checker); + + DBUG_VOID_RETURN; +} + +/* + Timer thread. + + Periodically, check if one of the thread groups is stalled. Stalls happen if + events are not being dequeued from the queue, or from the network, Primary + reason for stall can be a lengthy executing non-blocking request. It could + also happen that thread is waiting but wait_begin/wait_end is forgotten by + storage engine. Timer thread will create a new thread in group in case of + a stall. + + Besides checking for stalls, timer thread is also responsible for terminating + clients that have been idle for longer than wait_timeout seconds. + + TODO: Let the timer sleep for long time if there is no work to be done. + Currently it wakes up rather often on and idle server. +*/ +static void *timer_thread(void *param) noexcept { + my_thread_init(); + DBUG_ENTER("timer_thread"); + + pool_timer_t *timer = (pool_timer_t *)param; + timer->next_timeout_check.store(ULLONG_MAX, std::memory_order_relaxed); + timer->current_microtime.store(my_microsecond_getsystime(), + std::memory_order_relaxed); + + for (;;) { + struct timespec ts; + + set_timespec_nsec(&ts, timer->tick_interval * 1000000ULL); + mysql_mutex_lock(&timer->mutex); + int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts); + if (timer->shutdown) { + mysql_mutex_unlock(&timer->mutex); + break; + } + if (err == ETIMEDOUT) { + timer->current_microtime.store(my_microsecond_getsystime(), + std::memory_order_relaxed); + + /* Check stalls in thread groups */ + for (size_t i = 0; i < array_elements(all_groups); i++) { + if (all_groups[i].connection_count) check_stall(&all_groups[i]); + } + + /* Check if any client exceeded wait_timeout */ + if (timer->next_timeout_check.load(std::memory_order_relaxed) <= + timer->current_microtime.load(std::memory_order_relaxed)) + timeout_check(timer); + } + mysql_mutex_unlock(&timer->mutex); + } + + mysql_mutex_destroy(&timer->mutex); + my_thread_end(); + return nullptr; +} + +/* + Check if both the high and low priority queues are empty. + + NOTE: we also consider the low priority queue empty in case it has events, but + they cannot be processed due to the too_many_busy_threads() limit. +*/ +static bool queues_are_empty(const thread_group_t &tg) noexcept { + return (tg.high_prio_queue.is_empty() && + (tg.queue.is_empty() || too_many_busy_threads(tg))); +} + +static void check_stall(thread_group_t *thread_group) { + if (mysql_mutex_trylock(&thread_group->mutex) != 0) { + /* Something happens. Don't disturb */ + return; + } + + /* + Check if listener is present. If not, check whether any IO + events were dequeued since last time. If not, this means + listener is either in tight loop or thd_wait_begin() + was forgotten. Create a new worker(it will make itself listener). + */ + if (!thread_group->listener && !thread_group->io_event_count) { + wake_or_create_thread(thread_group, true); + mysql_mutex_unlock(&thread_group->mutex); + return; + } + + /* Reset io event count */ + thread_group->io_event_count = 0; + + /* + Check whether requests from the workqueues are being dequeued. + + The stall detection and resolution works as follows: + + 1. There is a counter thread_group->queue_event_count for the number of + events removed from the queues. Timer resets the counter to 0 on each + run. + 2. Timer determines stall if this counter remains 0 since last check + and at least one of the high and low priority queues is not empty. + 3. Once timer determined a stall it sets thread_group->stalled flag and + wakes and idle worker (or creates a new one, subject to throttling). + 4. The stalled flag is reset, when an event is dequeued. + + Q : Will this handling lead to an unbound growth of threads, if queues + stall permanently? + A : No. If queues stall permanently, it is an indication for many very long + simultaneous queries. The maximum number of simultanoues queries is + max_connections, further we have threadpool_max_threads limit, upon which no + worker threads are created. So in case there is a flood of very long + queries, threadpool would slowly approach thread-per-connection behavior. + NOTE: + If long queries never wait, creation of the new threads is done by timer, + so it is slower than in real thread-per-connection. However if long queries + do wait and indicate that via thd_wait_begin/end callbacks, thread creation + will be faster. + */ + if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) { + thread_group->stalled = true; + TP_INCREMENT_GROUP_COUNTER(thread_group, stalls); + wake_or_create_thread(thread_group, true); + } + + /* Reset queue event count */ + thread_group->queue_event_count = 0; + + mysql_mutex_unlock(&thread_group->mutex); +} + +static void start_timer(pool_timer_t *timer) noexcept { + my_thread_handle thread_id; + DBUG_ENTER("start_timer"); + mysql_mutex_init(key_timer_mutex, &timer->mutex, nullptr); + mysql_cond_init(key_timer_cond, &timer->cond); + timer->shutdown = false; + mysql_thread_create(key_timer_thread, &thread_id, nullptr, timer_thread, timer); + DBUG_VOID_RETURN; +} + +static void stop_timer(pool_timer_t *timer) noexcept { + DBUG_ENTER("stop_timer"); + mysql_mutex_lock(&timer->mutex); + timer->shutdown = true; + mysql_cond_signal(&timer->cond); + mysql_mutex_unlock(&timer->mutex); + DBUG_VOID_RETURN; +} + +/** + Poll for socket events and distribute them to worker threads + In many case current thread will handle single event itself. + + @return a ready connection, or NULL on shutdown +*/ +static connection_t *listener(thread_group_t *thread_group) { + DBUG_ENTER("listener"); + connection_t *retval = nullptr; + + for (;;) { + if (thread_group->shutdown) break; + + native_event ev[MAX_EVENTS]; + int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); + + DBUG_EXECUTE_IF("threadpool_io_poll_wait_at_least_2_events", + { + while (cnt < 2) + { + int cnt_again = io_poll_wait(thread_group->pollfd, ev + cnt, MAX_EVENTS - cnt, -1); + cnt += cnt_again; + } + } + ); + + TP_INCREMENT_GROUP_COUNTER(thread_group, polls[LISTENER]); + if (cnt <= 0) { + assert(thread_group->shutdown); + break; + } + + mysql_mutex_lock(&thread_group->mutex); + + if (thread_group->shutdown) { + mysql_mutex_unlock(&thread_group->mutex); + break; + } + + thread_group->io_event_count += cnt; + + /* + We got some network events and need to make decisions : whether + listener hould handle events and whether or not any wake worker + threads so they can handle events. + + Q1 : Should listener handle an event itself, or put all events into + queue and let workers handle the events? + + Solution : + Generally, listener that handles events itself is preferable. We do not + want listener thread to change its state from waiting to running too + often, Since listener has just woken from poll, it better uses its time + slice and does some work. Besides, not handling events means they go to + the queue, and often to wake another worker must wake up to handle the + event. This is not good, as we want to avoid wakeups. + + The downside of listener that also handles queries is that we can + potentially leave thread group for long time not picking the new + network events. It is not a major problem, because this stall will be + detected sooner or later by the timer thread. Still, relying on timer + is not always good, because it may "tick" too slow (large timer_interval) + + We use following strategy to solve this problem - if queue was not empty + we suspect flood of network events and listener stays, Otherwise, it + handles a query. + + + Q2: If queue is not empty, how many workers to wake? + + Solution: + We generally try to keep one thread per group active (threads handling + queries are considered active, unless they stuck in inside some "wait") + Thus, we will wake only one worker, and only if there is not active + threads currently,and listener is not going to handle a query. When we + don't wake, we hope that currently active threads will finish fast and + handle the queue. If this does not happen, timer thread will detect stall + and wake a worker. + + NOTE: Currently nothing is done to detect or prevent long queuing times. + A solutionc for the future would be to give up "one active thread per + group" principle, if events stay in the queue for too long, and just wake + more workers. + */ + + const bool listener_picks_event = threadpool_dedicated_listener? false : + (thread_group->high_prio_queue.is_empty() && thread_group->queue.is_empty()); + + /* + If listener_picks_event is set, listener thread will handle first event, + and put the rest into the queue. If listener_pick_event is not set, all + events go to the queue. + */ + for (int i = (listener_picks_event) ? 1 : 0; i < cnt; i++) { + connection_t *c = (connection_t *)native_event_get_userdata(&ev[i]); + if (connection_is_high_prio(*c)) { + c->tickets--; + thread_group->high_prio_queue.push_back(c); + } else { + c->tickets = tp_get_thdvar_high_prio_tickets(c->thd); + queue_push(thread_group, c); + } + } + + if (listener_picks_event) { + /* Handle the first event. */ + retval = (connection_t *)native_event_get_userdata(&ev[0]); + TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[LISTENER]); + mysql_mutex_unlock(&thread_group->mutex); + break; + } + + /* The remaining threads can be created at most */ + int workers_in_need = (int)threadpool_toobusy - + thread_group->active_thread_count - thread_group->waiting_thread_count; + + /* There are no remaining threads and the thread group is stalled */ + if (workers_in_need <= 0 && thread_group->active_thread_count == 0) { + workers_in_need = 1; + } + + /* The number of threads that can be created and + the number of threads that are really needed, whichever is smaller */ + workers_in_need = workers_in_need > cnt ? cnt : workers_in_need; + + /* Wake up or create the required threads */ + for (int i = 0; i < workers_in_need; i++) { + /* We added some work items to queue, now wake a worker. */ + if (wake_thread(thread_group, false)) { + /* + Wake failed, hence groups has no idle threads. Now check if there are + any threads in the group except listener. + In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. + The queue is not empty, and listener is not going to handle + events. In order to drain the queue, we create a worker here. + Alternatively, we could just rely on timer to detect stall, and + create thread, but waiting for timer would be an inefficient and + pointless delay. + */ + create_worker(thread_group, false); + } + } + mysql_mutex_unlock(&thread_group->mutex); + } + DBUG_RETURN(retval); +} + +/** + Adjust thread counters in group or global + whenever thread is created or is about to exit + + @param thread_group + @param count - 1, when new thread is created + -1, when thread is about to exit +*/ +static void add_thread_count(thread_group_t *thread_group, + int32 count) noexcept { + thread_group->thread_count += count; + /* worker starts out and end in "active" state */ + thread_group->active_thread_count += count; + tp_stats.num_worker_threads.fetch_add(count, std::memory_order_relaxed); +} + +/** + Creates a new worker thread. + thread_mutex must be held when calling this function + + NOTE: in rare cases, the number of threads can exceed + threadpool_max_threads, because we need at least 2 threads + per group to prevent deadlocks (one listener + one worker) +*/ +static int create_worker(thread_group_t *thread_group, + bool due_to_stall) noexcept { + my_thread_handle thread_id; + bool max_threads_reached = false; + int err; + + DBUG_ENTER("create_worker"); + if (tp_stats.num_worker_threads.load(std::memory_order_relaxed) >= + (int)threadpool_max_threads && + thread_group->thread_count >= 2) { + err = 1; + max_threads_reached = true; + goto end; + } + + err = mysql_thread_create(key_worker_thread, &thread_id, + thread_group->pthread_attr, worker_main, + thread_group); + if (!err) { + thread_group->last_thread_creation_time = my_microsecond_getsystime(); + Global_THD_manager::get_instance()->inc_thread_created(); + add_thread_count(thread_group, 1); + TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations); + + if (due_to_stall) { + TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall); + } + } else { + set_my_errno(errno); + } + +end: + if (err) { + print_pool_blocked_message(max_threads_reached); + } else { + pool_block_start = 0; /* Reset pool blocked timer, if it was set */ + } + + DBUG_RETURN(err); +} + +/** + Calculate microseconds throttling delay for thread creation. + + The value depends on how many threads are already in the group: + small number of threads means no delay, the more threads the larger + the delay. + + The actual values were not calculated using any scientific methods. + They just look right, and behave well in practice. + + TODO: Should throttling depend on thread_pool_stall_limit? +*/ +static ulonglong microsecond_throttling_interval( + const thread_group_t &thread_group) noexcept { + const int count = thread_group.thread_count; + + if (count < 4) return 0; + + if (count < 8) return 50 * 1000; + + if (count < 16) return 100 * 1000; + + return 200 * 1000; +} + +/** + Wakes a worker thread, or creates a new one. + + Worker creation is throttled, so we avoid too many threads + to be created during the short time. +*/ +static int wake_or_create_thread(thread_group_t *thread_group, + bool due_to_stall) { + DBUG_ENTER("wake_or_create_thread"); + + if (thread_group->shutdown) DBUG_RETURN(0); + + if (wake_thread(thread_group, due_to_stall) == 0) DBUG_RETURN(0); + + if (thread_group->thread_count > thread_group->connection_count) + DBUG_RETURN(-1); + + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if (thread_group->active_thread_count < + (1 + (int)threadpool_oversubscribe)) { + /* + We're better off creating a new thread here with no delay, either there + are not enough active workers, or they all are all blocking and there was no + idle thread to wakeup. Smells like a potential deadlock or very slowly + executing requests, e.g sleeps or user locks. + */ + DBUG_RETURN(create_worker(thread_group, due_to_stall)); + } + + const ulonglong now = my_microsecond_getsystime(); + const ulonglong time_since_last_thread_created = + (now - thread_group->last_thread_creation_time); + + /* Throttle thread creation. */ + if (time_since_last_thread_created > + microsecond_throttling_interval(*thread_group)) { + DBUG_RETURN(create_worker(thread_group, due_to_stall)); + } + + TP_INCREMENT_GROUP_COUNTER(thread_group, throttles); + DBUG_RETURN(-1); +} + +static int thread_group_init(thread_group_t *thread_group, + pthread_attr_t *thread_attr) noexcept { + DBUG_ENTER("thread_group_init"); + thread_group->pthread_attr = thread_attr; + mysql_mutex_init(key_group_mutex, &thread_group->mutex, nullptr); + thread_group->pollfd = -1; + thread_group->shutdown_pipe[0] = -1; + thread_group->shutdown_pipe[1] = -1; + thread_group->thread_count = 0; + thread_group->admin_port_thread_count = 0; + thread_group->dump_thread_count = 0; + thread_group->active_thread_count = 0; + thread_group->connection_count = 0; + thread_group->waiting_thread_count = 0; + thread_group->io_event_count = 0; + thread_group->queue_event_count = 0; + thread_group->shutdown = false; + thread_group->stalled = false; + DBUG_RETURN(0); +} + +static void thread_group_destroy(thread_group_t *thread_group) noexcept { + mysql_mutex_destroy(&thread_group->mutex); + if (thread_group->pollfd != -1) { + close(thread_group->pollfd); + thread_group->pollfd = -1; + } + for (int i = 0; i < 2; i++) { + if (thread_group->shutdown_pipe[i] != -1) { + close(thread_group->shutdown_pipe[i]); + thread_group->shutdown_pipe[i] = -1; + } + } +} + +/** + Wake sleeping thread from waiting list +*/ +static int wake_thread(thread_group_t *thread_group, bool due_to_stall) noexcept { + DBUG_ENTER("wake_thread"); + worker_thread_t *thread = thread_group->waiting_threads.front(); + if (thread) { + thread->woken = true; + thread_group->waiting_threads.remove(thread); + mysql_cond_signal(&thread->cond); + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes); + if (due_to_stall) { + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall); + } + DBUG_RETURN(0); + } + DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ +} + +/** + Shutdown for thread group +*/ +static void thread_group_close(thread_group_t *thread_group) noexcept { + DBUG_ENTER("thread_group_close"); + + mysql_mutex_lock(&thread_group->mutex); + if (thread_group->thread_count == 0) { + mysql_mutex_unlock(&thread_group->mutex); + thread_group_destroy(thread_group); + DBUG_VOID_RETURN; + } + + thread_group->shutdown = true; + thread_group->listener = nullptr; + + if (pipe(thread_group->shutdown_pipe)) { + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; + } + + /* Wake listener */ + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], nullptr)) { + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; + } + char c = 0; + if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) { + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; + } + + /* Wake all workers. */ + while (wake_thread(thread_group, false) == 0) { + } + + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/* + Add work to the queue. Maybe wake a worker if they all sleep. + + Currently, this function is only used when new connections need to + perform login (this is done in worker threads). +*/ +static void queue_put(thread_group_t *thread_group, connection_t *connection) { + DBUG_ENTER("queue_put"); + + mysql_mutex_lock(&thread_group->mutex); + connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd); + connection->enqueue_time = pool_timer.current_microtime; + + queue_push(thread_group, connection); + + /* In order to achieve the best running performance of the + number of threads, the conditions for the wake-up or + creation of worker threads are relaxed. */ + if (thread_group->active_thread_count < + 1 + (int)threadpool_oversubscribe) { + wake_or_create_thread(thread_group, false); + } + + mysql_mutex_unlock(&thread_group->mutex); + + DBUG_VOID_RETURN; +} + +/** + Retrieve a connection with pending event. + + Pending event in our case means that there is either a pending login request + (if connection is not yet logged in), or there are unread bytes on the socket. + + If there are no pending events currently, thread will wait. + If timeout specified in abstime parameter passes, the function returns nullptr. + + @param current_thread - current worker thread + @param thread_group - current thread group + @param abstime - absolute wait timeout + + @return + connection with pending event. + nullptr is returned if timeout has expired,or on shutdown. +*/ +static connection_t *get_event(worker_thread_t *current_thread, + thread_group_t *thread_group, + struct timespec *abstime) { + DBUG_ENTER("get_event"); + connection_t *connection = nullptr; + int err = 0; + + mysql_mutex_lock(&thread_group->mutex); + assert(thread_group->active_thread_count >= 0); + + for (;;) { + const bool oversubscribed = too_many_active_threads(*thread_group); + if (thread_group->shutdown) break; + + /* Check if queue is not empty */ + if (!oversubscribed) { + connection = queue_get(thread_group, WORKER); + if (connection) break; + } + + /* If there is currently no listener in the group, become one. */ + if (!thread_group->listener) { + thread_group->listener = current_thread; + thread_group->active_thread_count--; + mysql_mutex_unlock(&thread_group->mutex); + + connection = listener(thread_group); + + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count++; + /* There is no listener anymore, it just returned. */ + thread_group->listener = nullptr; + break; + } + + /* + Last thing we try before going to sleep is to + pick a single event via epoll, without waiting (timeout 0) + */ + if (!oversubscribed) { + native_event nev; + if (io_poll_wait(thread_group->pollfd, &nev, 1, 0) == 1) { + thread_group->io_event_count++; + TP_INCREMENT_GROUP_COUNTER(thread_group, polls[WORKER]); + connection = (connection_t *)native_event_get_userdata(&nev); + + /* + Since we are going to perform an out-of-order event processing for the + connection, first check whether it is eligible for high priority + processing. We can get here even if there are queued events, so it + must either have a high priority ticket, or there must be not too many + busy threads (as if it was coming from a low priority queue). + */ + if (connection_is_high_prio(*connection)) + connection->tickets--; + else if (too_many_busy_threads(*thread_group)) { + /* + Not eligible for high priority processing. Restore tickets and put + it into the low priority queue. + */ + connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd); + thread_group->queue.push_back(connection); + connection = nullptr; + } + + if (connection) { + TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[WORKER]); + thread_group->queue_event_count++; + break; + } + } + } + + /* And now, finally sleep */ + current_thread->woken = false; /* wake() sets this to true */ + + /* + Add current thread to the head of the waiting list and wait. + It is important to add thread to the head rather than tail + as it ensures LIFO wakeup order (hot caches, working inactivity timeout) + */ + thread_group->waiting_threads.push_front(current_thread); + + thread_group->active_thread_count--; + if (abstime) { + err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, + abstime); + } else { + err = mysql_cond_wait(¤t_thread->cond, &thread_group->mutex); + } + thread_group->active_thread_count++; + + if (!current_thread->woken) { + /* + Thread was not signalled by wake(), it might be a spurious wakeup or + a timeout. Anyhow, we need to remove ourselves from the list now. + If thread was explicitly woken, than caller removed us from the list. + */ + thread_group->waiting_threads.remove(current_thread); + } + + if (err) break; + } + + thread_group->stalled = false; + mysql_mutex_unlock(&thread_group->mutex); + + DBUG_RETURN(connection); +} -- Gitee