diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc
new file mode 100644
index 0000000000000000000000000000000000000000..82112ae44dce2f1ea5b939e012db07c23a888557
--- /dev/null
+++ b/plugin/thread_pool/threadpool_unix.cc
@@ -0,0 +1,778 @@
+/* Copyright (C) 2012 Monty Program Ab
+   Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#define MYSQL_SERVER 1
+
+#include "threadpool_unix.h"
+#include "sql/debug_sync.h"
+#include "sql/log.h"
+#include "sql/protocol_classic.h"
+#include "my_sys.h"
+#include "my_systime.h"
+#include "mysql/thread_pool_priv.h"  // thd_is_transaction_active()
+#include "mysql/plugin.h"
+#include "threadpool.h"
+#include <atomic>
+#include <mutex>
+#include <set>
+
+/** Maximum number of native events a listener can read in one go */
+#define MAX_EVENTS 1024
+
+/** Define if wait_begin() should create threads if necessary without waiting
+for stall detection to kick in */
+#define THREADPOOL_CREATE_THREADS_ON_WAIT
+
+/** Indicates that threadpool was initialized */
+static bool threadpool_started = false;
+
+/*
+  Define PSI Keys for performance schema.
+  We have a mutex per group, worker threads, a condition per worker thread,
+  and a timer thread with its own mutex and condition.
+*/
+
+#ifdef HAVE_PSI_INTERFACE
+static PSI_mutex_key key_group_mutex;
+static PSI_mutex_key key_timer_mutex;
+static PSI_mutex_info mutex_list[] = {
+    {&key_group_mutex, "group_mutex", 0, 0, PSI_DOCUMENT_ME},
+    {&key_timer_mutex, "timer_mutex", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}};
+
+static PSI_cond_key key_worker_cond;
+static PSI_cond_key key_timer_cond;
+static PSI_cond_info cond_list[] = {
+    {&key_worker_cond, "worker_cond", 0, 0, PSI_DOCUMENT_ME},
+    {&key_timer_cond, "timer_cond", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}};
+
+static PSI_thread_key key_worker_thread;
+static PSI_thread_key key_timer_thread;
+static PSI_thread_info thread_list[] = {
+    {&key_worker_thread, "worker_thread", 0, 0, PSI_DOCUMENT_ME},
+    {&key_timer_thread, "timer_thread", PSI_FLAG_SINGLETON, 0,
+     PSI_DOCUMENT_ME}};
+#endif  // HAVE_PSI_INTERFACE
+
+thread_group_t all_groups[MAX_THREAD_GROUPS];
+numa_affinity_manager group_affinity;
+
+static uint group_count;
+
+/**
+  Used for printing the "pool blocked" message, see
+  print_pool_blocked_message();
+*/
+static ulonglong pool_block_start;
+
+/* Global timer for all groups */
+struct pool_timer_t {
+  mysql_mutex_t mutex;
+  mysql_cond_t cond;
+  std::atomic<ulonglong> current_microtime;
+  std::atomic<ulonglong> next_timeout_check;
+  int tick_interval;
+  bool shutdown;
+};
+
+static pool_timer_t pool_timer;
+
+static void queue_put(thread_group_t *thread_group, connection_t *connection);
+static int wake_thread(thread_group_t *thread_group,
+                       bool due_to_stall) noexcept;
+static void handle_event(connection_t *connection);
+static int wake_or_create_thread(thread_group_t *thread_group,
+                                 bool due_to_stall = false);
+static int create_worker(thread_group_t *thread_group,
+                         bool due_to_stall) noexcept;
+static void *admin_port_worker_main(void *param);
+static void *worker_main(void *param);
+static void *connection_detach_worker(void *param);
+static void check_stall(thread_group_t *thread_group);
+static void connection_abort(connection_t *connection);
+static void set_next_timeout_check(ulonglong abstime);
+static void print_pool_blocked_message(bool) noexcept;
+
+THD *thd_to_detach = nullptr;
+
+class ThreadPoolConnSet {
+ public:
+  ThreadPoolConnSet() {}
+  virtual ~ThreadPoolConnSet() {}
+
+  bool empty() {
+    mtx.lock();
+    bool ret = conns.empty();
+    mtx.unlock();
+    return ret;
+  }
+
+  void killConns() {
+    mtx.lock();
+    for (auto &it : conns) {
+      THD *thd = it->thd;
+      if (current_thd != thd && thd->killed != THD::KILL_CONNECTION) {
+        mysql_mutex_lock(&thd->LOCK_thd_data);
+        thd->killed = THD::KILL_CONNECTION;
+        tp_post_kill_notification(thd);
+        mysql_mutex_unlock(&thd->LOCK_thd_data);
+      } else if (current_thd == thd) {
+        thd_to_detach = thd;
+      }
+    }
+    mtx.unlock();
+  }
+
+  void insert(connection_t *c) {
+    mtx.lock();
+    conns.insert(c);
+    mtx.unlock();
+  }
+
+  void erase(connection_t *c) {
+    mtx.lock();
+    conns.erase(c);
+    mtx.unlock();
+  }
+
+ public:
+  std::set<connection_t *> conns;
+  std::mutex mtx;
+};
+
+ThreadPoolConnSet threadpool_thds;
+
+int vio_cancel(Vio *vio, int how)
+{
+  int r= 0;
+  DBUG_ENTER("vio_cancel");
+
+  if (vio->inactive == false)
+  {
+    assert(vio->type == VIO_TYPE_TCPIP ||
+           vio->type == VIO_TYPE_SOCKET ||
+           vio->type == VIO_TYPE_SSL);
+
+    assert(mysql_socket_getfd(vio->mysql_socket) >= 0);
+    if (mysql_socket_shutdown(vio->mysql_socket, how))
+      r= -1;
+  }
+
+  DBUG_RETURN(r);
+}
+
+/**
+  Asynchronous network IO.
+
+  We use the native edge-triggered network IO multiplexing facility.
+  This maps to different APIs on different Unixes.
+
+  Currently supported are Linux with epoll, Solaris with event ports, and
+  OSX/BSD with kevent. All these APIs are used with one-shot flags: the
+  event is signalled once the client has written something into the socket,
+  then the socket is removed from the "poll-set" until the command is
+  finished, and we need to re-arm/re-register the socket.
+
+  No implementation for poll/select/AIO is currently provided.
+
+  The API closely resembles all of the above mentioned platform APIs
+  and consists of the following functions.
+
+  - io_poll_create()
+    Creates an io_poll descriptor.
+    On Linux: epoll_create()
+
+  - io_poll_associate_fd(int poll_fd, int fd, void *data)
+    Associate a file descriptor with the io poll descriptor.
+    On Linux: epoll_ctl(..EPOLL_CTL_ADD)
+
+  - io_poll_disassociate_fd(int pollfd, int fd)
+    Disassociate a file descriptor from the io poll descriptor.
+    On Linux: epoll_ctl(..EPOLL_CTL_DEL)
+
+  - io_poll_start_read(int poll_fd, int fd, void *data)
+    The same as io_poll_associate_fd(), but cannot be used before
+    io_poll_associate_fd() has been called.
+    On Linux: epoll_ctl(..EPOLL_CTL_MOD)
+
+  - io_poll_wait(int pollfd, native_event *native_events, int maxevents,
+                 int timeout_ms)
+    Wait until one or more descriptors added with io_poll_associate_fd()
+    or io_poll_start_read() become readable. Data associated with the
+    descriptors can be retrieved from the native_events array using the
+    native_event_get_userdata() function.
+    On Linux: epoll_wait()
+*/
+
+#if defined(__linux__)
+#ifndef EPOLLRDHUP
+/* Early 2.6 kernels did not have EPOLLRDHUP */
+#define EPOLLRDHUP 0
+#endif
+static int io_poll_create() noexcept { return epoll_create(1); }
+
+static int io_poll_associate_fd(int pollfd, int fd, void *data) noexcept {
+  struct epoll_event ev;
+  ev.data.u64 = 0; /* Keep valgrind happy */
+  ev.data.ptr = data;
+  ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLRDHUP | EPOLLONESHOT;
+  return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
+}
+
+static int io_poll_start_read(int pollfd, int fd, void *data) noexcept {
+  struct epoll_event ev;
+  ev.data.u64 = 0; /* Keep valgrind happy */
+  ev.data.ptr = data;
+  ev.events = EPOLLIN | EPOLLET | EPOLLERR | EPOLLRDHUP | EPOLLONESHOT;
+  return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
+}
+
+static int io_poll_disassociate_fd(int pollfd, int fd) noexcept {
+  struct epoll_event ev;
+  return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
+}
+
+/*
+  Wrapper around epoll_wait().
+  NOTE: in case of EINTR, it restarts with the original timeout. Since we
+  use either infinite or zero timeouts, this is not critical.
+*/
+static int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
+                        int timeout_ms) noexcept {
+  int ret;
+  do {
+    ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms);
+  } while (ret == -1 && errno == EINTR);
+  return ret;
+}
+
+static void *native_event_get_userdata(native_event *event) noexcept {
+  return event->data.ptr;
+}
+
+#elif defined(__FreeBSD__) || defined(__APPLE__)
+static int io_poll_create() noexcept { return kqueue(); }
+
+static int io_poll_start_read(int pollfd, int fd, void *data) noexcept {
+  struct kevent ke;
+  EV_SET(&ke, fd, EVFILT_READ, EV_ADD | EV_ONESHOT, 0, 0, data);
+  return kevent(pollfd, &ke, 1, 0, 0, 0);
+}
+
+static int io_poll_associate_fd(int pollfd, int fd, void *data) noexcept {
+  /* With kqueue, initial registration and re-arming are the same call. */
+  return io_poll_start_read(pollfd, fd, data);
+}
+
+static int io_poll_disassociate_fd(int pollfd, int fd) noexcept {
+  struct kevent ke;
+  EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
+  return kevent(pollfd, &ke, 1, 0, 0, 0);
+}
+
+static int io_poll_wait(int pollfd, struct kevent *events, int maxevents,
+                        int timeout_ms) noexcept {
+  struct timespec ts;
+  int ret;
+  if (timeout_ms >= 0) {
+    ts.tv_sec = timeout_ms / 1000;
+    ts.tv_nsec = (timeout_ms % 1000) * 1000000;
+  }
+  do {
+    ret = kevent(pollfd, 0, 0, events, maxevents,
+                 (timeout_ms >= 0) ? &ts : nullptr);
+  } while (ret == -1 && errno == EINTR);
+  return ret;
+}
+
+static void *native_event_get_userdata(native_event *event) noexcept {
+  return event->udata;
+}
+#else
+#error not ported yet to this OS
+#endif
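+
+/*
+  Usage sketch of the wrappers above (illustrative only; the real call
+  sites are the listener and worker code below). A connection is
+  registered once, and because every event is one-shot it must be
+  re-armed after each command:
+
+    int pollfd = io_poll_create();
+    io_poll_associate_fd(pollfd, fd, connection);  // initial registration
+
+    native_event ev[MAX_EVENTS];
+    int cnt = io_poll_wait(pollfd, ev, MAX_EVENTS, -1);
+    for (int i = 0; i < cnt; i++) {
+      connection_t *c = (connection_t *)native_event_get_userdata(&ev[i]);
+      // ... execute the client command associated with c ...
+      io_poll_start_read(pollfd, fd, c);  // re-arm for the next command
+    }
+*/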
+
+namespace {
+
+/*
+  Prevent too many active threads from executing at the same time, if the
+  workload is not CPU bound.
+*/
+inline bool too_many_active_threads(
+    const thread_group_t &thread_group) noexcept {
+  return (thread_group.active_thread_count >=
+              1 + (int)threadpool_oversubscribe &&
+          !thread_group.stalled);
+}
+
+/*
+  Limit the number of 'busy' threads to 1 + threadpool_toobusy. A thread is
+  busy if it is in either the active state or the waiting state (i.e. between
+  thd_wait_begin() / thd_wait_end() calls).
+*/
+inline bool too_many_busy_threads(
+    const thread_group_t &thread_group) noexcept {
+  return (thread_group.active_thread_count +
+              thread_group.waiting_thread_count >
+          1 + (int)threadpool_toobusy);
+}
+
+/* Check whether the group already hosts more connections than
+   threadpool_toobusy allows. */
+inline bool too_many_connection(const thread_group_t &thread_group) noexcept {
+  return (thread_group.connection_count > (int)threadpool_toobusy - 1);
+}
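+
+/*
+  Worked example for the thresholds above (illustrative values): with
+  threadpool_oversubscribe = 3, a non-stalled group counts as oversubscribed
+  once 4 threads are active; with threadpool_toobusy = 8, the group is "too
+  busy" once active + waiting threads exceed 9, and too_many_connection()
+  triggers once the group holds 8 or more connections.
+*/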
+
+/*
+  Check if a given connection is eligible to enter the high priority queue
+  based on its current thread_pool_high_prio_mode value, its available high
+  priority tickets, its transactional state, and whether any locks are held.
+*/
+inline bool connection_is_high_prio(const connection_t &c) noexcept {
+  const ulong mode = tp_get_thdvar_high_prio_mode(c.thd);
+
+  return (mode == TP_HIGH_PRIO_MODE_STATEMENTS) ||
+         (mode == TP_HIGH_PRIO_MODE_TRANSACTIONS && c.tickets > 0 &&
+          (thd_is_transaction_active(c.thd) ||
+           c.thd->variables.option_bits & OPTION_TABLE_LOCK ||
+           c.thd->locked_tables_mode != LTM_NONE ||
+           c.thd->mdl_context.has_locks() ||
+           c.thd->global_read_lock.is_acquired() ||
+           c.thd->mdl_context.has_locks(MDL_key::USER_LEVEL_LOCK) ||
+           c.thd->mdl_context.has_locks(MDL_key::LOCKING_SERVICE)));
+}
+
+inline bool connection_is_worker_continue(const connection_t &c) noexcept {
+  if (c.thd->is_admin_connection()) {
+    return true;
+  }
+
+  if (c.thread_group != &all_groups[c.thd->thread_id() % group_count]) {
+    return false;
+  }
+
+  if (!too_many_connection(*(c.thread_group))) {
+    return true;
+  }
+
+  const ulong mode = tp_get_thdvar_high_prio_mode(c.thd);
+  return (mode == TP_HIGH_PRIO_MODE_TRANSACTIONS && c.tickets > 0 &&
+          (thd_is_transaction_active(c.thd) ||
+           c.thd->variables.option_bits & OPTION_TABLE_LOCK ||
+           c.thd->locked_tables_mode != LTM_NONE ||
+           c.thd->mdl_context.has_locks() ||
+           c.thd->global_read_lock.is_acquired() ||
+           c.thd->mdl_context.has_locks(MDL_key::USER_LEVEL_LOCK) ||
+           c.thd->mdl_context.has_locks(MDL_key::LOCKING_SERVICE)));
+}
+
+}  // namespace
+
+/* Dequeue an element from a workqueue */
+static connection_t *queue_get(thread_group_t *thread_group) noexcept {
+  DBUG_ENTER("queue_get");
+  thread_group->queue_event_count++;
+  connection_t *c;
+
+  if ((c = thread_group->high_prio_queue.front())) {
+    thread_group->high_prio_queue.remove(c);
+  }
+  /*
+    Don't pick events from the low priority queue if there are too many
+    active + waiting threads.
+  */
+  else if (!too_many_busy_threads(*thread_group) &&
+           (c = thread_group->queue.front())) {
+    thread_group->queue.remove(c);
+  }
+  DBUG_RETURN(c);
+}
+
+static connection_t *queue_get(thread_group_t *group,
+                               operation_origin origin) {
+  connection_t *ret = queue_get(group);
+  if (ret != nullptr) {
+    TP_INCREMENT_GROUP_COUNTER(group, dequeues[(int)origin]);
+  }
+  return ret;
+}
+
+static inline void queue_push(thread_group_t *thread_group,
+                              connection_t *connection) {
+  connection->enqueue_time = pool_timer.current_microtime;
+  thread_group->queue.push_back(connection);
+}
+
+static inline void high_prio_queue_push(thread_group_t *thread_group,
+                                        connection_t *connection) {
+  connection->enqueue_time = pool_timer.current_microtime;
+  thread_group->high_prio_queue.push_back(connection);
+}
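+
+/*
+  Dequeue order at a glance (descriptive note, no new behavior): queue_get()
+  always drains the high priority queue first, and a low priority event is
+  only picked when the group is under its busy-thread limit, so a steady
+  stream of high priority transactions can keep low priority events queued
+  for a while by design.
+*/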
+
+class Thd_timeout_checker : public Do_THD_Impl {
+ private:
+  pool_timer_t *const m_timer;
+
+ public:
+  Thd_timeout_checker(pool_timer_t *timer) noexcept : m_timer(timer) {}
+
+  virtual ~Thd_timeout_checker() {}
+
+  virtual void operator()(THD *thd) noexcept {
+    if (thd_get_net_read_write(thd) != 1) return;
+
+    connection_t *connection = (connection_t *)thd->scheduler.data;
+    if (!connection) return;
+
+    if (connection->abs_wait_timeout <
+        m_timer->current_microtime.load(std::memory_order_relaxed)) {
+      /* Wait timeout exceeded, kill connection. */
+      mysql_mutex_lock(&thd->LOCK_thd_data);
+      thd->killed = THD::KILL_CONNECTION;
+      tp_post_kill_notification(thd);
+      mysql_mutex_unlock(&thd->LOCK_thd_data);
+    } else {
+      set_next_timeout_check(connection->abs_wait_timeout);
+    }
+  }
+};
+
+/*
+  Handle wait timeout:
+  Find connections that have been idle for too long and kill them.
+  Also, recalculate the time when the next timeout check should run.
+*/
+static void timeout_check(pool_timer_t *timer) {
+  DBUG_ENTER("timeout_check");
+
+  /* Reset next timeout check, it will be recalculated in the loop below */
+  timer->next_timeout_check.store(ULLONG_MAX, std::memory_order_relaxed);
+
+  Thd_timeout_checker thd_timeout_checker(timer);
+  Global_THD_manager::get_instance()->do_for_all_thd_copy(
+      &thd_timeout_checker);
+
+  DBUG_VOID_RETURN;
+}
+
+/*
+  Timer thread.
+
+  Periodically checks whether any thread group is stalled. Stalls happen if
+  events are not being dequeued from the queue or from the network. The
+  primary reason for a stall is a long-running request that never blocks. It
+  can also happen that a thread is waiting but the storage engine forgot to
+  call wait_begin()/wait_end(). In case of a stall, the timer thread creates
+  a new thread in the group.
+
+  Besides checking for stalls, the timer thread is also responsible for
+  terminating clients that have been idle for longer than wait_timeout
+  seconds.
+
+  TODO: Let the timer sleep for a long time if there is no work to be done.
+  Currently it wakes up rather often on an idle server.
+*/
+static void *timer_thread(void *param) noexcept {
+  my_thread_init();
+  DBUG_ENTER("timer_thread");
+
+  pool_timer_t *timer = (pool_timer_t *)param;
+  timer->next_timeout_check.store(ULLONG_MAX, std::memory_order_relaxed);
+  timer->current_microtime.store(my_microsecond_getsystime(),
+                                 std::memory_order_relaxed);
+
+  for (;;) {
+    struct timespec ts;
+
+    set_timespec_nsec(&ts, timer->tick_interval * 1000000ULL);
+    mysql_mutex_lock(&timer->mutex);
+    int err = mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
+    if (timer->shutdown) {
+      mysql_mutex_unlock(&timer->mutex);
+      break;
+    }
+    if (err == ETIMEDOUT) {
+      timer->current_microtime.store(my_microsecond_getsystime(),
+                                     std::memory_order_relaxed);
+
+      /* Check stalls in thread groups */
+      for (size_t i = 0; i < array_elements(all_groups); i++) {
+        if (all_groups[i].connection_count) check_stall(&all_groups[i]);
+      }
+
+      /* Check if any client exceeded wait_timeout */
+      if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
+          timer->current_microtime.load(std::memory_order_relaxed))
+        timeout_check(timer);
+    }
+    mysql_mutex_unlock(&timer->mutex);
+  }
+
+  mysql_mutex_destroy(&timer->mutex);
+  my_thread_end();
+  return nullptr;
+}
+
+/*
+  Check if both the high and low priority queues are empty.
+
+  NOTE: we also consider the low priority queue empty in case it has events,
+  but they cannot be processed due to the too_many_busy_threads() limit.
+*/
+static bool queues_are_empty(const thread_group_t &tg) noexcept {
+  return (tg.high_prio_queue.is_empty() &&
+          (tg.queue.is_empty() || too_many_busy_threads(tg)));
+}
+
+static void check_stall(thread_group_t *thread_group) {
+  if (mysql_mutex_trylock(&thread_group->mutex) != 0) {
+    /* Group mutex is held by someone else; don't disturb */
+    return;
+  }
+
+  /*
+    Check if a listener is present. If not, check whether any IO
+    events were dequeued since last time. If not, this means the
+    listener is either in a tight loop or thd_wait_begin() was
+    forgotten. Create a new worker (it will make itself the listener).
+  */
+  if (!thread_group->listener && !thread_group->io_event_count) {
+    wake_or_create_thread(thread_group, true);
+    mysql_mutex_unlock(&thread_group->mutex);
+    return;
+  }
+
+  /* Reset io event count */
+  thread_group->io_event_count = 0;
+
+  /*
+    Check whether requests from the workqueues are being dequeued.
+
+    The stall detection and resolution works as follows:
+
+    1. There is a counter thread_group->queue_event_count for the number of
+       events removed from the queues. The timer resets the counter to 0 on
+       each run.
+    2. The timer determines a stall if this counter remained 0 since the
+       last check and at least one of the high and low priority queues is
+       not empty.
+    3. Once the timer has determined a stall, it sets the
+       thread_group->stalled flag and wakes an idle worker (or creates a new
+       one, subject to throttling).
+    4. The stalled flag is reset when an event is dequeued.
+
+    Q: Will this handling lead to an unbounded growth of threads, if the
+    queues stall permanently?
+    A: No. If queues stall permanently, it is an indication of many very
+    long simultaneous queries. The maximum number of simultaneous queries is
+    max_connections; further, we have the threadpool_max_threads limit, upon
+    which no more worker threads are created. So in case there is a flood of
+    very long queries, the threadpool would slowly approach
+    thread-per-connection behavior.
+    NOTE:
+    If long queries never wait, creation of the new threads is done by the
+    timer, so it is slower than in real thread-per-connection. However, if
+    long queries do wait and indicate that via thd_wait_begin/end callbacks,
+    thread creation will be faster.
+  */
+  if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
+    thread_group->stalled = true;
+    TP_INCREMENT_GROUP_COUNTER(thread_group, stalls);
+    wake_or_create_thread(thread_group, true);
+  }
+
+  /* Reset queue event count */
+  thread_group->queue_event_count = 0;
+
+  mysql_mutex_unlock(&thread_group->mutex);
+}
+
+static void start_timer(pool_timer_t *timer) noexcept {
+  my_thread_handle thread_id;
+  DBUG_ENTER("start_timer");
+  mysql_mutex_init(key_timer_mutex, &timer->mutex, nullptr);
+  mysql_cond_init(key_timer_cond, &timer->cond);
+  timer->shutdown = false;
+  mysql_thread_create(key_timer_thread, &thread_id, nullptr, timer_thread,
+                      timer);
+  DBUG_VOID_RETURN;
+}
+
+static void stop_timer(pool_timer_t *timer) noexcept {
+  DBUG_ENTER("stop_timer");
+  mysql_mutex_lock(&timer->mutex);
+  timer->shutdown = true;
+  mysql_cond_signal(&timer->cond);
+  mysql_mutex_unlock(&timer->mutex);
+  DBUG_VOID_RETURN;
+}
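+
+/*
+  Timer cadence sketch (illustrative; the actual tick_interval is configured
+  elsewhere in the plugin): with tick_interval = 1000 ms the timer wakes once
+  a second, refreshes current_microtime, runs check_stall() for every group
+  that has connections, and performs the full THD scan in timeout_check()
+  only when next_timeout_check has come due. An idle server thus pays one
+  clock read per tick, not a full THD scan.
+*/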
+
+/**
+  Poll for socket events and distribute them to worker threads.
+  In many cases the current thread will handle a single event itself.
+
+  @return a ready connection, or NULL on shutdown
+*/
+static connection_t *listener(thread_group_t *thread_group) {
+  DBUG_ENTER("listener");
+  connection_t *retval = nullptr;
+
+  for (;;) {
+    if (thread_group->shutdown) break;
+
+    native_event ev[MAX_EVENTS];
+    int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
+
+    DBUG_EXECUTE_IF("threadpool_io_poll_wait_at_least_2_events", {
+      while (cnt < 2) {
+        int cnt_again =
+            io_poll_wait(thread_group->pollfd, ev + cnt, MAX_EVENTS - cnt, -1);
+        cnt += cnt_again;
+      }
+    });
+
+    TP_INCREMENT_GROUP_COUNTER(thread_group, polls[LISTENER]);
+    if (cnt <= 0) {
+      assert(thread_group->shutdown);
+      break;
+    }
+
+    mysql_mutex_lock(&thread_group->mutex);
+
+    if (thread_group->shutdown) {
+      mysql_mutex_unlock(&thread_group->mutex);
+      break;
+    }
+
+    thread_group->io_event_count += cnt;
+
+    /*
+      We got some network events and need to make decisions: whether the
+      listener should handle an event itself, and whether to wake any worker
+      threads so they can handle the rest.
+
+      Q1: Should the listener handle an event itself, or put all events into
+      the queue and let workers handle them?
+
+      Solution:
+      Generally, a listener that handles events itself is preferable. We do
+      not want the listener thread to change its state from waiting to
+      running too often; since the listener has just woken from poll, it had
+      better use its time slice and do some work. Besides, not handling
+      events means they go to the queue, and then another worker often has
+      to wake up to handle the event. This is not good, as we want to avoid
+      wakeups.
+
+      The downside of a listener that also handles queries is that we can
+      potentially leave the thread group without picking up new network
+      events for a long time. This is not a major problem, because the stall
+      will be detected sooner or later by the timer thread. Still, relying
+      on the timer is not always good, because it may "tick" too slowly
+      (large tick_interval).
+
+      We use the following strategy to solve this problem: if the queue was
+      not empty, we suspect a flood of network events and the listener
+      stays. Otherwise, it handles a query.
+
+      Q2: If the queue is not empty, how many workers should we wake?
+
+      Solution:
+      We generally try to keep one thread per group active (threads handling
+      queries are considered active, unless they are stuck inside some
+      "wait"). Thus, we wake only one worker, and only if there are no
+      active threads currently and the listener is not going to handle a
+      query. When we don't wake, we hope that the currently active threads
+      will finish quickly and handle the queue. If this does not happen, the
+      timer thread will detect the stall and wake a worker.
+
+      NOTE: Currently nothing is done to detect or prevent long queuing
+      times. A solution for the future would be to give up the "one active
+      thread per group" principle if events stay in the queue for too long,
+      and just wake more workers.
+    */
+
+    const bool listener_picks_event =
+        !threadpool_dedicated_listener &&
+        thread_group->high_prio_queue.is_empty() &&
+        thread_group->queue.is_empty();
+
+    /*
+      If listener_picks_event is set, the listener thread will handle the
+      first event, and put the rest into the queue. If listener_picks_event
+      is not set, all events go to the queue.
+    */
+    for (int i = (listener_picks_event) ? 1 : 0; i < cnt; i++) {
+      connection_t *c = (connection_t *)native_event_get_userdata(&ev[i]);
+      if (connection_is_high_prio(*c)) {
+        c->tickets--;
+        high_prio_queue_push(thread_group, c);
+      } else {
+        c->tickets = tp_get_thdvar_high_prio_tickets(c->thd);
+        queue_push(thread_group, c);
+      }
+    }
+
+    if (listener_picks_event) {
+      /* Handle the first event. */
+      retval = (connection_t *)native_event_get_userdata(&ev[0]);
+      TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues[LISTENER]);
+      mysql_mutex_unlock(&thread_group->mutex);
+      break;
+    }
+
+    /* Upper bound on how many more workers may still be created */
+    int workers_in_need = (int)threadpool_toobusy -
+                          thread_group->active_thread_count -
+                          thread_group->waiting_thread_count;
+
+    /* No headroom left, but the group has no active threads: allow one */
+    if (workers_in_need <= 0 && thread_group->active_thread_count == 0) {
+      workers_in_need = 1;
+    }
+
+    /* Take the smaller of the available headroom and the number of new
+       events */
+    workers_in_need = workers_in_need > cnt ? cnt : workers_in_need;
+
+    /* Wake up or create the required threads */
+    for (int i = 0; i < workers_in_need; i++) {
+      /* We added some work items to the queue, now wake a worker. */
+      if (wake_thread(thread_group, false)) {
+        /*
+          Wake failed, hence the group has no idle threads; only the
+          listener and busy workers remain. To keep the number of running
+          threads close to optimal, the conditions for waking or creating
+          worker threads are relaxed here: the queue is not empty and the
+          listener is not going to handle the events, so we create a worker
+          right away to drain the queue. Alternatively, we could just rely
+          on the timer to detect the stall and create a thread, but waiting
+          for the timer would be an inefficient and pointless delay.
+        */
+        create_worker(thread_group, false);
+      }
+    }
+    mysql_mutex_unlock(&thread_group->mutex);
+  }
+  DBUG_RETURN(retval);
+}
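+
+/*
+  Worked example for the wake-up math above (illustrative values): with
+  threadpool_toobusy = 4, one active thread, one waiting thread, and
+  cnt = 8 freshly queued events, workers_in_need = 4 - 1 - 1 = 2, so at
+  most two workers are woken or created even though eight events arrived.
+*/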
+
+/**
+  Adjust thread counters in the group or globally whenever a thread is
+  created or is about to exit.
+
+  @param thread_group
+  @param count  +1 when a new thread is created,
+                -1 when a thread is about to exit
+*/
+static void add_thread_count(thread_group_t *thread_group,
+                             int32 count) noexcept {
+  thread_group->thread_count += count;
+  /* A worker starts out and ends in the "active" state */
+  thread_group->active_thread_count += count;
+  tp_stats.num_worker_threads.fetch_add(count, std::memory_order_relaxed);
+}
diff --git a/plugin/thread_pool/threadpool_unix.h b/plugin/thread_pool/threadpool_unix.h
new file mode 100644
index 0000000000000000000000000000000000000000..3c561f2da75484170dd86029185be7e786973c93
--- /dev/null
+++ b/plugin/thread_pool/threadpool_unix.h
@@ -0,0 +1,135 @@
+/* Copyright (C) 2012 Monty Program Ab
+   Copyright (C) 2022 Huawei Technologies Co., Ltd
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+   USA */
+
+#ifndef THREADPOOL_UNIX_H_
+#define THREADPOOL_UNIX_H_
+
+#include "mysql/service_thd_wait.h"
+#include "sql/sql_plist.h"
+#include "sql/mysqld.h"
+#include "threadpool.h"
+#include "violite.h"
+#include "numa_affinity_manager.h"
+
+#ifdef __linux__
+#include <sys/epoll.h>
+typedef struct epoll_event native_event;
+#endif
+#if defined(__FreeBSD__) || defined(__APPLE__)
+#include <sys/event.h>
+typedef struct kevent native_event;
+#endif
+#if defined(__sun)
+#include <port.h>
+typedef port_event_t native_event;
+#endif
+
+#define my_microsecond_getsystime() (my_getsystime() / 10)
+
+struct thread_group_t;
+
+/* Per-thread structure for workers */
+struct worker_thread_t {
+  ulonglong event_count; /* number of requests handled by this thread */
+  thread_group_t *thread_group;
+  worker_thread_t *next_in_list;
+  worker_thread_t **prev_in_list;
+
+  mysql_cond_t cond;
+  bool woken;
+};
+
+typedef I_P_List<
+    worker_thread_t,
+    I_P_List_adapter<worker_thread_t, &worker_thread_t::next_in_list,
+                     &worker_thread_t::prev_in_list>>
+    worker_list_t;
+
+struct connection_t {
+  THD *thd;
+  thread_group_t *thread_group;
+  connection_t *next_in_queue;
+  connection_t **prev_in_queue;
+  ulonglong abs_wait_timeout;
+  ulonglong enqueue_time;
+  bool logged_in;
+  bool bound_to_poll_descriptor;
+  bool waiting;
+  uint tickets;
+};
+
+typedef I_P_List<
+    connection_t,
+    I_P_List_adapter<connection_t, &connection_t::next_in_queue,
+                     &connection_t::prev_in_queue>,
+    I_P_List_counter, I_P_List_fast_push_back>
+    connection_queue_t;
+
+const int NQUEUES = 2; /* We have high and low priority queues */
+
+enum operation_origin {
+  WORKER,
+  LISTENER
+};
+
+struct thread_group_counters_t {
+  ulonglong thread_creations;
+  ulonglong thread_creations_due_to_stall;
+  ulonglong wakes;
+  ulonglong wakes_due_to_stall;
+  ulonglong throttles;
+  ulonglong stalls;
+  ulonglong dequeues[2];
+  ulonglong polls[2];
+};
+
+struct alignas(128) thread_group_t {
+  mysql_mutex_t mutex;
+  connection_queue_t queue;
+  connection_queue_t high_prio_queue;
+  worker_list_t waiting_threads;
+  worker_thread_t *listener;
+  pthread_attr_t *pthread_attr;
+  int pollfd;
+  int thread_count;
+  int admin_port_thread_count;
+  int dump_thread_count;
+  int active_thread_count;
+  int connection_count;
+  int waiting_thread_count;
+  /* Stats for the stall detection timer routine. */
+  int io_event_count;
+  int queue_event_count;
+  ulonglong last_thread_creation_time;
+  int shutdown_pipe[2];
+  bool shutdown;
+  bool stalled;
+  thread_group_counters_t counters;
+  char padding[320 - sizeof(thread_group_counters_t)];
+};
+
+static_assert(sizeof(thread_group_t) == 512,
+              "sizeof(thread_group_t) must be 512 to avoid false sharing");
+
+#define TP_INCREMENT_GROUP_COUNTER(group, var) \
+  do {                                         \
+    (group)->counters.var++;                   \
+  } while (0)
+
+extern thread_group_t all_groups[MAX_THREAD_GROUPS];
+extern numa_affinity_manager group_affinity;
+
+#endif  // THREADPOOL_UNIX_H_