diff --git a/plugin/thread_pool/threadpool.h b/plugin/thread_pool/threadpool.h index 0dc80e019c6632e34af65020acab6e5c727b6f6f..38fdee21deec1126d777abedde1e1374106cc145 100644 --- a/plugin/thread_pool/threadpool.h +++ b/plugin/thread_pool/threadpool.h @@ -62,6 +62,7 @@ extern bool tp_add_connection(Channel_info *); extern void tp_end(void); extern void tp_fake_end(void); extern void threadpool_remove_connection(THD *thd); +extern bool thread_attach(THD *thd); extern THD_event_functions tp_event_functions; diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc index 95bf6bb45b1536542f3c7c05d6ea7727428a3047..a86fcad243fa1bb11348021b3d29ee374f3ad22a 100644 --- a/plugin/thread_pool/threadpool_common.cc +++ b/plugin/thread_pool/threadpool_common.cc @@ -104,7 +104,7 @@ class Worker_thread_context { /* Attach/associate the connection with the OS thread, */ -static bool thread_attach(THD *thd) { +bool thread_attach(THD *thd) { #ifndef NDEBUG set_my_thread_var_id(thd->thread_id()); #endif diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 5ae06cfc7a38009954aa7caf695d717b90923765..9fca0207452a091f16208f8c60a314e9f5cabd50 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -94,11 +94,14 @@ static int wake_or_create_thread(thread_group_t *thread_group, static int create_worker(thread_group_t *thread_group, bool due_to_stall) noexcept; static void *admin_port_worker_main(void *param); static void *worker_main(void *param); +static void *connection_detach_worker(void *param); static void check_stall(thread_group_t *thread_group); static void connection_abort(connection_t *connection); static void set_next_timeout_check(ulonglong abstime); static void print_pool_blocked_message(bool) noexcept; +THD *thd_to_detach = nullptr; + class ThreadPoolConnSet { public: ThreadPoolConnSet() {}; @@ -121,6 +124,8 @@ public: thd->killed = THD::KILL_CONNECTION; tp_post_kill_notification(thd); mysql_mutex_unlock(&thd->LOCK_thd_data); + } else if (current_thd == thd) { + thd_to_detach = thd; } } mtx.unlock(); @@ -1246,6 +1251,57 @@ static void connection_abort(connection_t *connection) { DBUG_VOID_RETURN; } +/** + Detach connection. +*/ +static void connection_detach(connection_t *connection) { + DBUG_ENTER("connection_detach"); + threadpool_thds.erase(connection); + + thread_group_t *group = connection->thread_group; + bool is_admin_port = connection->thd->is_admin_connection(); + Vio *const vio = connection->thd->get_protocol_classic()->get_vio(); + const int fd = mysql_socket_getfd(vio->mysql_socket); + mysql_mutex_lock(&group->mutex); + io_poll_disassociate_fd(group->pollfd, fd); + connection->bound_to_poll_descriptor = false; + mysql_mutex_unlock(&group->mutex); + + if (!is_admin_port) { + mysql_mutex_lock(&group->mutex); + group->connection_count--; + mysql_mutex_unlock(&group->mutex); + } + + my_thread_handle thread_id; + + if (mysql_thread_create(key_worker_thread, &thread_id, group->pthread_attr, + connection_detach_worker, connection->thd)) { + threadpool_remove_connection(connection->thd); + } + + my_free(connection); + DBUG_VOID_RETURN; +} + + +static void *connection_detach_worker(void *param) { + my_thread_init(); + DBUG_ENTER("connection_detach_worker"); + THD *thd = static_cast(param); + assert(thd != nullptr); + thread_attach(thd); + + while (1) { + if (threadpool_process_request(thd)) { + break; + } + } + + threadpool_remove_connection(thd); + return nullptr; +} + /** MySQL scheduler callback : kill connection */ @@ -1425,10 +1481,15 @@ static void handle_event(connection_t *connection) { if (err) { goto end; } + + if (connection->thd == thd_to_detach) { + connection_detach(connection); + goto end_return; + } set_wait_timeout(connection); - if (connection->thd->is_admin_connection()) { + if (!connection->thd->is_admin_connection()) { break; } } @@ -1442,6 +1503,7 @@ end: connection_abort(connection); } +end_return: DBUG_VOID_RETURN; }