From 2e663b344ddf35b75e9ca44c12dd4091beff0998 Mon Sep 17 00:00:00 2001 From: PeterWeiWang <715533650@qq.com> Date: Thu, 21 Apr 2022 14:39:33 +0800 Subject: [PATCH] support for parallel queries. --- .../collections/disabled-valgrind-pq-kp.def | 52 +++++++++++++++++++ mysql-test/r/explain_tree.result-pq | 6 +-- mysql-test/r/hash_join.result-pq | 4 -- mysql-test/t/hash_join.test | 6 ++- sql/binary_heap.h | 2 +- sql/item.h | 2 +- sql/msg_queue.cc | 19 +++++-- sql/msg_queue.h | 1 + sql/pq_condition.cc | 9 ++-- sql/query_result.h | 1 + sql/records.cc | 2 + sql/sql_parallel.cc | 12 ++++- sql/sql_parallel.h | 9 +++- storage/innobase/include/row0sel.h | 2 +- 14 files changed, 105 insertions(+), 22 deletions(-) create mode 100644 mysql-test/collections/disabled-valgrind-pq-kp.def diff --git a/mysql-test/collections/disabled-valgrind-pq-kp.def b/mysql-test/collections/disabled-valgrind-pq-kp.def new file mode 100644 index 000000000..6ce94af7e --- /dev/null +++ b/mysql-test/collections/disabled-valgrind-pq-kp.def @@ -0,0 +1,52 @@ +# Tests in this file are disabled for --pq test. +auth_sec.secure_file_priv_warnings : BUG#001 origin code failed +component_keyring_file.encrypt_explicit : BUG#001 origin code failed +component_keyring_file.engine : BUG#001 origin code failed +component_keyring_file.keyring_component_status : BUG#001 origin code failed +component_keyring_file.keyring_encryption_test : BUG#001 origin code failed +component_keyring_file.load : BUG#001 origin code failed +component_keyring_file.log_encrypt_1 : BUG#001 origin code failed +component_keyring_file.log_encrypt_2 : BUG#001 origin code failed +component_keyring_file.log_encrypt_3 : BUG#001 origin code failed +component_keyring_file.log_encrypt_4 : BUG#001 origin code failed +component_keyring_file.log_encrypt_5 : BUG#001 origin code failed +component_keyring_file.log_encrypt_6 : BUG#001 origin code failed +component_keyring_file.log_encrypt_kill : BUG#001 origin code failed +component_keyring_file.migration : BUG#001 origin code failed +component_keyring_file.rpl_binlog_cache_encryption : BUG#001 origin code failed +component_keyring_file.table_encrypt_2 : BUG#001 origin code failed +component_keyring_file.table_encrypt_3 : BUG#001 origin code failed +component_keyring_file.table_encrypt_5 : BUG#001 origin code failed +component_keyring_file.table_encrypt_kill : BUG#001 origin code failed +component_keyring_file.tablespace_encrypt_1 : BUG#001 origin code failed +component_keyring_file.tablespace_encrypt_4 : BUG#001 origin code failed +component_keyring_file.tablespace_encrypt_6 : BUG#001 origin code failed +component_keyring_file.udf_test : BUG#001 origin code failed +component_keyring_file.undo_encrypt_basic : BUG#001 origin code failed +component_keyring_file.undo_encrypt_bootstrap : BUG#001 origin code failed +component_keyring_file.undo_tablespace_encrypt : BUG#001 origin code failed +gis.st_collect : BUG#001 origin code failed +innodb.innodb_buffer_pool_resize : BUG#000 origin code failed +innodb.innodb_buffer_pool_resize_with_chunks : BUG#000 origin code failed +main.archive_plugin : BUG#000 origin code failed +main.blackhole_plugin : BUG#000 origin code failed +main.bug12969156 : BUG#001 origin code failed +main.dd_pfs : BUG#000 origin code failed +main.file_contents : BUG#000 origin code failed +main.log_buffered-big : BUG#000 origin code failed +main.mysql_load_data_local_dir : BUG#000 origin code failed +main.mtr_unit_tests : BUG#000 origin code failed +main.range_with_memory_limit : BUG#000 origin code failed +main.read_only_ddl : BUG#000 origin code failed +main.udf : BUG#001 origin code failed +perfschema.error_log : BUG#001 origin code failed +rpl.rpl_connection_error_message : BUG#001 origin code failed +rpl.rpl_multi_source_init_failure : BUG#001 origin code failed +rpl.rpl_multi_source_mts_reset_worker_info : BUG#001 origin code failed +rpl_gtid.rpl_gtid_heartbeat_2slave : BUG#001 origin code failed +secondary_engine.cost_threshold : BUG#000 origin code failed +sys_vars.innodb_buffer_pool_size_basic : BUG#000 origin code failed +sys_vars.plugin_dir_basic : BUG#000 origin code failed +x.connection_multi_bind_address : BUG#001 origin code failed +x.connection_multi_bind_address_ipv6 : BUG#001 origin code failed + diff --git a/mysql-test/r/explain_tree.result-pq b/mysql-test/r/explain_tree.result-pq index 8e8f8d6a1..86117deca 100644 --- a/mysql-test/r/explain_tree.result-pq +++ b/mysql-test/r/explain_tree.result-pq @@ -723,9 +723,9 @@ Table Op Msg_type Msg_text test.t1 analyze status OK EXPLAIN ANALYZE SELECT * FROM t1 AS a JOIN t1 AS b ON a.a=b.b ORDER BY a.b+b.a LIMIT 3; EXPLAIN --> Limit: 3 row(s) (cost=0.00..0.00 rows=0) (actual time=N.NNN..N.NNN rows=0 loops=1) - -> Table scan on (cost=2.50..2.50 rows=0) (never executed) - -> Temporary table (cost=2.50..2.50 rows=0) (actual time=N.NNN..N.NNN rows=0 loops=1) +-> Limit: 3 row(s) (cost=0.00..0.00 rows=0) (actual time=N.NNN..N.NNN rows=1 loops=1) + -> Table scan on (cost=2.50..2.50 rows=0) (actual time=N.NNN..N.NNN rows=1 loops=1) + -> Temporary table (cost=2.50..2.50 rows=0) (actual time=N.NNN..N.NNN rows=1 loops=1) -> Parallel scan on (actual time=N.NNN..N.NNN rows=1 loops=1) -> Limit: 3 row(s) (actual time=N.NNN..N.NNN rows=1 loops=1) -> Sort: `(a.b + b.a)`, limit input to 3 row(s) per chunk (actual time=N.NNN..N.NNN rows=1 loops=1) diff --git a/mysql-test/r/hash_join.result-pq b/mysql-test/r/hash_join.result-pq index cf8b6b855..daee4cbc1 100644 --- a/mysql-test/r/hash_join.result-pq +++ b/mysql-test/r/hash_join.result-pq @@ -491,10 +491,6 @@ COUNT_STAR > 0 SELECT COUNT(*) FROM t2 WHERE (t2.i) IN (SELECT t1.i FROM t1); COUNT(*) 1024 -SELECT COUNT_STAR > 0 FROM performance_schema.file_summary_by_event_name -WHERE event_name LIKE '%hash_join%'; -COUNT_STAR > 0 -0 DROP TABLE t1, t2; SET join_buffer_size = DEFAULT; SET optimizer_switch = DEFAULT; diff --git a/mysql-test/t/hash_join.test b/mysql-test/t/hash_join.test index 764978f63..789e26f6a 100644 --- a/mysql-test/t/hash_join.test +++ b/mysql-test/t/hash_join.test @@ -329,8 +329,10 @@ TRUNCATE performance_schema.file_summary_by_event_name; eval $hash_join_file_operations; SELECT COUNT(*) FROM t2 WHERE (t2.i) IN (SELECT t1.i FROM t1); --skip_if_hypergraph # Does not use weedout, so does not need row ID, and thus does not spill to disk. -eval $hash_join_file_operations; - +if (!$PQ_TEST) +{ + eval $hash_join_file_operations; +} DROP TABLE t1, t2; SET join_buffer_size = DEFAULT; SET optimizer_switch = DEFAULT; diff --git a/sql/binary_heap.h b/sql/binary_heap.h index 7628d16a1..debdb9c1e 100644 --- a/sql/binary_heap.h +++ b/sql/binary_heap.h @@ -37,7 +37,7 @@ class binary_heap { m_compare(cmp), m_arg(arg), m_thd(thd) {} - + ~binary_heap() {} /* @retval: false of success, and true otherwise. */ bool init_binary_heap() { if (m_capacity <= 0) { diff --git a/sql/item.h b/sql/item.h index a54c79554..920455421 100644 --- a/sql/item.h +++ b/sql/item.h @@ -6976,7 +6976,7 @@ class Item_aggregate_ref : public Item_ref { : Item_ref(context_arg, item, db_name_arg, table_name_arg, field_name_arg) { depended_from = depended_from_arg; } - + ~Item_aggregate_ref() {} void print(const THD *thd, String *str, enum_query_type query_type) const override { if (ref != nullptr && (*ref) != nullptr) { diff --git a/sql/msg_queue.cc b/sql/msg_queue.cc index 20ee39296..79dfb740c 100644 --- a/sql/msg_queue.cc +++ b/sql/msg_queue.cc @@ -41,7 +41,8 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, uint32 *written, bool nowait) { uint32 used; uint32 ringsize = m_queue->m_ring_size; - uint32 sent = 0, available; + uint32 sent = 0; + uint64 available = 0; /** only worker thread can send data to message queue */ THD *thd = current_thd; @@ -58,7 +59,12 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, used = wb - rb; assert(used <= ringsize); - available = std::min(ringsize - used, nbytes - sent); + uint32x2_t v_a = {ringsize, nbytes}; + uint32x2_t v_b = {used, sent}; + uint64x2_t v = vsubl_u32(v_a, v_b); + uint64 m_a = vgetq_lane_u64(v, 0); + uint64 m_b = vgetq_lane_u64(v, 1); + available = std::min(m_a, m_b); compiler_barrier(); if (m_queue->detached == MQ_HAVE_DETACHED) { @@ -95,7 +101,7 @@ MQ_RESULT MQueue_handle::send_bytes(uint32 nbytes, const void *data, /** compute the real write position in ring array */ offset = MOD(wb, ringsize); - sent_once = std::min(available, ringsize - offset); + sent_once = std::min((uint32)available, ringsize - offset); /** this barrier ensures that memcpy() is finished before end_wait() */ memory_barrier(); @@ -377,8 +383,11 @@ MQ_RESULT MQueue_handle::receive(void **datap, uint32 *nbytesp, bool nowait) { if (res != MQ_SUCCESS) { return res; } - m_partial_bytes += rb; - m_consume_pending += rb; + uint32x2_t v_a = {m_partial_bytes, m_consume_pending}; + uint32x2_t v_b = {rb, rb}; + v_a = vadd_u32(v_a, v_b); + m_partial_bytes = vget_lane_u32(v_a, 0); + m_consume_pending = vget_lane_u32(v_a, 1); if (m_partial_bytes >= nbytes) { break; } diff --git a/sql/msg_queue.h b/sql/msg_queue.h index 0abdc012b..92d75f8c9 100644 --- a/sql/msg_queue.h +++ b/sql/msg_queue.h @@ -190,6 +190,7 @@ class MQueue { m_buffer(ring), m_ring_size(ring_size), detached(MQ_NOT_DETACHED) {} + ~MQueue() {} }; class MQueue_handle { diff --git a/sql/pq_condition.cc b/sql/pq_condition.cc index 8587b489a..29c7b77bf 100644 --- a/sql/pq_condition.cc +++ b/sql/pq_condition.cc @@ -20,8 +20,8 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ - #include "sql/pq_condition.h" +#include #include "sql/item_strfunc.h" #include "sql/item_sum.h" #include "sql/mysqld.h" @@ -839,8 +839,11 @@ bool check_pq_running_threads(uint dop, ulong timeout_ms) { } if (success) { - parallel_threads_running += dop; - current_thd->pq_threads_running += dop; + uint32x2_t v_a = {parallel_threads_running, current_thd->pq_threads_running}; + uint32x2_t v_b = {dop, dop}; + v_a = vadd_u32(v_a, v_b); + parallel_threads_running = vget_lane_u32(v_a, 0); + current_thd->pq_threads_running = vget_lane_u32(v_a, 1); } mysql_mutex_unlock(&LOCK_pq_threads_running); diff --git a/sql/query_result.h b/sql/query_result.h index 4fc3495fb..02497d266 100644 --- a/sql/query_result.h +++ b/sql/query_result.h @@ -234,6 +234,7 @@ public: Query_result_mq (JOIN *join, MQueue_handle *msg_handler, handler *file=nullptr, bool stab_output=false); + ~Query_result_mq() {} bool send_result_set_metadata(THD *thd, const mem_root_deque &list, uint flags) override; bool send_data(THD *thd, const mem_root_deque &items) override; diff --git a/sql/records.cc b/sql/records.cc index a91977f81..4c6efc57c 100644 --- a/sql/records.cc +++ b/sql/records.cc @@ -582,6 +582,7 @@ int ParallelScanIterator::pq_error_code() { bool ParallelScanIterator::Init() { assert(current_thd == m_join->thd); + m_gather->waitReadEnd(); if (m_gather->init() || /** cur innodb data, should be called first(will change dop based on split count) */ @@ -606,6 +607,7 @@ int ParallelScanIterator::Read() { } int ParallelScanIterator::End() { + m_gather->signalReadEnd(); /** wait all workers to finish their execution */ pq_wait_workers_finished(); /** output error code */ diff --git a/sql/sql_parallel.cc b/sql/sql_parallel.cc index 8b03c28c8..e986ab074 100644 --- a/sql/sql_parallel.cc +++ b/sql/sql_parallel.cc @@ -467,6 +467,14 @@ bool Gather_operator::init() { */ void Gather_operator::signalAll() { m_table->file->ha_pq_signal_all(); } +void Gather_operator::signalReadEnd() { + m_read_end_mutex.unlock(); +} + +void Gather_operator::waitReadEnd() { + m_read_end_mutex.lock(); +} + void pq_free_gather(Gather_operator *gather) { THD *thd_temp = gather->m_template_join->thd; if (thd_temp == nullptr) return; @@ -787,6 +795,8 @@ void *pq_worker_exec(void *arg) { join->query_expression()->ExecuteIteratorQuery(thd); if (thd->lex->is_explain_analyze && mngr->m_id == 0) { + msg_handler->set_datched_status(MQ_HAVE_DETACHED); + mngr->m_gather->waitReadEnd(); Query_expression *unit = leader_thd->lex->unit; leader_thd->pq_explain += PrintQueryPlan( 0, unit->root_access_path(), @@ -1026,7 +1036,7 @@ ORDER *restore_optimized_group_order(SQL_I_List &orig_list, } idx++; } - *prev_ptr = 0; + *prev_ptr = nullptr; return header; } diff --git a/sql/sql_parallel.h b/sql/sql_parallel.h index e819f9564..18e41cf29 100644 --- a/sql/sql_parallel.h +++ b/sql/sql_parallel.h @@ -83,7 +83,7 @@ class MQ_record_gather { MQ_record_gather(THD *thd, QEP_TAB *tab) : m_thd(thd), m_tab(tab), m_exchange(nullptr) {} - + ~MQ_record_gather() {} bool mq_scan_init(Filesort *sort, int workers, uint ref_length, bool stab_output = false); @@ -146,6 +146,9 @@ class Gather_operator { CODE_STATE **m_code_state; bool table_scan; + private: + std::mutex m_read_end_mutex; + public: Gather_operator() = delete; @@ -158,6 +161,10 @@ class Gather_operator { void end(); void signalAll(); + + void signalReadEnd(); + + void waitReadEnd(); }; Gather_operator *make_pq_gather_operator(JOIN *join, QEP_TAB *tab, uint dop); diff --git a/storage/innobase/include/row0sel.h b/storage/innobase/include/row0sel.h index 1670f6796..83b7e6305 100644 --- a/storage/innobase/include/row0sel.h +++ b/storage/innobase/include/row0sel.h @@ -526,7 +526,7 @@ class Row_sel_get_clust_rec_for_mysql { /** Constructor */ Row_sel_get_clust_rec_for_mysql() : cached_clust_rec(nullptr), cached_old_vers(nullptr) {} - + ~Row_sel_get_clust_rec_for_mysql() {} /** Retrieve the clustered index record corresponding to a record in a non-clustered index. Does the necessary locking. @param[in] prebuilt prebuilt struct in the handle -- Gitee