diff --git a/patches/atomicBloom.patch b/patches/atomicBloom.patch new file mode 100644 index 0000000000000000000000000000000000000000..b2e0e9ad9f6918fa7b937971d7072b73c551b61a --- /dev/null +++ b/patches/atomicBloom.patch @@ -0,0 +1,107 @@ +diff --git a/sql/bloom_helpers.h b/sql/bloom_helpers.h +index 0091a48..2b8a990 100644 +--- a/sql/bloom_helpers.h ++++ b/sql/bloom_helpers.h +@@ -6,6 +6,7 @@ + #include + #include + #include ++#include + + #include "m_ctype.h" + #include "my_inttypes.h" +@@ -86,6 +87,81 @@ private: + segment_type *segment_arr = nullptr; + }; + ++// atomic version ++template ++class bloom_filter> { ++ private: ++ using segment_type = std::atomic; ++ const size_t segment_length_in_bytes = sizeof(segment_internal_type); // 1 2 4 8 ++ const size_t segment_length_in_bits = segment_length_in_bytes * 8; // 8 16 32 64 ++ const size_t segment_len_nbits = log2(segment_length_in_bits); // 3 4 5 6 ++ const size_t segment_mask = (1 << segment_len_nbits) - 1; // 7 15 31 63 ++ ++ inline size_t get_bitseg(size_t h_val, size_t index, size_t mask) { ++ return (h_val >> (index * segment_len_nbits)) & mask; ++ } ++ ++ public: ++ bloom_filter() { } ++ ++ /// Initialize a BloomFilter. ++ /// ++ /// @param bloom_size ++ /// size of the bloom filter, in bytes. ++ /// @param mem_root ++ /// the MEM_ROOT to allocate memory on for the bloom filter. ++ bool Init(size_t bloom_size, MEM_ROOT *mem_root) { ++ if(bloom_size < sizeof(segment_type)) { ++ bloom_size = sizeof(segment_type); ++ } ++ segment_arr = (segment_type*)mem_root->Alloc(bloom_size); ++ if (segment_arr == nullptr) { ++ return true; ++ } ++ ++ n_segments = bloom_size / segment_length_in_bytes; ++ segment_index_mask = n_segments - 1; ++ ++ for (int i = 0; i < n_segments; i++) { ++ segment_arr[i] = 0; ++ } ++ ++ return false; ++ } ++ ++ void set(size_t h_val) { ++ size_t seg_offset = get_bitseg(h_val, 2, segment_index_mask); ++ size_t offset1 = get_bitseg(h_val, 1, segment_mask); ++ size_t offset2 = get_bitseg(h_val, 0, segment_mask); ++ ++ segment_internal_type to_insert = 0; ++ to_insert |= (((segment_internal_type)1) << offset1); ++ to_insert |= (((segment_internal_type)1) << offset2); ++ ++ segment_arr[seg_offset].fetch_or(to_insert); ++ } ++ bool test(size_t h_val) { ++ size_t seg_offset = get_bitseg(h_val, 2, segment_index_mask); ++ size_t offset1 = get_bitseg(h_val, 1, segment_mask); ++ size_t offset2 = get_bitseg(h_val, 0, segment_mask); ++ ++ segment_internal_type to_insert = 0; ++ to_insert |= (((segment_internal_type)1) << offset1); ++ to_insert |= (((segment_internal_type)1) << offset2); ++ ++ segment_internal_type prev = segment_arr[seg_offset]; ++ ++ return ((prev & to_insert) != to_insert); ++ } ++ ++ size_t getSize() { return n_segments * segment_length_in_bytes; } ++ ++private: ++ size_t n_segments; ++ size_t segment_index_mask; ++ segment_type *segment_arr = nullptr; ++}; ++ + template + class HyperLogLog { + const double pow_2_64 = pow(2.0, 64.0); +diff --git a/sql/hash_join_buffer.h b/sql/hash_join_buffer.h +index 9c8ed89..4256eb6 100644 +--- a/sql/hash_join_buffer.h ++++ b/sql/hash_join_buffer.h +@@ -372,7 +372,7 @@ class HashJoinRowBuffer { + hash_map_iterator m_last_row_stored; + + // The following functions and fields are related to bloom filters +- using bloom_filter_type = bloom_filter; ++ using bloom_filter_type = bloom_filter>; + const size_t m_bloom_max_mem_available; + unique_ptr_destroy_only m_bloom_filter; + unique_ptr_destroy_only m_hash_store; diff --git a/patches/bloomLog.patch b/patches/bloomLog.patch new file mode 100644 index 0000000000000000000000000000000000000000..3499791d6963f388a2c794cfef3ebdc72032b111 --- /dev/null +++ b/patches/bloomLog.patch @@ -0,0 +1,517 @@ +diff --git a/sql/hash_join_buffer.h b/sql/hash_join_buffer.h +index 9c8ed89..51ddfc7 100644 +--- a/sql/hash_join_buffer.h ++++ b/sql/hash_join_buffer.h +@@ -391,6 +391,13 @@ class HashJoinRowBuffer { + bool need_bloom() const { return m_need_bloom; } + void set_need_bloom(bool need_bloom) { m_need_bloom = need_bloom; } + ++ public: ++ //test ++ uint64_t m_bloomdbg_n_not_filtered = 0; // 没被过滤 ++ uint64_t m_bloomdbg_n_filtered = 0; // 被过滤 ++ uint64_t m_bloomdbg_n_false_positive = 0; // 误判(被判有实际上没有) ++ uint64_t m_bloomdbg_n_negative = 0; // 实际上没有连接结果的 ++ uint64_t m_bloomdbg_n_write_probe = 0; // 被驱动表落盘了多少行 + }; + } // namespace hash_join_buffer + +diff --git a/sql/hash_join_iterator.cc b/sql/hash_join_iterator.cc +index 9bc0df0..96efb8e 100644 +--- a/sql/hash_join_iterator.cc ++++ b/sql/hash_join_iterator.cc +@@ -55,6 +55,15 @@ + #include "tables_contained_in.h" + + constexpr size_t HashJoinIterator::kMaxChunks; ++//case3修改 ++#include ++static time_t timeMs() { ++ using std::chrono::duration_cast; ++ using std::chrono::milliseconds; ++ using std::chrono::seconds; ++ using std::chrono::system_clock; ++ return duration_cast(system_clock::now().time_since_epoch()).count(); ++} + + static bool ShouldStartRecursiveJoin( + size_t max_bloom_size, double num_row_build_chunk, double num_row_probe_chunk, +@@ -64,7 +73,24 @@ static bool ShouldStartRecursiveJoin( + static bool NeedBloom(THD *thd, const unique_ptr_destroy_only &build_input, + const unique_ptr_destroy_only &probe_input, + const Prealloced_array &m_join_conditions, +- size_t max_bloom_size, bool is_recursive_join, bool allow_spill_to_disk); ++ size_t max_bloom_size, bool is_recursive_join, bool allow_spill_to_disk,int thr); ++ ++//max_bloom_size=0:不开布隆 ++//max_bloom_size>0:允许布隆,布隆占字节数不超过这个数 ++//原本max_bloom_size=0时就不允许递归,但此测试版本里以下情况例外(无论max_bloom_size是多少): ++//hash_join_test=3:不允许递归 ++//hash_join_test=4:允许递归 ++//hash_join_test=6:如果m_allow_spill_disk==true则强制开布隆 ++//hash_join_test=7:第零层判断是否进行第一层递归时,强行返回“是” ++//hash_join_test=其他:没有效果 ++ ++//综上: ++//源码流程:设置hash_join_test =3,max_bloom_size=0 ++//布隆+递归流程:设置max_bloom_size>0,hash_join_test=4 ++//只开递归:设置max_bloom_size=0,hash_join_test=4 ++//只开布隆:设置max_bloom_size>0,hash_join_test=3 ++ ++static const bool countErr = false;//统计误判率。注:会造成明显的额外开销(5%+),log输出的[HashStore] read=xxx也会增大! + + HashJoinIterator::HashJoinIterator( + THD *thd, unique_ptr_destroy_only build_input, +@@ -206,6 +232,7 @@ bool HashJoinIterator::InitProbeIterator() { + } + + bool HashJoinIterator::Init() { ++ hash_join_test = thd()->variables.hash_join_test; + // Clean up memory allocated by sub-iterators. + subIterator.reset(nullptr); + m_subiterator_mem_root.Clear(); +@@ -261,13 +288,15 @@ bool HashJoinIterator::Init() { + m_build_chunk_current_row = 0; + m_probe_chunk_current_row = 0; + m_current_chunk = -1; ++m_debug_begin_time = timeMs(); + + // The HashJoinIterator::Init() will be called multiple times when there are + // subqueries, so the need_bloom state will have to be reset each time Init() + // is called. + bool need_bloom = NeedBloom(thd(), m_build_input, m_probe_input, m_join_conditions, + m_row_buffer.bloom_filter_get_max_mem(), +- m_recursion_depth > 0, m_allow_spill_to_disk); ++ m_recursion_depth > 0, m_allow_spill_to_disk, ++ hash_join_test); + m_row_buffer.set_need_bloom(need_bloom); + + // Build the hash table. +@@ -275,6 +304,7 @@ bool HashJoinIterator::Init() { + DBUG_ASSERT(thd()->is_error()); // my_error should have been called. + return true; + } ++ m_debug_build_time = timeMs() - m_debug_begin_time;//case3修改 + + if (m_state == State::END_OF_ROWS) { + // BuildHashTable() decided that the join is done (the build input is +@@ -639,7 +669,7 @@ bool HashJoinIterator::ReadNextHashJoinChunk() { + if (move_to_next_chunk) { + m_current_chunk++; + m_build_chunk_current_row = 0; +- ++m_debug_n_memfull_logged=false; + // Since we are moving to a new set of chunk files, ensure that we read from + // the chunk file and not from the probe row saving file. + m_read_from_probe_row_saving = false; +@@ -688,13 +718,29 @@ bool HashJoinIterator::ReadNextHashJoinChunk() { + // if it is not required this time, it will not be required the next time + // we get here. Therefore, there won't be duplicate join results. + bool m_recursive_join = ShouldStartRecursiveJoin( +- m_row_buffer.bloom_filter_get_max_mem(), +- build_chunk.num_rows(), +- m_chunk_files_on_disk[m_current_chunk].probe_chunk.num_rows(), +- m_build_chunk_current_row, m_recursion_depth, +- hash_join_recursion_bucket_limit, +- hash_join_recursion_depth_limit, m_join_type == JoinType::INNER, +- m_has_dominant_hash); ++ hash_join_test == 4 ? 100000 : //hash_join_test=4时允许递归 ++ hash_join_test == 3 ? 0 : m_row_buffer.bloom_filter_get_max_mem(),//hash_join_test=3时不开递归 ++ build_chunk.num_rows(), ++ m_chunk_files_on_disk[m_current_chunk].probe_chunk.num_rows(), ++ m_build_chunk_current_row, m_recursion_depth, ++ hash_join_recursion_bucket_limit, hash_join_recursion_depth_limit, ++ m_join_type == JoinType::INNER, m_has_dominant_hash); ++ if(hash_join_test==7&&m_recursion_depth == 0)m_recursive_join=true;//hash_join_test=7时强制开一层递归 ++ ++ // 打印调试信息 ++ if (m_recursion_depth == 0 && !m_debug_n_memfull_logged) { ++ m_debug_n_memfull_logged=true; ++ ++ //printf("[full] idx=%u depth=%u B=%lld P=%lld P/b=%f B/b=%f case3=%d)\n", ++ printf("[full] idx=%u B=%lld P=%lld P/b=%f B/b=%f case3=%d\n", ++ m_current_chunk, ++ //m_recursion_depth, ++ build_chunk.num_rows(), ++ m_chunk_files_on_disk[m_current_chunk].probe_chunk.num_rows(), ++ 1.0f * m_chunk_files_on_disk[m_current_chunk].probe_chunk.num_rows() / m_build_chunk_current_row, ++ 1.0f * build_chunk.num_rows() / m_build_chunk_current_row, ++ m_recursive_join); ++ } + + if(m_recursive_join) { + if (InitRowBuffer()) { // Free memory allocated by hash map. +@@ -792,11 +838,18 @@ bool HashJoinIterator::ReadRowFromProbeIterator() { + if (m_allow_spill_to_disk) { + m_hash_join_type = HashJoinType::SPILL_TO_DISK; + m_state = State::LOADING_NEXT_CHUNK_PAIR; ++ /*if(m_recursion_depth==0){ ++ printf("[probeEnd] thr=%d need=%d filtered=%lu/%lu=%f err: %lu/%lu=%f\n", hash_join_test, ++ m_row_buffer.need_bloom(), m_row_buffer.m_bloomdbg_n_filtered, m_row_buffer.m_bloomdbg_n_not_filtered+m_row_buffer.m_bloomdbg_n_filtered, ++ 1.0f*m_row_buffer.m_bloomdbg_n_filtered/(m_row_buffer.m_bloomdbg_n_not_filtered+m_row_buffer.m_bloomdbg_n_filtered), ++ m_row_buffer.m_bloomdbg_n_false_positive,m_row_buffer.m_bloomdbg_n_negative, ++ 1.0f*m_row_buffer.m_bloomdbg_n_false_positive/(m_row_buffer.m_bloomdbg_n_negative)); ++ }*/ + + // We currently disable bloom filter while processing chunk files. + m_row_buffer.set_need_bloom(false); + m_probe_row_match_flag = false; +- ++ + return false; + } + +@@ -966,6 +1019,7 @@ void HashJoinIterator::LookupProbeRowInHashTable() { + } + return; + } ++ bool needbl=m_row_buffer.need_bloom(); + + hash_join_buffer::Key key( + pointer_cast(m_temporary_row_and_join_key_buffer.ptr()), +@@ -973,7 +1027,8 @@ void HashJoinIterator::LookupProbeRowInHashTable() { + + if (m_row_buffer.need_bloom() && + m_row_buffer.bloom_filter_test(m_row_buffer.bloom_filter_hash(key))) { +- m_filtered_by_bloom = true; ++ m_filtered_by_bloom = true; ++ m_row_buffer.m_bloomdbg_n_filtered++; + if (m_join_type == JoinType::ANTI || m_join_type == JoinType::OUTER) { + m_hash_map_iterator = m_row_buffer.end(); + m_hash_map_end = m_row_buffer.end(); +@@ -989,9 +1044,18 @@ void HashJoinIterator::LookupProbeRowInHashTable() { + } else { + SetReadingProbeRowState(); + } ++ if(countErr){//没有结果 ++ m_row_buffer.m_bloomdbg_n_negative++; ++ } + return; + } + ++ //没被过滤,说明期望是有结果的。 ++ ++ if(needbl){ ++ m_row_buffer.m_bloomdbg_n_not_filtered++; ++ } ++ + + if ((m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) && + m_extra_condition == nullptr) { +@@ -1010,6 +1074,12 @@ void HashJoinIterator::LookupProbeRowInHashTable() { + m_hash_map_iterator = range.first; + m_hash_map_end = range.second; + } ++ if(needbl){ ++ if(countErr && m_dbg_build_hashes.find(m_row_buffer.bloom_filter_hash(key)) == m_dbg_build_hashes.end()) {//判错了 ++ m_row_buffer.m_bloomdbg_n_false_positive++; ++ m_row_buffer.m_bloomdbg_n_negative++; ++ } ++ } + + m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE; + } +@@ -1050,6 +1120,7 @@ bool HashJoinIterator::WriteProbeRowToDiskIfApplicable() { + if ((m_join_type == JoinType::INNER || m_join_type == JoinType::OUTER) || + !found_match) { + if (on_disk_hash_join() && m_current_chunk == -1) { ++ m_row_buffer.m_bloomdbg_n_write_probe++; + if (WriteRowToChunk(thd(), &m_chunk_files_on_disk, + false /* write_to_build_chunk */, + m_probe_input_tables, m_join_conditions, +@@ -1254,6 +1325,9 @@ int HashJoinIterator::Read() { + return result; + } + case State::END_OF_ROWS: ++ //if(m_recursion_depth == 0) printf("HashJoin end. savingfile read=%d write=%d", n_probe_read, n_probe_write);//case3 打印一下 ++ m_debug_total_time = timeMs() - m_debug_begin_time; ++ PrintDebugInfo(); + return -1; + } + } +@@ -1311,7 +1385,7 @@ std::vector HashJoinIterator::DebugString() const { + } + + bool HashJoinIterator::InitWritingToProbeRowSavingFile() { +- m_write_to_probe_row_saving = true; ++ m_write_to_probe_row_saving = true;n_probe_write++; + return m_probe_row_saving_write_file.Init(m_probe_input_tables, + m_join_type == JoinType::OUTER); + } +@@ -1319,7 +1393,7 @@ bool HashJoinIterator::InitWritingToProbeRowSavingFile() { + bool HashJoinIterator::InitReadingFromProbeRowSavingFile() { + m_probe_row_saving_read_file = std::move(m_probe_row_saving_write_file); + m_probe_row_saving_read_file_current_row = 0; +- m_read_from_probe_row_saving = true; ++ m_read_from_probe_row_saving = true;n_probe_read++; + return m_probe_row_saving_read_file.Rewind(); + } + +@@ -1408,14 +1482,90 @@ HashJoinIterator *HashJoinIterator::GetRootHJIterator() { + if(m_recursion_depth == 0) return this; + return ((HashJoinChunkIterator*) m_build_input.get())->GetParentHashJoinIterator()->GetRootHJIterator(); + } ++std::string HashJoinIterator::GetTableString(hash_join_buffer::TableCollection m_tables) { ++ std::stringstream ss; ++ int f = 0; ++ for (const hash_join_buffer::Table &it : m_tables.tables()) { ++ if(f++) ss<<","; ++ ss<table()->alias; ++ } ++ return ss.str(); ++} ++ ++void HashJoinIterator::PrintDebugInfo() { ++ if(m_debug_mute) return; ++ if(m_debug_logged) return;//只打印一次 ++ m_debug_logged = true; ++ ++ HashJoinIterator *root = GetRootHJIterator(); ++ ++ root->m_debug_build_time_arr[m_recursion_depth] += m_debug_build_time; ++ root->m_debug_total_time_arr[m_recursion_depth] += m_debug_total_time; ++ root->m_debug_n_cnt_arr[m_recursion_depth]++; ++ root->m_debug_n_chunk_arr[m_recursion_depth] += m_chunk_files_on_disk.size(); ++ root->m_debug_pf_read_arr[m_recursion_depth] += n_probe_read; ++ root->m_debug_pf_write_arr[m_recursion_depth] += n_probe_write; ++ root->m_debug_bloom[m_recursion_depth] += m_row_buffer.need_bloom(); ++ root->m_debug_bloom_err[m_recursion_depth] += m_row_buffer.m_bloomdbg_n_false_positive; ++ root->m_debug_bloom_filtered[m_recursion_depth] += m_row_buffer.m_bloomdbg_n_filtered; ++ root->m_debug_bloom_nfil[m_recursion_depth] += m_row_buffer.m_bloomdbg_n_not_filtered; ++ root->m_debug_bloom_noresult[m_recursion_depth] += m_row_buffer.m_bloomdbg_n_negative; ++ ++ int d = m_recursion_depth; ++ int (&dbg_info)[129] = root->m_debug_n_buckets[d]; ++ dbg_info[m_chunk_files_on_disk.size()]++; ++ ++ std::stringstream ss; ++ ++ if(m_recursion_depth == 0) { ++ ss<<"[HashJoinIterator] input="< 0; i++) { ++ std::stringstream b; ++ for(int j = 0, f = 0; j < 129; j++){ ++ if(m_debug_n_buckets[i][j]){ ++ b << (f++ ? "," : "") << j << "x" << m_debug_n_buckets[i][j]; ++ } ++ } ++ int cn = m_debug_n_cnt_arr[i]; ++ //printf("Depth=%d count=%d total=%.1fs(avg:%.2fs) build=%.1fs(avg:%.2fs) avg_n_chunk=%.1f chunk_arr=%s pr=%d pw=%d filtered=%lld/%lld=%f err=%lld/%lld=%f\n", ++ printf("Depth=%d count=%d total=%.1fs(avg:%.2fs) build=%.1fs(avg:%.2fs) filtered=%lld/%lld=%f err=%lld/%lld=%f\n", ++ i, cn, ++ m_debug_total_time_arr[i]/1000.0, m_debug_total_time_arr[i]/cn/1000.0, ++ m_debug_build_time_arr[i]/1000.0, m_debug_build_time_arr[i]/cn/1000.0, ++ //m_debug_n_chunk_arr[i]/cn, ++ //b.str().c_str(), ++ //m_debug_pf_read_arr[i], m_debug_pf_write_arr[i], ++ m_debug_bloom_filtered[i], m_debug_bloom_nfil[i]+m_debug_bloom_filtered[i], ++ 1.0f*m_debug_bloom_filtered[i]/(m_debug_bloom_nfil[i]+m_debug_bloom_filtered[i]), ++ m_debug_bloom_err[i], m_debug_bloom_noresult[i], ++ 1.0f*m_debug_bloom_err[i]/m_debug_bloom_noresult[i]); ++ } ++ printf("==========\n"); ++ } ++} + + // Determine whether a bloom filter should be used. + static bool NeedBloom(THD *thd, const unique_ptr_destroy_only &build_input, + const unique_ptr_destroy_only &probe_input, + const Prealloced_array &m_join_conditions, +- size_t max_bloom_size, bool is_recursive_join, bool allow_spill_to_disk) { +- if (max_bloom_size == 0) return false; +- if (allow_spill_to_disk == 0) return false; ++ size_t max_bloom_size, bool is_recursive_join, bool allow_spill_to_disk, int thr) { ++ if (max_bloom_size == 0){ ++ if(!is_recursive_join){ ++ printf("[NeedBloom] false (maxsize=0)\n"); ++ } ++ return false; ++ } ++ if (allow_spill_to_disk == 0) { ++ if(!is_recursive_join){ ++ printf("[NeedBloom] false (allow_spill_to_disk=0)\n"); ++ } ++ return false; ++ } + + bool build_is_table_scan_iter = typeid(*build_input.get()) == typeid(TableScanIterator); + bool probe_is_table_scan_iter = typeid(*probe_input.get()) == typeid(TableScanIterator); +@@ -1467,6 +1617,14 @@ static bool NeedBloom(THD *thd, const unique_ptr_destroy_only &buil + bool same_table_and_field = field_same == 0 && table_same == 0; + bool need_bloom = !build_input_too_big && !same_table_and_field && !no_condition; + ++ if(thr == 6) need_bloom = true; ++ ++ if(!is_recursive_join){ ++ printf("[NeedBloom] nBbuildRow=%.0f nProbeRow=%.0f buildTooBig=%d noCondition=%d sameTable&Field=%d thr=%d needBloom=%d\n", ++ build_input->expected_rows(), probe_input->expected_rows(), build_input_too_big, no_condition, ++ same_table_and_field, thr, need_bloom); ++ } ++ + return need_bloom; + } + +@@ -1477,22 +1635,38 @@ bool HashJoinIterator::ConstructBloomWithHashStore() { + ha_rows n_rows = hash_store->GetNumberOfRows(); + size_t cardinality = hash_store->EstimateCardinality(); + hash_store->Rewind(); ++ if(m_recursion_depth==0&&m_state==State::READING_ROW_FROM_PROBE_ITERATOR) ++ printf("[HashStore] nRows=%lld cardinality=%ld ", n_rows, cardinality); + + size_t bloom_size = (size_t)(2 * cardinality - 1); + if(m_row_buffer.bloom_filter_init(bloom_size)) { + return true; + } ++ ++ if(countErr)m_dbg_build_hashes.clear(); + +- size_t tmp; ++ long long st = timeMs(); //测试用 ++ size_t chksum = 0; //测试用 ++ size_t tmp = 0; + for (ha_rows i = 0; i < n_rows; i++) { + if (hash_store->ReadKey(tmp)) { + return true; + } ++ if(countErr) m_dbg_build_hashes.insert(tmp); ++ chksum ^= tmp; + m_row_buffer.bloom_filter_set(tmp); + } + + m_has_dominant_hash = m_row_buffer.has_dominant_hash(); + ++ if(m_recursion_depth==0&&m_state==State::READING_ROW_FROM_PROBE_ITERATOR){ ++ if(countErr) printf("actualCard=%lu cardErr=%.5f ",m_dbg_build_hashes.size(), 1.0*m_dbg_build_hashes.size()/cardinality-1.0); ++ //printf("chksum=%ld read=%lldms bloom_size=%zu actual_size=%zu has_dominant_hash=%d\n", ++ // chksum, timeMs() - st, bloom_size, m_row_buffer.bloom_filter_get_size(), m_has_dominant_hash); ++ printf(" read=%lldms bloom_size=%zu actual_size=%zu has_dominant_hash=%d\n", ++ timeMs() - st, bloom_size, m_row_buffer.bloom_filter_get_size(), m_has_dominant_hash); ++ } ++ + return false; + } + +@@ -1517,4 +1691,4 @@ static bool ShouldStartRecursiveJoin( + } + + return cost_direct > cost_recursive * 1.05; +-} +\ No newline at end of file ++} +diff --git a/sql/hash_join_iterator.h b/sql/hash_join_iterator.h +index 9072b50..6c1be4e 100644 +--- a/sql/hash_join_iterator.h ++++ b/sql/hash_join_iterator.h +@@ -28,6 +28,7 @@ + #include + #include + #include ++#include + + #include "my_alloc.h" + #include "my_inttypes.h" +@@ -663,6 +664,7 @@ class HashJoinIterator final : public RowIterator { + unique_ptr_destroy_only build_chunk_iterator; + unique_ptr_destroy_only probe_chunk_iterator; + ++ + static constexpr uint32 hash_join_recursion_depth_limit = 4; + static constexpr uint32 hash_join_recursion_bucket_limit = 16; + +@@ -670,8 +672,37 @@ class HashJoinIterator final : public RowIterator { + + bool ConstructBloomWithHashStore(); + ++//////////测试用的变量////////// ++ int n_probe_read = 0;//读了几次ProbeRowSavingFile ++ int n_probe_write = 0;//写了几次ProbeRowSavingFile ++ ++ //下面几个应当是从thd.variables里取来的变量,暂时设为定值 ++ uint32 hash_join_test; ++ bool m_debug_mute = false;//是否不输出信息 ++ bool m_debug_logged = false;//是否已经输出过调试信息,避免太啰嗦只输出一次。 ++ bool m_debug_n_memfull_logged = 0;//[full]信息输出了没有 ++ //std::string m_debug_info[10];//各层的递归信息 ++ int m_debug_n_buckets[10][129]={0}; ++ time_t m_debug_build_time_arr[10]={0}; ++ time_t m_debug_total_time_arr[10]={0}; ++ int m_debug_n_cnt_arr[10]={0};//第i层递归执行了几次 ++ double m_debug_n_chunk_arr[10]={0.0};//第i层递归分块数之和 ++ time_t m_debug_begin_time = 0;//测试用,记录哈希连接开始的时间 ++ time_t m_debug_build_time = 0; ++ time_t m_debug_total_time = 0; ++ int m_debug_pf_read_arr[10]={0}; ++ int m_debug_pf_write_arr[10]={0}; ++ int m_debug_bloom[10]={0};//废弃 ++ long long m_debug_bloom_filtered[10]={0}; ++ long long m_debug_bloom_nfil[10]={0}; ++ long long m_debug_bloom_err[10]={0}; ++ long long m_debug_bloom_noresult[10]={0}; + HashJoinIterator *GetRootHJIterator(); +- ++ void PrintDebugInfo(); ++ std::string GetTableString(hash_join_buffer::TableCollection m_tables); ++ std::set m_dbg_build_hashes; ++//////////测试结束////////// ++ + bool BeginRecursiveHashJoin(); + + // The level of the recursive partition +diff --git a/sql/sql_union.cc b/sql/sql_union.cc +index 70e8dee..5f0a9bd 100644 +--- a/sql/sql_union.cc ++++ b/sql/sql_union.cc +@@ -1232,6 +1232,21 @@ bool SELECT_LEX_UNIT::execute(THD *thd) { + */ + Change_current_select save_select(thd); + ++ printf("================================= QUERY =================================\n"); ++ const MYSQL_LEX_CSTRING &qstr = thd->query(); ++ static char buf[1024 * 32]; ++ for (size_t i = 0; i < qstr.length; i++) buf[i] = (qstr.str[i] == '\n' || qstr.str[i] == '\r') ? '|' : qstr.str[i]; ++ buf[qstr.length] = '\0'; ++ printf("%s\n", buf); ++ printf("join_buff_size = %ld\n", thd->variables.join_buff_size); ++ //printf("hash_join_recursion_depth_limit = %u\n", ++ // thd->variables.hash_join_recursion_depth_limit); ++ printf("hash_join_test = %u\n", ++ thd->variables.hash_join_test); ++ printf("hash_join_max_bloom_size = %lu\n", ++ thd->variables.hash_join_max_bloom_size); ++ printf("=========================================================================\n"); ++ + return ExecuteIteratorQuery(thd); + } + +diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc +index 3545508..1459d7c 100644 +--- a/sql/sys_vars.cc ++++ b/sql/sys_vars.cc +@@ -2145,6 +2145,10 @@ static Sys_var_ulong Sys_join_buffer_size( + "join_buffer_size", "The size of the buffer that is used for full joins", + HINT_UPDATEABLE SESSION_VAR(join_buff_size), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(128, ULONG_MAX), DEFAULT(256 * 1024), BLOCK_SIZE(128)); ++static Sys_var_uint Sys_hash_join_recursion_threshold( ++ "hash_join_test", "How many times as many rows as there are in memory to be processed should there be before triggering a recursion when the hash table is full?", ++ HINT_UPDATEABLE SESSION_VAR(hash_join_test), CMD_LINE(REQUIRED_ARG), ++ VALID_RANGE(0, UINT_MAX32), DEFAULT(0), BLOCK_SIZE(1)); + + static Sys_var_ulong Sys_hash_join_max_bloom_size( + "hash_join_max_bloom_size", "The maximum size of bloom filter used in hash join." +diff --git a/sql/system_variables.h b/sql/system_variables.h +index c9b72a7..fbf1137 100644 +--- a/sql/system_variables.h ++++ b/sql/system_variables.h +@@ -209,6 +209,7 @@ struct System_variables { + uint cte_max_recursion_depth; + ulonglong histogram_generation_max_mem_size; + ulong join_buff_size; ++ uint hash_join_test; + ulong hash_join_max_bloom_size; + ulong lock_wait_timeout; + ulong max_allowed_packet; diff --git a/patches/mysqld_log.docx b/patches/mysqld_log.docx new file mode 100644 index 0000000000000000000000000000000000000000..09338de331200bc081b655911c8ed46d95799e71 Binary files /dev/null and b/patches/mysqld_log.docx differ diff --git a/patches/readme.txt b/patches/readme.txt new file mode 100644 index 0000000000000000000000000000000000000000..22cf712f8f6c4293bd136e9757e8fe6bc3f4d637 --- /dev/null +++ b/patches/readme.txt @@ -0,0 +1,5 @@ +atomicBloom.patch 是并行版本布隆 +bloomLog.patch 是带输出信息的哈希连接 +在当前的mysql8.0.20-bloom-filter分支上可直接使用这两个patch且相互没有冲突。 + +mysqld输出.docx 是打上bloomLog.patch之后运行哈希连接语句会产生的日志示例以及解释。 \ No newline at end of file diff --git a/sql/README.en.md b/sql/README.en.md new file mode 100644 index 0000000000000000000000000000000000000000..3d6f714a33e99524c2e536ceb0a7e445a21d91c7 --- /dev/null +++ b/sql/README.en.md @@ -0,0 +1,36 @@ +# HashJoinBloomFilter + +#### Description +{**When you're done, you can delete the content in this README and update the file with details for others getting started with your repository**} + +#### Software Architecture +Software architecture description + +#### Installation + +1. xxxx +2. xxxx +3. xxxx + +#### Instructions + +1. xxxx +2. xxxx +3. xxxx + +#### Contribution + +1. Fork the repository +2. Create Feat_xxx branch +3. Commit your code +4. Create Pull Request + + +#### Gitee Feature + +1. You can use Readme\_XXX.md to support different languages, such as Readme\_en.md, Readme\_zh.md +2. Gitee blog [blog.gitee.com](https://blog.gitee.com) +3. Explore open source project [https://gitee.com/explore](https://gitee.com/explore) +4. The most valuable open source project [GVP](https://gitee.com/gvp) +5. The manual of Gitee [https://gitee.com/help](https://gitee.com/help) +6. The most popular members [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/sql/README.md b/sql/README.md new file mode 100644 index 0000000000000000000000000000000000000000..7f9598baffe974e9da56c56e3312714d9dc4f744 --- /dev/null +++ b/sql/README.md @@ -0,0 +1,39 @@ +# HashJoinBloomFilter + +#### 介绍 +{**以下是 Gitee 平台说明,您可以替换此简介** +Gitee 是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN)。专为开发者提供稳定、高效、安全的云端软件开发协作平台 +无论是个人、团队、或是企业,都能够用 Gitee 实现代码托管、项目管理、协作开发。企业项目请看 [https://gitee.com/enterprises](https://gitee.com/enterprises)} + +#### 软件架构 +软件架构说明 + + +#### 安装教程 + +1. xxxx +2. xxxx +3. xxxx + +#### 使用说明 + +1. xxxx +2. xxxx +3. xxxx + +#### 参与贡献 + +1. Fork 本仓库 +2. 新建 Feat_xxx 分支 +3. 提交代码 +4. 新建 Pull Request + + +#### 特技 + +1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md +2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) +3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 +4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 +5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) +6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/sql/hash_join_buffer.h b/sql/hash_join_buffer.h index 3d1457c7dedb0dfb5b0bcfc67ffcfdd5f3acfcb0..9c8ed89d4e91fde2063d462b12ee529dd8d898ae 100644 --- a/sql/hash_join_buffer.h +++ b/sql/hash_join_buffer.h @@ -372,7 +372,7 @@ class HashJoinRowBuffer { hash_map_iterator m_last_row_stored; // The following functions and fields are related to bloom filters - using bloom_filter_type = bloom_filter; + using bloom_filter_type = bloom_filter; const size_t m_bloom_max_mem_available; unique_ptr_destroy_only m_bloom_filter; unique_ptr_destroy_only m_hash_store; diff --git a/sql/hash_join_iterator.cc b/sql/hash_join_iterator.cc index e22d629713ab4f8ab603010162e79aa9c70e55d4..9bc0df0de643ea5e087a35e7e8eb40e32958a613 100644 --- a/sql/hash_join_iterator.cc +++ b/sql/hash_join_iterator.cc @@ -64,7 +64,7 @@ static bool ShouldStartRecursiveJoin( static bool NeedBloom(THD *thd, const unique_ptr_destroy_only &build_input, const unique_ptr_destroy_only &probe_input, const Prealloced_array &m_join_conditions, - size_t max_bloom_size, bool is_recursive_join); + size_t max_bloom_size, bool is_recursive_join, bool allow_spill_to_disk); HashJoinIterator::HashJoinIterator( THD *thd, unique_ptr_destroy_only build_input, @@ -263,10 +263,11 @@ bool HashJoinIterator::Init() { m_current_chunk = -1; // The HashJoinIterator::Init() will be called multiple times when there are - // subqueries, so the need_bloom state will have to be reset each time the - // Init() is called. + // subqueries, so the need_bloom state will have to be reset each time Init() + // is called. bool need_bloom = NeedBloom(thd(), m_build_input, m_probe_input, m_join_conditions, - m_row_buffer.bloom_filter_get_max_mem(), m_recursion_depth > 0); + m_row_buffer.bloom_filter_get_max_mem(), + m_recursion_depth > 0, m_allow_spill_to_disk); m_row_buffer.set_need_bloom(need_bloom); // Build the hash table. @@ -540,6 +541,9 @@ bool HashJoinIterator::BuildHashTable() { // that we only read unmatched probe rows. InitWritingToProbeRowSavingFile(); } + if(ConstructBloomWithHashStore()){ + return true; + } SetReadingProbeRowState(); return false; } @@ -1409,8 +1413,9 @@ HashJoinIterator *HashJoinIterator::GetRootHJIterator() { static bool NeedBloom(THD *thd, const unique_ptr_destroy_only &build_input, const unique_ptr_destroy_only &probe_input, const Prealloced_array &m_join_conditions, - size_t max_bloom_size, bool is_recursive_join) { + size_t max_bloom_size, bool is_recursive_join, bool allow_spill_to_disk) { if (max_bloom_size == 0) return false; + if (allow_spill_to_disk == 0) return false; bool build_is_table_scan_iter = typeid(*build_input.get()) == typeid(TableScanIterator); bool probe_is_table_scan_iter = typeid(*probe_input.get()) == typeid(TableScanIterator); @@ -1473,9 +1478,8 @@ bool HashJoinIterator::ConstructBloomWithHashStore() { size_t cardinality = hash_store->EstimateCardinality(); hash_store->Rewind(); - size_t bloom_size = (size_t)((-1.0) * cardinality * log(0.05) / - (log(2.0) * log(2.0))); - if(m_row_buffer.bloom_filter_init(bloom_size)){ + size_t bloom_size = (size_t)(2 * cardinality - 1); + if(m_row_buffer.bloom_filter_init(bloom_size)) { return true; } @@ -1507,11 +1511,10 @@ static bool ShouldStartRecursiveJoin( double cost_direct = num_row_build_chunk + ceil(alpha) * num_row_probe_chunk; double cost_recursive = 3 * num_row_build_chunk + (2 + ceil((alpha - 1) / max_bucket_count)) * num_row_probe_chunk - num_read; - - // I/O cost of probe row saving file - if (!is_inner_join) { - cost_direct += ceil(alpha) * num_row_probe_chunk; - cost_recursive += (ceil((alpha - 1) / max_bucket_count) + 1) * num_row_probe_chunk; + + if(!is_inner_join) { + // The additional cost caused by probe row saving should be the same. } + return cost_direct > cost_recursive * 1.05; } \ No newline at end of file