From aee608f9579f589add4d689da40a2bf43d80fd44 Mon Sep 17 00:00:00 2001 From: liangruifeng <960641173@qq.com> Date: Fri, 6 Dec 2024 15:20:40 +0800 Subject: [PATCH 01/10] Fix endless cycle problem and Change functions as safety functions. --- .../cpp/src/orcfile/OmniRLEv2.cc | 16 ++++----- .../cpp/src/parquet/ParquetDecoder.h | 2 +- .../src/parquet/ParquetTypedRecordReader.cpp | 2 +- .../cpp/src/shuffle/ock_splitter.cpp | 4 +-- .../cpp/src/common/common.h | 2 +- .../cpp/src/io/Compression.cc | 6 ++-- .../cpp/src/io/MemoryPool.cc | 7 ++-- .../cpp/src/shuffle/splitter.cpp | 34 +++++++++++-------- .../cpp/test/utils/test_utils.cpp | 13 +++---- .../cpp/test/utils/test_utils.h | 2 +- 10 files changed, 48 insertions(+), 40 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc index 24340ab8b..a5c515d3c 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc @@ -641,7 +641,7 @@ namespace omniruntime::reader { uint64_t b0, b1, b2, b3, b4, b5, b6, b7; // Avoid updating 'bufferStart' inside the loop. const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { b0 = static_cast(*buffer); b1 = static_cast(*(buffer + 1)); b2 = static_cast(*(buffer + 2)); @@ -683,7 +683,7 @@ namespace omniruntime::reader { uint64_t b0, b1, b2, b3, b4, b5, b6; // Avoid updating 'bufferStart' inside the loop. const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { b0 = static_cast(*buffer); b1 = static_cast(*(buffer + 1)); b2 = static_cast(*(buffer + 2)); @@ -723,7 +723,7 @@ namespace omniruntime::reader { uint64_t b0, b1, b2, b3, b4, b5; // Avoid updating 'bufferStart' inside the loop. const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { b0 = static_cast(*buffer); b1 = static_cast(*(buffer + 1)); b2 = static_cast(*(buffer + 2)); @@ -761,7 +761,7 @@ namespace omniruntime::reader { uint64_t b0, b1, b2, b3, b4; // Avoid updating 'bufferStart' inside the loop. const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { b0 = static_cast(*buffer); b1 = static_cast(*(buffer + 1)); b2 = static_cast(*(buffer + 2)); @@ -797,7 +797,7 @@ namespace omniruntime::reader { uint32_t b0, b1, b2, b3; // Avoid updating 'bufferStart' inside the loop. const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { b0 = static_cast(*buffer); b1 = static_cast(*(buffer + 1)); b2 = static_cast(*(buffer + 2)); @@ -832,7 +832,7 @@ namespace omniruntime::reader { uint32_t b0, b1, b2; // Avoid updating 'bufferStart' inside the loop. const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { b0 = static_cast(*buffer); b1 = static_cast(*(buffer + 1)); b2 = static_cast(*(buffer + 2)); @@ -865,7 +865,7 @@ namespace omniruntime::reader { uint16_t b0, b1; // Avoid updating 'bufferStart' inside the loop. 
const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { b0 = static_cast(*buffer); b1 = static_cast(*(buffer + 1)); buffer += 2; @@ -895,7 +895,7 @@ namespace omniruntime::reader { bufferNum = std::min(bufferNum, static_cast(offset + len - curIdx)); // Avoid updating 'bufferStart' inside the loop. const auto* buffer = reinterpret_cast(bufferStart); - for (int i = 0; i < bufferNum; ++i) { + for (int64_t i = 0; i < bufferNum; ++i) { data[curIdx++] = *buffer++; } bufferStart = reinterpret_cast(buffer); diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h index d6661ee7d..8a04e338c 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h @@ -70,7 +70,7 @@ namespace omniruntime::reader { for (int i = num_values - 1; i >= 0; --i) { if (!nulls[i]) { idx_decode--; - std::memmove(buffer + i, buffer + idx_decode, sizeof(T)); + memmove_s(buffer + i, sizeof(T), buffer + idx_decode, sizeof(T)); } } assert(idx_decode == 0); diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp index edd7b07a6..262f1498d 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp @@ -502,7 +502,7 @@ Status RawBytesToDecimal64Bytes(const uint8_t* bytes, int32_t length, void DefLevelsToNullsSIMD(const int16_t* def_levels, int64_t num_def_levels, const int16_t max_def_level, int64_t* values_read, int64_t* null_count, bool* nulls) { - for (int i = 0; i < num_def_levels; ++i) { + for (int64_t i = 0; i < num_def_levels; ++i) { if (def_levels[i] < max_def_level) { nulls[i] = true; (*null_count)++; diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp index 68f4ce912..ee6de06c4 100644 --- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp +++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp @@ -212,7 +212,7 @@ bool OckSplitter::WriteFixedWidthValueTemple(BaseVector *vector, bool isDict, st for (uint32_t index = 0; index < rowNum; ++index) { uint32_t idIndex = rowIndexes[index]; if (UNLIKELY(idIndex >= idsNum)) { - LOG_ERROR("Invalid idIndex %d, idsNum.", idIndex, idsNum); + LOG_ERROR("Invalid idIndex %d, idsNum %d.", idIndex, idsNum); return false; } uint32_t rowIndex = reinterpret_cast(ids)[idIndex]; @@ -257,7 +257,7 @@ bool OckSplitter::WriteDecimal128(BaseVector *vector, bool isDict, std::vector= idsNum)) { - LOG_ERROR("Invalid idIndex %d, idsNum.", idIndex, idsNum); + LOG_ERROR("Invalid idIndex %d, idsNum %d.", idIndex, idsNum); return false; } uint32_t rowIndex = reinterpret_cast(ids)[idIndex]; diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.h b/omnioperator/omniop-spark-extension/cpp/src/common/common.h index 1578b8514..5b72ba5c7 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/common/common.h +++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.h @@ -67,7 +67,7 @@ int32_t BytesGen(uint64_t offsetsAddr, std::string &nullStr, uint64_t valuesAddr } if (len 
!= 0) { - memcpy((char *) (values + offsets[i]), addr, len); + memcpy_s((char *) (values + offsets[i]), len, addr, len); valueTotalLen += len; } } diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc index 720f7ff19..01f89bc45 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc +++ b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc @@ -24,6 +24,7 @@ #include #include #include +#include "securec.h" #include "zlib.h" #include "zstd.h" @@ -189,8 +190,9 @@ namespace spark { char * header = outputBuffer + outputPosition - totalCompressedSize - 3; if (totalCompressedSize >= static_cast(bufferSize)) { writeHeader(header, static_cast(bufferSize), true); - memcpy( + memcpy_s( header + 3, + static_cast(bufferSize), rawInputBuffer.data(), static_cast(bufferSize)); @@ -396,7 +398,7 @@ namespace spark { } int sizeToWrite = std::min(totalSizeToWrite, outputSize - outputPosition); - memcpy(dst, dataToWrite, static_cast(sizeToWrite)); + memcpy_s(dst, static_cast(sizeToWrite), dataToWrite, static_cast(sizeToWrite)); outputPosition += sizeToWrite; dataToWrite += sizeToWrite; diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc b/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc index b4fd9e345..4c4b25feb 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc +++ b/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc @@ -23,6 +23,7 @@ #include #include #include +#include "securec.h" namespace spark { @@ -91,7 +92,7 @@ namespace spark { if (buf) { T* buf_old = buf; buf = reinterpret_cast(memoryPool.malloc(sizeof(T) * newCapacity)); - memcpy(buf, buf_old, sizeof(T) * currentSize); + memcpy_s(buf, sizeof(T) * currentSize, buf_old, sizeof(T) * currentSize); memoryPool.free(reinterpret_cast(buf_old)); } else { buf = reinterpret_cast(memoryPool.malloc(sizeof(T) * newCapacity)); @@ -113,7 +114,7 @@ namespace spark { void DataBuffer::resize(uint64_t newSize) { reserve(newSize); if (newSize > currentSize) { - memset(buf + currentSize, 0, newSize - currentSize); + memset_s(buf + currentSize, newSize - currentSize, 0, newSize - currentSize); } currentSize = newSize; } @@ -131,7 +132,7 @@ namespace spark { void DataBuffer::resize(uint64_t newSize) { reserve(newSize); if (newSize > currentSize) { - memset(buf + currentSize, 0, newSize - currentSize); + memset_s(buf + currentSize, newSize - currentSize, 0, newSize - currentSize); } currentSize = newSize; } diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 2970d5fab..ccfa024c1 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -25,7 +25,7 @@ SplitOptions SplitOptions::Defaults() { return SplitOptions(); } // 计算分区id,每个batch初始化 int Splitter::ComputeAndCountPartitionId(VectorBatch& vb) { auto num_rows = vb.GetRowCount(); - std::memset(partition_id_cnt_cur_, 0, num_partitions_ * sizeof(int32_t)); + memset_s(partition_id_cnt_cur_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t)); partition_id_.resize(num_rows); if (singlePartitionFlag) { @@ -121,7 +121,7 @@ int Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t new_size) { int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { const auto num_rows = vb.GetRowCount(); for (uint col = 0; col < 
fixed_width_array_idx_.size(); ++col) { - std::memset(partition_buffer_idx_offset_, 0, num_partitions_ * sizeof(int32_t)); + memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t)); auto col_idx_vb = fixed_width_array_idx_[col]; auto col_idx_schema = singlePartitionFlag ? col_idx_vb : (col_idx_vb - 1); const auto& dst_addrs = partition_fixed_width_value_addrs_[col]; @@ -352,7 +352,7 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ std::shared_ptr validity_buffer ( new Buffer((uint8_t *)ptr_tmp, partition_id_cnt_cur_[pid], new_size)); dst_addrs[pid] = const_cast(validity_buffer->data_); - std::memset(validity_buffer->data_, 0, new_size); + memset_s(validity_buffer->data_, new_size, 0, new_size); partition_fixed_width_buffers_[col][pid][0] = std::move(validity_buffer); fixed_nullBuffer_size_[pid] += new_size; } @@ -361,7 +361,7 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ // 计算并填充数据 auto src_addr = const_cast((uint8_t *)( reinterpret_cast(omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx))))); - std::memset(partition_buffer_idx_offset_, 0, num_partitions_ * sizeof(int32_t)); + memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t)); const auto num_rows = vb.GetRowCount(); for (auto row = 0; row < num_rows; ++row) { auto pid = partition_id_[row]; @@ -774,12 +774,14 @@ void Splitter::SerializingFixedColumns(int32_t partitionId, } if ((onceCopyLen - destCopyedLength) >= (cacheBatchSize - splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp])) { memCopyLen = cacheBatchSize - splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp]; - memcpy((uint8_t*)(ptr_value->data_) + destCopyedLength, + memcpy_s((uint8_t*)(ptr_value->data_) + destCopyedLength, + memCopyLen, partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_ + splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp], memCopyLen); // (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])) 等比例计算null数组偏移 if (partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0] != nullptr) { - memcpy((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])), + memcpy_s((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])), + memCopyLen / (1 << column_type_id_[colIndexTmpSchema]), partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ + (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])), memCopyLen / (1 << column_type_id_[colIndexTmpSchema])); // 释放内存 @@ -795,13 +797,15 @@ void Splitter::SerializingFixedColumns(int32_t partitionId, splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] = 0; // 初始化下一个cacheBatch的起始偏移 } else { memCopyLen = onceCopyLen - destCopyedLength; - memcpy((uint8_t*)(ptr_value->data_) + destCopyedLength, + memcpy_s((uint8_t*)(ptr_value->data_) + destCopyedLength, + memCopyLen, partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_ + splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp], memCopyLen); // (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])) 等比例计算null数组偏移 if(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0] != nullptr) { - 
memcpy((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])), + memcpy_s((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])), + memCopyLen / (1 << column_type_id_[colIndexTmpSchema]), partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ + (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])), memCopyLen / (1 << column_type_id_[colIndexTmpSchema])); } @@ -910,7 +914,7 @@ int32_t Splitter::ProtoWritePartition(int32_t partition_id, std::unique_ptr(vecBatchProto->ByteSizeLong())); if (bufferStream->Next(&bufferOut, &sizeOut)) { - std::memcpy(bufferOut, &vecBatchProtoSize, sizeof(vecBatchProtoSize)); + memcpy_s(bufferOut, sizeof(vecBatchProtoSize), &vecBatchProtoSize, sizeof(vecBatchProtoSize)); if (sizeof(vecBatchProtoSize) < static_cast(sizeOut)) { bufferStream->BackUp(sizeOut - sizeof(vecBatchProtoSize)); } @@ -977,7 +981,7 @@ int32_t Splitter::ProtoWritePartitionByRow(int32_t partition_id, std::unique_ptr } uint32_t protoRowBatchSize = reversebytes_uint32t(static_cast(protoRowBatch->ByteSizeLong())); if (bufferStream->Next(&bufferOut, &sizeOut)) { - std::memcpy(bufferOut, &protoRowBatchSize, sizeof(protoRowBatchSize)); + memcpy_s(bufferOut, sizeof(protoRowBatchSize), &protoRowBatchSize, sizeof(protoRowBatchSize)); if (sizeof(protoRowBatchSize) < static_cast(sizeOut)) { bufferStream->BackUp(sizeOut - sizeof(protoRowBatchSize)); } @@ -1061,7 +1065,7 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptrSerializeToZeroCopyStream(bufferStream.get()); @@ -1133,7 +1137,7 @@ int Splitter::protoSpillPartitionByRow(int32_t partition_id, std::unique_ptrSerializeToZeroCopyStream(bufferStream.get()); @@ -1162,7 +1166,7 @@ int Splitter::WriteDataFileProto() { for (auto pid = 0; pid < num_partitions_; ++pid) { protoSpillPartition(pid, bufferStream); } - std::memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t)); + memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t)); outStream->close(); return 0; } @@ -1232,7 +1236,7 @@ void Splitter::MergeSpilled() { } } - std::memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t)); + memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t)); ReleaseVarcharVector(); num_row_splited_ = 0; cached_vectorbatch_size_ = 0; @@ -1305,7 +1309,7 @@ void Splitter::WriteSplit() { ProtoWritePartition(pid, bufferOutPutStream, bufferOut, sizeOut); } - std::memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t)); + memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t)); ReleaseVarcharVector(); num_row_splited_ = 0; cached_vectorbatch_size_ = 0; diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp index 6b824bf31..d3bf7f402 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp @@ -455,26 +455,27 @@ void Test_splitter_close(long splitter_addr) { delete splitter; } -void GetFilePath(const char *path, const char *filename, char *filepath) { - strcpy(filepath, path); +void GetFilePath(const char *path, const char *filename, char *filepath, const uint64_t filepathLen) { + 
strcpy_s(filepath, filepathLen, path); if(filepath[strlen(path) - 1] != '/') { - strcat(filepath, "/"); + strcat_s(filepath, filepathLen, "/"); } - strcat(filepath, filename); + strcat_s(filepath, filepathLen, filename); } void DeletePathAll(const char* path) { DIR *dir; struct dirent *dirInfo; struct stat statBuf; - char filepath[256] = {0}; + static constexpr uint32_t FILE_PATH_LEN = 256; + char filepath[FILE_PATH_LEN] = {0}; lstat(path, &statBuf); if (S_ISREG(statBuf.st_mode)) { remove(path); } else if (S_ISDIR(statBuf.st_mode)) { if ((dir = opendir(path)) != NULL) { while ((dirInfo = readdir(dir)) != NULL) { - GetFilePath(path, dirInfo->d_name, filepath); + GetFilePath(path, dirInfo->d_name, filepath, FILE_PATH_LEN); if (strcmp(dirInfo->d_name, ".") == 0 || strcmp(dirInfo->d_name, "..") == 0) { continue; } diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h index d822b5773..512dd9d17 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h +++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h @@ -126,7 +126,7 @@ void Test_splitter_stop(long splitter_id); void Test_splitter_close(long splitter_id); -void GetFilePath(const char *path, const char *filename, char *filepath); +void GetFilePath(const char *path, const char *filename, char *filepath, const uint64_t filepathLen); void DeletePathAll(const char* path); -- Gitee From 898da515e95ff3be28ed2e849fc50e57c94cb66e Mon Sep 17 00:00:00 2001 From: fhyb_llb <7570576+fhyb_llb@user.noreply.gitee.com> Date: Fri, 13 Dec 2024 16:59:45 +0800 Subject: [PATCH 02/10] Community case fixing --- .../boostkit/spark/util/OmniAdaptorUtil.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala index ceab7e07d..3892d0d0c 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala @@ -357,9 +357,12 @@ object OmniAdaptorUtil { if (projectExprIdList.nonEmpty) { val projectOutput = ListBuffer[Attribute]() for (index <- projectExprIdList.indices) { - for (col <- output) { - if (col.exprId.equals(projectExprIdList(index))) { - projectOutput += col + breakable { + for (col <- output) { + if (col.exprId.equals(projectExprIdList(index))) { + projectOutput += col + break + } } } } @@ -373,10 +376,13 @@ object OmniAdaptorUtil { if (projectExprIdList.nonEmpty) { val indexList = ListBuffer[Int]() for (index <- projectExprIdList.indices) { - for (i <- output.indices) { - val col = output(i) - if (col.exprId.equals(projectExprIdList(index))) { - indexList += i + breakable { + for (i <- output.indices) { + val col = output(i) + if (col.exprId.equals(projectExprIdList(index))) { + indexList += i + break + } } } } -- Gitee From f46346a2fb9ee5b00dfaf103d8453dbee36f7f39 Mon Sep 17 00:00:00 2001 From: kongxinghan Date: Fri, 13 Dec 2024 19:11:40 +0800 Subject: [PATCH 03/10] fix BHJ share hash table BUG --- .../joins/ColumnarBroadcastHashJoinExec.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git 
a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index bb3430b6c..335cd0a30 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -64,7 +64,7 @@ case class ColumnarBroadcastHashJoinExec( isNullAwareAntiJoin: Boolean = false, projectList: Seq[NamedExpression] = Seq.empty) extends HashJoin { - + val shareHashTableKey : String = s"${left.id}-${right.id}" override def verboseStringWithOperatorId(): String = { val joinCondStr = if (condition.isDefined) { s"${condition.get}${condition.get.dataType}" @@ -356,7 +356,7 @@ case class ColumnarBroadcastHashJoinExec( buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) if (isShared) { - OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(buildPlan.id, + OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(shareHashTableKey, opFactory, op) } val deserializer = VecBatchSerializerFactory.create() @@ -371,7 +371,7 @@ case class ColumnarBroadcastHashJoinExec( } catch { case e: Exception => { if (isShared) { - OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id) + OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(shareHashTableKey) } else { op.close() opFactory.close() @@ -388,7 +388,7 @@ case class ColumnarBroadcastHashJoinExec( if (enableShareBuildOp && canShareBuildOp) { OmniHashBuilderWithExprOperatorFactory.gLock.lock() try { - buildOpFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildPlan.id) + buildOpFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(shareHashTableKey) if (buildOpFactory == null) { val (opFactory, op) = createBuildOpFactoryAndOp(true) buildOpFactory = opFactory @@ -421,7 +421,7 @@ case class ColumnarBroadcastHashJoinExec( lookupOpFactory.close() if (enableShareBuildOp && canShareBuildOp) { OmniHashBuilderWithExprOperatorFactory.gLock.lock() - OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id) + OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(shareHashTableKey) OmniHashBuilderWithExprOperatorFactory.gLock.unlock() } else { buildOp.close() -- Gitee From ebe4c4953347c84bfe885b0bfa62bf95e1e20015 Mon Sep 17 00:00:00 2001 From: dengzhaochu Date: Sat, 14 Dec 2024 18:05:47 +0800 Subject: [PATCH 04/10] fix literal -0.0d transform to json --- .../spark/expression/OmniExpressionAdaptorSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala index ded676538..03b953a77 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala +++ 
b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala @@ -311,5 +311,13 @@ class OmniExpressionAdaptorSuite extends SparkFunSuite { checkJsonKeyValueIgnoreKeySequence(coalesceExp, coalesceResult, coalesce) } + test("test double -0.0") { + val literal = Literal(-0.0d) + val result = rewriteToOmniJsonExpressionLiteral(literal, Map.empty) + val expected = "{\"exprType\":\"LITERAL\",\"dataType\":3,\"isNull\":false,\"value\":-0.0}" + checkJsonKeyValueIgnoreKeySequence(expected, result, literal) + } + + } -- Gitee From fc3e04a0a5caf334a5c5f8858be1546fd0890485 Mon Sep 17 00:00:00 2001 From: kongxinghan Date: Sat, 14 Dec 2024 22:49:22 +0800 Subject: [PATCH 05/10] fix BHJ share hash table BUG --- .../joins/ColumnarBroadcastHashJoinExec.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index 335cd0a30..1f958b998 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.util.{MergeIterator, SparkMemoryUtils} import org.apache.spark.sql.execution.vectorized.OmniColumnVector import org.apache.spark.sql.vectorized.ColumnarBatch - +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec /** * Performs an inner hash join of two child relations. 
When the output RDD of this operator is * being constructed, a Spark job is asynchronously started to calculate the values for the @@ -64,7 +64,7 @@ case class ColumnarBroadcastHashJoinExec( isNullAwareAntiJoin: Boolean = false, projectList: Seq[NamedExpression] = Seq.empty) extends HashJoin { - val shareHashTableKey : String = s"${left.id}-${right.id}" + override def verboseStringWithOperatorId(): String = { val joinCondStr = if (condition.isDefined) { s"${condition.get}${condition.get.dataType}" @@ -336,7 +336,12 @@ case class ColumnarBroadcastHashJoinExec( val projectListIndex = getProjectListIndex(projectExprIdList, prunedStreamedOutput, prunedBuildOutput) val lookupJoinType = OmniExpressionAdaptor.toOmniJoinType(joinType) val canShareBuildOp = (lookupJoinType != OMNI_JOIN_TYPE_RIGHT && lookupJoinType != OMNI_JOIN_TYPE_FULL) - + val buildPlanId = buildPlan match { + case exec: BroadcastQueryStageExec => + exec.broadcast.id + case _ => + buildPlan.id + } streamedPlan.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val filter: Optional[String] = condition match { case Some(expr) => @@ -356,7 +361,7 @@ case class ColumnarBroadcastHashJoinExec( buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) if (isShared) { - OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(shareHashTableKey, + OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(buildPlanId, opFactory, op) } val deserializer = VecBatchSerializerFactory.create() @@ -371,7 +376,7 @@ case class ColumnarBroadcastHashJoinExec( } catch { case e: Exception => { if (isShared) { - OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(shareHashTableKey) + OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId) } else { op.close() opFactory.close() @@ -388,7 +393,7 @@ case class ColumnarBroadcastHashJoinExec( if (enableShareBuildOp && canShareBuildOp) { OmniHashBuilderWithExprOperatorFactory.gLock.lock() try { - buildOpFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(shareHashTableKey) + buildOpFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildPlanId) if (buildOpFactory == null) { val (opFactory, op) = createBuildOpFactoryAndOp(true) buildOpFactory = opFactory @@ -421,7 +426,7 @@ case class ColumnarBroadcastHashJoinExec( lookupOpFactory.close() if (enableShareBuildOp && canShareBuildOp) { OmniHashBuilderWithExprOperatorFactory.gLock.lock() - OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(shareHashTableKey) + OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId) OmniHashBuilderWithExprOperatorFactory.gLock.unlock() } else { buildOp.close() -- Gitee From 34e4bc1d442def4d483656a228b17c9dc85e0e18 Mon Sep 17 00:00:00 2001 From: dengzhaochu Date: Mon, 16 Dec 2024 00:54:28 +0800 Subject: [PATCH 06/10] Reduce the scope of sharing build op. 
Restricted to same plan --- .../sql/execution/joins/ColumnarBroadcastHashJoinExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index 1f958b998..cca636913 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -338,7 +338,7 @@ case class ColumnarBroadcastHashJoinExec( val canShareBuildOp = (lookupJoinType != OMNI_JOIN_TYPE_RIGHT && lookupJoinType != OMNI_JOIN_TYPE_FULL) val buildPlanId = buildPlan match { case exec: BroadcastQueryStageExec => - exec.broadcast.id + exec.plan.id case _ => buildPlan.id } -- Gitee From 62d63c8eb24d7c3acaaeee256e438c2ee12c6338 Mon Sep 17 00:00:00 2001 From: fhyb_llb <7570576+fhyb_llb@user.noreply.gitee.com> Date: Mon, 2 Dec 2024 17:02:29 +0800 Subject: [PATCH 07/10] multi version --- .../omniop-spark-extension/.gitignore | 38 ++++++++++ .../apache/spark/sql/execution/SortExec.scala | 70 +++++++++---------- 2 files changed, 73 insertions(+), 35 deletions(-) create mode 100644 omnioperator/omniop-spark-extension/.gitignore diff --git a/omnioperator/omniop-spark-extension/.gitignore b/omnioperator/omniop-spark-extension/.gitignore new file mode 100644 index 000000000..5ff6309b7 --- /dev/null +++ b/omnioperator/omniop-spark-extension/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 0ddf89b8c..9f70e5ae9 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -42,11 +42,11 @@ import org.apache.spark.util.collection.unsafe.sort.PrefixComparator * If set, will spill every 'frequency' records. 
* */ abstract class SortExecBase( - sortOrder: Seq[SortOrder], - global: Boolean, - child: SparkPlan, - testSpillFrequency: Int = 0) - extends UnaryExecNode with BlockingOperatorWithCodegen { + sortOrder: Seq[SortOrder], + global: Boolean, + child: SparkPlan, + testSpillFrequency: Int = 0) + extends UnaryExecNode with BlockingOperatorWithCodegen { override def output: Seq[Attribute] = child.output @@ -70,11 +70,11 @@ abstract class SortExecBase( protected val sorterClassName: String protected def newSorterInstance( - ordering: Ordering[InternalRow], - prefixComparator: PrefixComparator, - prefixComputer: PrefixComputer, - pageSize: Long, - canSortFullyWIthPrefix: Boolean): AbstractUnsafeRowSorter + ordering: Ordering[InternalRow], + prefixComparator: PrefixComparator, + prefixComputer: PrefixComputer, + pageSize: Long, + canSortFullyWIthPrefix: Boolean): AbstractUnsafeRowSorter private[sql] var rowSorter: AbstractUnsafeRowSorter = _ @@ -100,10 +100,10 @@ abstract class SortExecBase( val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix override def computePrefix(row: InternalRow): - UnsafeExternalRowSorter.PrefixComputer.Prefix = { - val prefix = prefixProjection.apply(row) - result.isNull = prefix.isNullAt(0) - result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0) + UnsafeExternalRowSorter.PrefixComputer.Prefix = { + val prefix = prefixProjection.apply(row) + result.isNull = prefix.isNullAt(0) + result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0) result } } @@ -167,9 +167,9 @@ abstract class SortExecBase( val addToSorter = ctx.freshName("addToSorter") val addToSorterFuncName = ctx.addNewFunction(addToSorter, s""" - | private void $addToSorter() throws java.io.IOException { - | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} - | } + | private void $addToSorter() throws java.io.IOException { + | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} + | } """.stripMargin.trim) val outputRow = ctx.freshName("outputRow") @@ -178,29 +178,29 @@ abstract class SortExecBase( val spillSizeBefore = ctx.freshName("spillSizeBefore") val sortTime = metricTerm(ctx, "sortTime") s""" - | if ($needToSort) { - | long $spillSizeBefore = $metrics.memoryBytesSpilled(); - | $addToSorterFuncName(); - | $sortedIterator = $sorterVariable.sort(); - | $sortTime.add($sorterVariable.getSortTimeNanos() / $NANOS_PER_MILLIS); - | $peakMemory.add($sorterVariable.getPeakMemoryUsage()); - | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore); - | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage()); - | $needToSort = false; - | } - | - | while ($limitNotReachedCond $sortedIterator.hasNext()) { - | UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next(); - | ${consume(ctx, null, outputRow)} - | if (shouldStop()) return; - | } + | if ($needToSort) { + | long $spillSizeBefore = $metrics.memoryBytesSpilled(); + | $addToSorterFuncName(); + | $sortedIterator = $sorterVariable.sort(); + | $sortTime.add($sorterVariable.getSortTimeNanos() / $NANOS_PER_MILLIS); + | $peakMemory.add($sorterVariable.getPeakMemoryUsage()); + | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore); + | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage()); + | $needToSort = false; + | } + | + | while ($limitNotReachedCond $sortedIterator.hasNext()) { + | UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next(); + | ${consume(ctx, null, outputRow)} + | if 
(shouldStop()) return; + | } """.stripMargin.trim } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { s""" - | ${row.code} - | $sorterVariable.insertRow((UnsafeRow)${row.value}); + | ${row.code} + | $sorterVariable.insertRow((UnsafeRow)${row.value}); """.stripMargin } -- Gitee From 2a648d37f31e3fb487b821317b84c039cb7e1094 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Fri, 13 Dec 2024 15:11:53 +0800 Subject: [PATCH 08/10] =?UTF-8?q?nestedloopJoin=E7=AE=97=E5=AD=90=E6=96=B0?= =?UTF-8?q?=E5=A2=9EisShare=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../boostkit/spark/ColumnarPluginConfig.scala | 8 +++ .../ColumnarBroadcastNestedLoopJoinExec.scala | 53 +++++++++++++++---- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index ea846f188..e274da689 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -63,6 +63,8 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { def enableShareBroadcastJoinHashTable: Boolean = conf.getConf(ENABLE_SHARE_BROADCAST_JOIN_HASH_TABLE) + def enableShareBroadcastJoinNestedTable: Boolean = conf.getConf(ENABLE_SHARE_BROADCAST_JOIN_NESTED_TABLE) + def enableHeuristicJoinReorder: Boolean = conf.getConf(ENABLE_HEURISTIC_JOIN_REORDER) def enableDelayCartesianProduct: Boolean = conf.getConf(ENABLE_DELAY_CARTESIAN_PRODUCT) @@ -301,6 +303,12 @@ object ColumnarPluginConfig { .booleanConf .createWithDefault(true) + val ENABLE_SHARE_BROADCAST_JOIN_NESTED_TABLE = buildConf("spark.omni.sql.columnar.broadcastJoin.sharenestedtable") + .internal() + .doc("enable or disable share columnar BroadcastNestedLoopJoin buildtable") + .booleanConf + .createWithDefault(true) + val ENABLE_HEURISTIC_JOIN_REORDER = buildConf("spark.sql.heuristicJoinReorder.enabled") .internal() .doc("enable or disable heuristic join reorder") diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 56814fb34..225e54d95 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -182,6 +182,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( } val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf + val enableShareBuildOp: Boolean = columnarConf.enableShareBroadcastJoinNestedTable val enableJoinBatchMerge: Boolean = columnarConf.enableJoinBatchMerge val projectExprIdList = getExprIdForProjectList(projectList) @@ -207,6 +208,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( val projectListIndex = 
getProjectListIndex(projectExprIdList, prunedStreamedOutput, prunedBuildOutput) val lookupJoinType = OmniExpressionAdaptor.toOmniJoinType(joinType) + val canShareBuildOp = (lookupJoinType != OMNI_JOIN_TYPE_RIGHT) val relation = buildPlan.executeBroadcast[ColumnarHashedRelation]() streamedPlan.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val filter: Optional[String] = condition match { @@ -217,13 +219,17 @@ case class ColumnarBroadcastNestedLoopJoinExec( Optional.empty() } - def createBuildOpFactoryAndOp(): (OmniNestedLoopJoinBuildOperatorFactory, OmniOperator) = { + def createBuildOpFactoryAndOp(isShared: Boolean): (OmniNestedLoopJoinBuildOperatorFactory, OmniOperator) = { val startBuildCodegen = System.nanoTime() val opFactory = new OmniNestedLoopJoinBuildOperatorFactory(buildTypes, buildOutputCols) val op = opFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) + if (isShared) { + OmniNestedLoopJoinBuildOperatorFactory.saveNestedLoopJoinBuilderOperatorAndFactory(buildPlan.id, + opFactory, op) + } val deserializer = VecBatchSerializerFactory.create() relation.value.buildData.foreach { input => val startBuildInput = System.nanoTime() @@ -235,9 +241,13 @@ case class ColumnarBroadcastNestedLoopJoinExec( op.getOutput } catch { case e: Exception => { - op.close() - opFactory.close() - throw new RuntimeException("NestedLoopJoinBuilder getOutput failed") + if (isShared) { + OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id) + } else { + op.close() + opFactory.close() + } + throw new RuntimeException("Nested loop join builder getOutput failed") } } buildGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildGetOp) @@ -245,9 +255,27 @@ case class ColumnarBroadcastNestedLoopJoinExec( } var buildOp: OmniOperator = null var buildOpFactory: OmniNestedLoopJoinBuildOperatorFactory = null - val (opFactory, op) = createBuildOpFactoryAndOp() - buildOpFactory = opFactory - buildOp = op + if (enableShareBuildOp && canShareBuildOp) { + OmniNestedLoopJoinBuildOperatorFactory.gLock.lock() + try { + buildOpFactory = OmniNestedLoopJoinBuildOperatorFactory.getNestedLoopJoinBuilderOperatorFactory(buildPlan.id) + if (buildOpFactory == null) { + val (opFactory, op) = createBuildOpFactoryAndOp(true) + buildOpFactory = opFactory + buildOp = op + } + } catch { + case e: Exception => { + throw new RuntimeException("nested loop build failed. 
errmsg:" + e.getMessage()) + } + } finally { + OmniNestedLoopJoinBuildOperatorFactory.gLock.unlock() + } + } else { + val (opFactory, op) = createBuildOpFactoryAndOp(false) + buildOpFactory = opFactory + buildOp = op + } val startLookupCodegen = System.nanoTime() @@ -261,9 +289,14 @@ case class ColumnarBroadcastNestedLoopJoinExec( SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { lookupOp.close() lookupOpFactory.close() - buildOp.close() - buildOpFactory.close() - + if (enableShareBuildOp && canShareBuildOp) { + OmniNestedLoopJoinBuildOperatorFactory.gLock.lock() + OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id) + OmniNestedLoopJoinBuildOperatorFactory.gLock.unlock() + } else { + buildOp.close() + buildOpFactory.close() + } }) val resultSchema = this.schema -- Gitee From 9625689469feb9bb5fef29f8ab71af39daecc31f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Fri, 13 Dec 2024 16:50:17 +0800 Subject: [PATCH 09/10] =?UTF-8?q?nestedloopJoin=E7=AE=97=E5=AD=90=E6=96=B0?= =?UTF-8?q?=E5=A2=9EisShare=E5=8A=9F=E8=83=BD,=E5=9F=BA=E4=B8=8E=E6=A3=80?= =?UTF-8?q?=E8=A7=86=E6=84=8F=E8=A7=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/huawei/boostkit/spark/ColumnarPluginConfig.scala | 4 ++-- .../joins/ColumnarBroadcastNestedLoopJoinExec.scala | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index e274da689..aae1d2c86 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -303,9 +303,9 @@ object ColumnarPluginConfig { .booleanConf .createWithDefault(true) - val ENABLE_SHARE_BROADCAST_JOIN_NESTED_TABLE = buildConf("spark.omni.sql.columnar.broadcastJoin.sharenestedtable") + val ENABLE_SHARE_BROADCAST_JOIN_NESTED_TABLE = buildConf("spark.omni.sql.columnar.broadcastNestedJoin.shareBuildTable") .internal() - .doc("enable or disable share columnar BroadcastNestedLoopJoin buildtable") + .doc("enable or disable share columnar broadcastNestedJoin buildtable") .booleanConf .createWithDefault(true) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 225e54d95..672d735e7 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -208,7 +208,6 @@ case class ColumnarBroadcastNestedLoopJoinExec( val projectListIndex = getProjectListIndex(projectExprIdList, prunedStreamedOutput, prunedBuildOutput) val lookupJoinType = OmniExpressionAdaptor.toOmniJoinType(joinType) - val canShareBuildOp = (lookupJoinType != OMNI_JOIN_TYPE_RIGHT) 
val relation = buildPlan.executeBroadcast[ColumnarHashedRelation]() streamedPlan.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val filter: Optional[String] = condition match { @@ -255,7 +254,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( } var buildOp: OmniOperator = null var buildOpFactory: OmniNestedLoopJoinBuildOperatorFactory = null - if (enableShareBuildOp && canShareBuildOp) { + if (enableShareBuildOp) { OmniNestedLoopJoinBuildOperatorFactory.gLock.lock() try { buildOpFactory = OmniNestedLoopJoinBuildOperatorFactory.getNestedLoopJoinBuilderOperatorFactory(buildPlan.id) @@ -289,7 +288,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { lookupOp.close() lookupOpFactory.close() - if (enableShareBuildOp && canShareBuildOp) { + if (enableShareBuildOp) { OmniNestedLoopJoinBuildOperatorFactory.gLock.lock() OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id) OmniNestedLoopJoinBuildOperatorFactory.gLock.unlock() -- Gitee From 3640c2303eb9b0dcd9ef4a60c3b15a8a7af1a90a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Mon, 16 Dec 2024 20:08:21 +0800 Subject: [PATCH 10/10] =?UTF-8?q?=E8=B0=83=E6=95=B4buildPlan.id?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarBroadcastNestedLoopJoinExec.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 672d735e7..0780d7ce6 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -26,14 +26,7 @@ import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, isSimpleColumn, isSimpleColumnForAll} import com.huawei.boostkit.spark.util.OmniAdaptorUtil -import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{getExprIdForProjectList, getIndexArray, getProjectListIndex,pruneOutput, reorderOutputVecs, transColBatchToOmniVecs} -import nova.hetu.omniruntime.constants.JoinType._ -import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.OmniOperator -import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} -import nova.hetu.omniruntime.operator.join.{OmniNestedLoopJoinBuildOperatorFactory, OmniNestedLoopJoinLookupOperatorFactory} -import nova.hetu.omniruntime.vector.VecBatch -import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory +import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{getExprIdForProjectList, getIndexArray, getProjectListIndex, pruneOutput, reorderOutputVecs, transColBatchToOmniVecs} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -42,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ 
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.{CodegenSupport, ColumnarHashedRelation, ExplainUtils, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.util.{MergeIterator, SparkMemoryUtils} @@ -217,6 +211,12 @@ case class ColumnarBroadcastNestedLoopJoinExec( case _ => Optional.empty() } + val buildPlanId = buildPlan match { + case exec: BroadcastQueryStageExec => + exec.plan.id + case _ => + buildPlan.id + } def createBuildOpFactoryAndOp(isShared: Boolean): (OmniNestedLoopJoinBuildOperatorFactory, OmniOperator) = { val startBuildCodegen = System.nanoTime() @@ -226,7 +226,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) if (isShared) { - OmniNestedLoopJoinBuildOperatorFactory.saveNestedLoopJoinBuilderOperatorAndFactory(buildPlan.id, + OmniNestedLoopJoinBuildOperatorFactory.saveNestedLoopJoinBuilderOperatorAndFactory(buildPlanId, opFactory, op) } val deserializer = VecBatchSerializerFactory.create() @@ -241,7 +241,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( } catch { case e: Exception => { if (isShared) { - OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id) + OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId) } else { op.close() opFactory.close() @@ -257,7 +257,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( if (enableShareBuildOp) { OmniNestedLoopJoinBuildOperatorFactory.gLock.lock() try { - buildOpFactory = OmniNestedLoopJoinBuildOperatorFactory.getNestedLoopJoinBuilderOperatorFactory(buildPlan.id) + buildOpFactory = OmniNestedLoopJoinBuildOperatorFactory.getNestedLoopJoinBuilderOperatorFactory(buildPlanId) if (buildOpFactory == null) { val (opFactory, op) = createBuildOpFactoryAndOp(true) buildOpFactory = opFactory @@ -290,7 +290,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( lookupOpFactory.close() if (enableShareBuildOp) { OmniNestedLoopJoinBuildOperatorFactory.gLock.lock() - OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id) + OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId) OmniNestedLoopJoinBuildOperatorFactory.gLock.unlock() } else { buildOp.close() -- Gitee
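The C++ changes in PATCH 01 fall into two recurring patterns. The first is the loop-counter widening in OmniRLEv2.cc and ParquetTypedRecordReader.cpp: a 32-bit induction variable compared against a 64-bit bound. A minimal sketch of the failure mode and of the fix follows; the function and variable names are hypothetical stand-ins, not the actual decoder code.

    #include <cstdint>

    // Hypothetical stand-in for the unrolled unpack loops in OmniRLEv2.cc;
    // the real functions read several bytes per iteration.
    void CopyWidth8(const unsigned char* src, uint64_t* dst, int64_t bufferNum) {
        // Before the fix this read 'for (int i = 0; i < bufferNum; ++i)'.
        // The comparison promotes 'i' to int64_t, so when bufferNum > INT_MAX
        // the condition stays true while 'i' overflows (undefined behaviour,
        // in practice wrapping negative) and the loop never terminates: the
        // "endless cycle" named in the commit subject.
        for (int64_t i = 0; i < bufferNum; ++i) {  // counter width matches the bound
            dst[i] = static_cast<uint64_t>(src[i]);
        }
    }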
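The second pattern is the switch from memcpy/memset/strcpy/strcat to the bounds-checked *_s variants declared in securec.h, whose Annex-K-style signatures take the destination capacity ahead of the copy length, e.g. errno_t memcpy_s(void *dest, size_t destMax, const void *src, size_t count). A sketch of the calling convention, with hypothetical buffer names (the real call sites in common.h and splitter.cpp pass their own sizes):

    #include <cstddef>
    #include "securec.h"

    bool AppendBytes(char* values, size_t capacity, size_t offset,
                     const char* addr, size_t len) {
        if (len == 0) {
            return true;  // mirrors the 'if (len != 0)' guard in BytesGen
        }
        // memcpy_s returns EOK (0) on success and a non-zero error code,
        // without writing past the stated capacity, when count > destMax.
        // Passing the true remaining capacity is what makes the check bite;
        // call sites that pass 'len' for both destMax and count satisfy the
        // API but cannot detect an overrun of the destination.
        return memcpy_s(values + offset, capacity - offset, addr, len) == EOK;
    }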