diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc
index 24340ab8b775ac8fae65de19d6d1dd8b88fedac3..a5c515d3cb11759f7950d4e143c25b70627abe0b 100644
--- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc
+++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc
@@ -641,7 +641,7 @@ namespace omniruntime::reader {
 uint64_t b0, b1, b2, b3, b4, b5, b6, b7;
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 b0 = static_cast(*buffer);
 b1 = static_cast(*(buffer + 1));
 b2 = static_cast(*(buffer + 2));
@@ -683,7 +683,7 @@ namespace omniruntime::reader {
 uint64_t b0, b1, b2, b3, b4, b5, b6;
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 b0 = static_cast(*buffer);
 b1 = static_cast(*(buffer + 1));
 b2 = static_cast(*(buffer + 2));
@@ -723,7 +723,7 @@ namespace omniruntime::reader {
 uint64_t b0, b1, b2, b3, b4, b5;
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 b0 = static_cast(*buffer);
 b1 = static_cast(*(buffer + 1));
 b2 = static_cast(*(buffer + 2));
@@ -761,7 +761,7 @@ namespace omniruntime::reader {
 uint64_t b0, b1, b2, b3, b4;
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 b0 = static_cast(*buffer);
 b1 = static_cast(*(buffer + 1));
 b2 = static_cast(*(buffer + 2));
@@ -797,7 +797,7 @@ namespace omniruntime::reader {
 uint32_t b0, b1, b2, b3;
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 b0 = static_cast(*buffer);
 b1 = static_cast(*(buffer + 1));
 b2 = static_cast(*(buffer + 2));
@@ -832,7 +832,7 @@ namespace omniruntime::reader {
 uint32_t b0, b1, b2;
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 b0 = static_cast(*buffer);
 b1 = static_cast(*(buffer + 1));
 b2 = static_cast(*(buffer + 2));
@@ -865,7 +865,7 @@ namespace omniruntime::reader {
 uint16_t b0, b1;
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 b0 = static_cast(*buffer);
 b1 = static_cast(*(buffer + 1));
 buffer += 2;
@@ -895,7 +895,7 @@ namespace omniruntime::reader {
 bufferNum = std::min(bufferNum, static_cast(offset + len - curIdx));
 // Avoid updating 'bufferStart' inside the loop.
 const auto* buffer = reinterpret_cast(bufferStart);
- for (int i = 0; i < bufferNum; ++i) {
+ for (int64_t i = 0; i < bufferNum; ++i) {
 data[curIdx++] = *buffer++;
 }
 bufferStart = reinterpret_cast(buffer);
diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h
index d6661ee7d4bb5842fa5ce70a6151e41d828167e1..8a04e338c5e3b54d0bae20d8f71de7558ad9bd60 100644
--- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h
+++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h
@@ -70,7 +70,7 @@ namespace omniruntime::reader {
 for (int i = num_values - 1; i >= 0; --i) {
 if (!nulls[i]) {
 idx_decode--;
- std::memmove(buffer + i, buffer + idx_decode, sizeof(T));
+ memmove_s(buffer + i, sizeof(T), buffer + idx_decode, sizeof(T));
 }
 }
 assert(idx_decode == 0);
diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp
index edd7b07a656da4c14864ef5beeb5c429ca8a4632..262f1498d125c89710a734a81955c4039288b6e1 100644
--- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp
+++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp
@@ -502,7 +502,7 @@ Status RawBytesToDecimal64Bytes(const uint8_t* bytes, int32_t length,
 void DefLevelsToNullsSIMD(const int16_t* def_levels, int64_t num_def_levels, const int16_t max_def_level,
 int64_t* values_read, int64_t* null_count, bool* nulls) {
- for (int i = 0; i < num_def_levels; ++i) {
+ for (int64_t i = 0; i < num_def_levels; ++i) {
 if (def_levels[i] < max_def_level) {
 nulls[i] = true;
 (*null_count)++;
diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp
index 68f4ce912ae9d853b1e9459172f5dd62a6029d95..ee6de06c431f4cbd36cc8f3c07904e8c5d90c1eb 100644
--- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp
+++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_splitter.cpp
@@ -212,7 +212,7 @@ bool OckSplitter::WriteFixedWidthValueTemple(BaseVector *vector, bool isDict, st
 for (uint32_t index = 0; index < rowNum; ++index) {
 uint32_t idIndex = rowIndexes[index];
 if (UNLIKELY(idIndex >= idsNum)) {
- LOG_ERROR("Invalid idIndex %d, idsNum.", idIndex, idsNum);
+ LOG_ERROR("Invalid idIndex %d, idsNum %d.", idIndex, idsNum);
 return false;
 }
 uint32_t rowIndex = reinterpret_cast(ids)[idIndex];
@@ -257,7 +257,7 @@ bool OckSplitter::WriteDecimal128(BaseVector *vector, bool isDict, std::vector= idsNum)) {
- LOG_ERROR("Invalid idIndex %d, idsNum.", idIndex, idsNum);
+ LOG_ERROR("Invalid idIndex %d, idsNum %d.", idIndex, idsNum);
 return false;
 }
 uint32_t rowIndex = reinterpret_cast(ids)[idIndex];
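
The loop-index hunks above widen the index because the loop bounds (bufferNum, num_def_levels) are int64_t. A minimal standalone sketch of why a 32-bit index against a 64-bit bound is unsafe (the helper below is illustrative, not taken from the patch):

    #include <cstdint>

    // Hypothetical helper: sum bufferNum bytes.
    int64_t SumBytes(const unsigned char* buffer, int64_t bufferNum) {
        int64_t sum = 0;
        // With 'int i', any bufferNum above INT32_MAX would require ++i to pass
        // INT32_MAX, which is signed overflow (undefined behaviour) before the
        // loop could ever terminate. An int64_t index matches the bound's type.
        for (int64_t i = 0; i < bufferNum; ++i) {
            sum += buffer[i];
        }
        return sum;
    }
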
diff --git a/omnioperator/omniop-spark-extension/.gitignore b/omnioperator/omniop-spark-extension/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..5ff6309b7199129c1afe4f4ec1906e640bec48c6
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.h b/omnioperator/omniop-spark-extension/cpp/src/common/common.h
index 1578b85141ac6e9869da97ab2cb66b5359d52624..5b72ba5c79eb09ee9bfe08fe8739bee19c2cf2a4 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/common/common.h
+++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.h
@@ -67,7 +67,7 @@ int32_t BytesGen(uint64_t offsetsAddr, std::string &nullStr, uint64_t valuesAddr
 }
 
 if (len != 0) {
- memcpy((char *) (values + offsets[i]), addr, len);
+ memcpy_s((char *) (values + offsets[i]), len, addr, len);
 valueTotalLen += len;
 }
 }
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc
index 720f7ff1951b389fa230c1e074bdb6e9619f644b..01f89bc45495f88b9d26e5219fcd506b1df9dec2 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include "securec.h"
 #include "zlib.h"
 #include "zstd.h"
@@ -189,8 +190,9 @@ namespace spark {
 char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
 if (totalCompressedSize >= static_cast(bufferSize)) {
 writeHeader(header, static_cast(bufferSize), true);
- memcpy(
+ memcpy_s(
 header + 3,
+ static_cast(bufferSize),
 rawInputBuffer.data(),
 static_cast(bufferSize));
@@ -396,7 +398,7 @@
 }
 int sizeToWrite = std::min(totalSizeToWrite, outputSize - outputPosition);
- memcpy(dst, dataToWrite, static_cast(sizeToWrite));
+ memcpy_s(dst, static_cast(sizeToWrite), dataToWrite, static_cast(sizeToWrite));
 outputPosition += sizeToWrite;
 dataToWrite += sizeToWrite;
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc b/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc
index b4fd9e345672ad8fdc8821f85eef9cf2323b74ad..4c4b25feb4f8938d6fec105fb9535cec26893560 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc
@@ -23,6 +23,7 @@
 #include
 #include
 #include
+#include "securec.h"
 
 namespace spark {
@@ -91,7 +92,7 @@ namespace spark {
 if (buf) {
 T* buf_old = buf;
 buf = reinterpret_cast(memoryPool.malloc(sizeof(T) * newCapacity));
- memcpy(buf, buf_old, sizeof(T) * currentSize);
+ memcpy_s(buf, sizeof(T) * currentSize, buf_old, sizeof(T) * currentSize);
 memoryPool.free(reinterpret_cast(buf_old));
 } else {
 buf = reinterpret_cast(memoryPool.malloc(sizeof(T) * newCapacity));
@@ -113,7 +114,7 @@ namespace spark {
 void DataBuffer::resize(uint64_t newSize) {
 reserve(newSize);
 if (newSize > currentSize) {
- memset(buf + currentSize, 0, newSize - currentSize);
+ memset_s(buf + currentSize, newSize - currentSize, 0, newSize - currentSize);
 }
 currentSize = newSize;
 }
@@ -131,7 +132,7 @@ namespace spark {
 void DataBuffer::resize(uint64_t newSize) {
 reserve(newSize);
 if (newSize > currentSize) {
- memset(buf + currentSize, 0, newSize - currentSize);
+ memset_s(buf + currentSize, newSize - currentSize, 0, newSize - currentSize);
 }
 currentSize = newSize;
 }
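
The memcpy/memset replacements throughout this patch follow the securec ("securec.h") convention: the second argument is the destination capacity, and the call returns an errno_t instead of overrunning the buffer. A standalone sketch of that pattern, assuming the same library (the function and names below are illustrative, not from the patch):

    #include <cstddef>
    #include <cstdint>
    #include "securec.h"

    // Zero a destination buffer, then copy 'len' bytes into it, refusing oversized copies.
    bool FillAndCopy(uint8_t* dst, size_t dstCapacity, const uint8_t* src, size_t len) {
        if (memset_s(dst, dstCapacity, 0, dstCapacity) != EOK) {
            return false;    // invalid destination or capacity
        }
        if (memcpy_s(dst, dstCapacity, src, len) != EOK) {
            return false;    // len exceeds dstCapacity, copy refused
        }
        return true;
    }
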
diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
index 2970d5fabd60f00409d21129d3811bf75a889b72..ccfa024c1601311703542ed511bd7b26456f6a99 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
@@ -25,7 +25,7 @@ SplitOptions SplitOptions::Defaults() { return SplitOptions(); }
 // Compute the partition id; re-initialised for every batch
 int Splitter::ComputeAndCountPartitionId(VectorBatch& vb) {
 auto num_rows = vb.GetRowCount();
- std::memset(partition_id_cnt_cur_, 0, num_partitions_ * sizeof(int32_t));
+ memset_s(partition_id_cnt_cur_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t));
 partition_id_.resize(num_rows);
 
 if (singlePartitionFlag) {
@@ -121,7 +121,7 @@ int Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t new_size) {
 int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) {
 const auto num_rows = vb.GetRowCount();
 for (uint col = 0; col < fixed_width_array_idx_.size(); ++col) {
- std::memset(partition_buffer_idx_offset_, 0, num_partitions_ * sizeof(int32_t));
+ memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t));
 auto col_idx_vb = fixed_width_array_idx_[col];
 auto col_idx_schema = singlePartitionFlag ? col_idx_vb : (col_idx_vb - 1);
 const auto& dst_addrs = partition_fixed_width_value_addrs_[col];
@@ -352,7 +352,7 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){
 std::shared_ptr validity_buffer (
 new Buffer((uint8_t *)ptr_tmp, partition_id_cnt_cur_[pid], new_size));
 dst_addrs[pid] = const_cast(validity_buffer->data_);
- std::memset(validity_buffer->data_, 0, new_size);
+ memset_s(validity_buffer->data_, new_size, 0, new_size);
 partition_fixed_width_buffers_[col][pid][0] = std::move(validity_buffer);
 fixed_nullBuffer_size_[pid] += new_size;
 }
@@ -361,7 +361,7 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){
 // Compute and fill the data
 auto src_addr = const_cast((uint8_t *)(
 reinterpret_cast(omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx)))));
- std::memset(partition_buffer_idx_offset_, 0, num_partitions_ * sizeof(int32_t));
+ memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t));
 const auto num_rows = vb.GetRowCount();
 for (auto row = 0; row < num_rows; ++row) {
 auto pid = partition_id_[row];
@@ -774,12 +774,14 @@ void Splitter::SerializingFixedColumns(int32_t partitionId,
 }
 if ((onceCopyLen - destCopyedLength) >= (cacheBatchSize - splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp])) {
 memCopyLen = cacheBatchSize - splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp];
- memcpy((uint8_t*)(ptr_value->data_) + destCopyedLength,
+ memcpy_s((uint8_t*)(ptr_value->data_) + destCopyedLength,
+ memCopyLen,
 partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_ + splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp],
 memCopyLen);
 // (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])) scales the offset into the null array proportionally
 if (partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0] != nullptr) {
- memcpy((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
+ memcpy_s((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
+ memCopyLen / (1 << column_type_id_[colIndexTmpSchema]),
 partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ +
 (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])),
 memCopyLen / (1 << column_type_id_[colIndexTmpSchema]));
 // Free the memory
@@ -795,13 +797,15 @@ void Splitter::SerializingFixedColumns(int32_t partitionId,
 splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] = 0; // Initialise the starting offset of the next cacheBatch
 } else {
 memCopyLen = onceCopyLen - destCopyedLength;
- memcpy((uint8_t*)(ptr_value->data_) + destCopyedLength,
+ memcpy_s((uint8_t*)(ptr_value->data_) + destCopyedLength,
+ memCopyLen,
 partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_ + splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp],
 memCopyLen);
 // (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])) scales the offset into the null array proportionally
 if(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0] != nullptr) {
- memcpy((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
+ memcpy_s((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
+ memCopyLen / (1 << column_type_id_[colIndexTmpSchema]),
 partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ +
 (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])),
 memCopyLen / (1 << column_type_id_[colIndexTmpSchema]));
 }
@@ -910,7 +914,7 @@ int32_t Splitter::ProtoWritePartition(int32_t partition_id, std::unique_ptr(vecBatchProto->ByteSizeLong()));
 if (bufferStream->Next(&bufferOut, &sizeOut)) {
- std::memcpy(bufferOut, &vecBatchProtoSize, sizeof(vecBatchProtoSize));
+ memcpy_s(bufferOut, sizeof(vecBatchProtoSize), &vecBatchProtoSize, sizeof(vecBatchProtoSize));
 if (sizeof(vecBatchProtoSize) < static_cast(sizeOut)) {
 bufferStream->BackUp(sizeOut - sizeof(vecBatchProtoSize));
 }
@@ -977,7 +981,7 @@ int32_t Splitter::ProtoWritePartitionByRow(int32_t partition_id, std::unique_ptr
 }
 uint32_t protoRowBatchSize = reversebytes_uint32t(static_cast(protoRowBatch->ByteSizeLong()));
 if (bufferStream->Next(&bufferOut, &sizeOut)) {
- std::memcpy(bufferOut, &protoRowBatchSize, sizeof(protoRowBatchSize));
+ memcpy_s(bufferOut, sizeof(protoRowBatchSize), &protoRowBatchSize, sizeof(protoRowBatchSize));
 if (sizeof(protoRowBatchSize) < static_cast(sizeOut)) {
 bufferStream->BackUp(sizeOut - sizeof(protoRowBatchSize));
 }
@@ -1061,7 +1065,7 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptrSerializeToZeroCopyStream(bufferStream.get());
@@ -1133,7 +1137,7 @@ int Splitter::protoSpillPartitionByRow(int32_t partition_id, std::unique_ptrSerializeToZeroCopyStream(bufferStream.get());
@@ -1162,7 +1166,7 @@ int Splitter::WriteDataFileProto() {
 for (auto pid = 0; pid < num_partitions_; ++pid) {
 protoSpillPartition(pid, bufferStream);
 }
- std::memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t));
+ memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t));
 outStream->close();
 return 0;
 }
@@ -1232,7 +1236,7 @@ void Splitter::MergeSpilled() {
 }
 }
- std::memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t));
+ memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t));
 ReleaseVarcharVector();
 num_row_splited_ = 0;
 cached_vectorbatch_size_ = 0;
@@ -1305,7 +1309,7 @@ void Splitter::WriteSplit() {
 ProtoWritePartition(pid, bufferOutPutStream,
 bufferOut, sizeOut);
 }
- std::memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t));
+ memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t));
 ReleaseVarcharVector();
 num_row_splited_ = 0;
 cached_vectorbatch_size_ = 0;
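
The ProtoWritePartition/ProtoWritePartitionByRow hunks keep the existing framing scheme: reserve a block from the protobuf ZeroCopyOutputStream, copy a byte-swapped 4-byte length prefix into it with memcpy_s, and BackUp whatever part of the block goes unused. A standalone sketch of that framing step (the function and variable names are illustrative; only Next/BackUp and memcpy_s are real APIs):

    #include <cstdint>
    #include <google/protobuf/io/zero_copy_stream.h>
    #include "securec.h"

    // Write a byte-swapped 4-byte length prefix into 'out'; returns false on failure.
    bool WriteLengthPrefix(google::protobuf::io::ZeroCopyOutputStream* out, uint32_t bodySize) {
        void* block = nullptr;
        int blockSize = 0;
        if (!out->Next(&block, &blockSize)) {
            return false;
        }
        if (blockSize < static_cast<int>(sizeof(uint32_t))) {
            out->BackUp(blockSize);    // return the whole block unused
            return false;
        }
        uint32_t prefix = __builtin_bswap32(bodySize);    // plays the role of reversebytes_uint32t()
        if (memcpy_s(block, blockSize, &prefix, sizeof(prefix)) != EOK) {
            return false;
        }
        out->BackUp(blockSize - static_cast<int>(sizeof(prefix)));    // keep only the 4 bytes written
        return true;
    }
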
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
index 6b824bf31448827a8ddd7189338f0af55213e30b..d3bf7f4022d9344a0873aa7d8aa5ff53cf7f5dc2 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
@@ -455,26 +455,27 @@ void Test_splitter_close(long splitter_addr) {
 delete splitter;
 }
 
-void GetFilePath(const char *path, const char *filename, char *filepath) {
- strcpy(filepath, path);
+void GetFilePath(const char *path, const char *filename, char *filepath, const uint64_t filepathLen) {
+ strcpy_s(filepath, filepathLen, path);
 if(filepath[strlen(path) - 1] != '/') {
- strcat(filepath, "/");
+ strcat_s(filepath, filepathLen, "/");
 }
- strcat(filepath, filename);
+ strcat_s(filepath, filepathLen, filename);
 }
 
 void DeletePathAll(const char* path) {
 DIR *dir;
 struct dirent *dirInfo;
 struct stat statBuf;
- char filepath[256] = {0};
+ static constexpr uint32_t FILE_PATH_LEN = 256;
+ char filepath[FILE_PATH_LEN] = {0};
 lstat(path, &statBuf);
 if (S_ISREG(statBuf.st_mode)) {
 remove(path);
 } else if (S_ISDIR(statBuf.st_mode)) {
 if ((dir = opendir(path)) != NULL) {
 while ((dirInfo = readdir(dir)) != NULL) {
- GetFilePath(path, dirInfo->d_name, filepath);
+ GetFilePath(path, dirInfo->d_name, filepath, FILE_PATH_LEN);
 if (strcmp(dirInfo->d_name, ".") == 0 || strcmp(dirInfo->d_name, "..") == 0) {
 continue;
 }
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
index d822b5773bf265b67c37b793f1c6dab0056b88aa..512dd9d17499b39bf9a45d30086d1c21a14c4212 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
@@ -126,7 +126,7 @@ void Test_splitter_stop(long splitter_id);
 
 void Test_splitter_close(long splitter_id);
 
-void GetFilePath(const char *path, const char *filename, char *filepath);
+void GetFilePath(const char *path, const char *filename, char *filepath, const uint64_t filepathLen);
 
 void DeletePathAll(const char* path);
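
GetFilePath now receives the destination capacity so every write into the path buffer can be bounded. A standalone sketch of the same bounded path-join pattern with the securec string routines (illustrative names, not the test helper itself):

    #include <cstring>
    #include "securec.h"

    // Join 'dir' and 'name' into 'out' without writing past outLen bytes.
    bool JoinPath(const char* dir, const char* name, char* out, size_t outLen) {
        if (strcpy_s(out, outLen, dir) != EOK) {
            return false;                          // dir does not fit
        }
        if (dir[0] != '\0' && dir[strlen(dir) - 1] != '/') {
            if (strcat_s(out, outLen, "/") != EOK) {
                return false;
            }
        }
        return strcat_s(out, outLen, name) == EOK; // false if name does not fit
    }
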
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index ea846f188f73b456f237c6d9d8e63141dd1c2cad..aae1d2c86acae75d37a37f27e73d6491eb16caa8 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -63,6 +63,8 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
 def enableShareBroadcastJoinHashTable: Boolean = conf.getConf(ENABLE_SHARE_BROADCAST_JOIN_HASH_TABLE)
 
+ def enableShareBroadcastJoinNestedTable: Boolean = conf.getConf(ENABLE_SHARE_BROADCAST_JOIN_NESTED_TABLE)
+
 def enableHeuristicJoinReorder: Boolean = conf.getConf(ENABLE_HEURISTIC_JOIN_REORDER)
 
 def enableDelayCartesianProduct: Boolean = conf.getConf(ENABLE_DELAY_CARTESIAN_PRODUCT)
@@ -301,6 +303,12 @@ object ColumnarPluginConfig {
 .booleanConf
 .createWithDefault(true)
 
+ val ENABLE_SHARE_BROADCAST_JOIN_NESTED_TABLE = buildConf("spark.omni.sql.columnar.broadcastNestedJoin.shareBuildTable")
+ .internal()
+ .doc("enable or disable share columnar broadcastNestedJoin buildtable")
+ .booleanConf
+ .createWithDefault(true)
+
 val ENABLE_HEURISTIC_JOIN_REORDER = buildConf("spark.sql.heuristicJoinReorder.enabled")
 .internal()
 .doc("enable or disable heuristic join reorder")
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala
index ceab7e07d165567c3eea303296ce36f21acd89aa..3892d0d0cdd890bc7882f03aa75a1df57fdd59b1 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala
@@ -357,9 +357,12 @@ object OmniAdaptorUtil {
 if (projectExprIdList.nonEmpty) {
 val projectOutput = ListBuffer[Attribute]()
 for (index <- projectExprIdList.indices) {
- for (col <- output) {
- if (col.exprId.equals(projectExprIdList(index))) {
- projectOutput += col
+ breakable {
+ for (col <- output) {
+ if (col.exprId.equals(projectExprIdList(index))) {
+ projectOutput += col
+ break
+ }
 }
 }
 }
@@ -373,10 +376,13 @@ object OmniAdaptorUtil {
 if (projectExprIdList.nonEmpty) {
 val indexList = ListBuffer[Int]()
 for (index <- projectExprIdList.indices) {
- for (i <- output.indices) {
- val col = output(i)
- if (col.exprId.equals(projectExprIdList(index))) {
- indexList += i
+ breakable {
+ for (i <- output.indices) {
+ val col = output(i)
+ if (col.exprId.equals(projectExprIdList(index))) {
+ indexList += i
+ break
+ }
 }
 }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 0ddf89b8c1c3d36b63e7bebd2f9b12e0b1a7f385..9f70e5ae9e9eb6eb35122d42e4cbea632bf32ce8 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -42,11 +42,11 @@ import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
 * If set, will spill every 'frequency' records.
 * */
 abstract class SortExecBase(
- sortOrder: Seq[SortOrder],
- global: Boolean,
- child: SparkPlan,
- testSpillFrequency: Int = 0)
- extends UnaryExecNode with BlockingOperatorWithCodegen {
+ sortOrder: Seq[SortOrder],
+ global: Boolean,
+ child: SparkPlan,
+ testSpillFrequency: Int = 0)
+ extends UnaryExecNode with BlockingOperatorWithCodegen {
 
 override def output: Seq[Attribute] = child.output
 
@@ -70,11 +70,11 @@ abstract class SortExecBase(
 protected val sorterClassName: String
 
 protected def newSorterInstance(
- ordering: Ordering[InternalRow],
- prefixComparator: PrefixComparator,
- prefixComputer: PrefixComputer,
- pageSize: Long,
- canSortFullyWIthPrefix: Boolean): AbstractUnsafeRowSorter
+ ordering: Ordering[InternalRow],
+ prefixComparator: PrefixComparator,
+ prefixComputer: PrefixComputer,
+ pageSize: Long,
+ canSortFullyWIthPrefix: Boolean): AbstractUnsafeRowSorter
 
 private[sql] var rowSorter: AbstractUnsafeRowSorter = _
 
@@ -100,10 +100,10 @@ abstract class SortExecBase(
 val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
 private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
 override def computePrefix(row: InternalRow):
- UnsafeExternalRowSorter.PrefixComputer.Prefix = {
- val prefix = prefixProjection.apply(row)
- result.isNull = prefix.isNullAt(0)
- result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
+ UnsafeExternalRowSorter.PrefixComputer.Prefix = {
+ val prefix = prefixProjection.apply(row)
+ result.isNull = prefix.isNullAt(0)
+ result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
 result
 }
 }
@@ -167,9 +167,9 @@ abstract class SortExecBase(
 val addToSorter = ctx.freshName("addToSorter")
 val addToSorterFuncName = ctx.addNewFunction(addToSorter,
 s"""
- | private void $addToSorter() throws java.io.IOException {
- | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
- | }
+ | private void $addToSorter() throws java.io.IOException {
+ | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
+ | }
 """.stripMargin.trim)
 
 val outputRow = ctx.freshName("outputRow")
@@ -178,29 +178,29 @@ abstract class SortExecBase(
 val spillSizeBefore = ctx.freshName("spillSizeBefore")
 val sortTime = metricTerm(ctx, "sortTime")
 s"""
- | if ($needToSort) {
- | long $spillSizeBefore = $metrics.memoryBytesSpilled();
- | $addToSorterFuncName();
- | $sortedIterator = $sorterVariable.sort();
- | $sortTime.add($sorterVariable.getSortTimeNanos() / $NANOS_PER_MILLIS);
- | $peakMemory.add($sorterVariable.getPeakMemoryUsage());
- | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
- | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
- | $needToSort = false;
- | }
- |
- | while ($limitNotReachedCond $sortedIterator.hasNext()) {
- | UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next();
- | ${consume(ctx, null, outputRow)}
- | if (shouldStop()) return;
- | }
+ | if ($needToSort) {
+ | long $spillSizeBefore = $metrics.memoryBytesSpilled();
+ | $addToSorterFuncName();
+ | $sortedIterator = $sorterVariable.sort();
+ | $sortTime.add($sorterVariable.getSortTimeNanos() / $NANOS_PER_MILLIS);
+ | $peakMemory.add($sorterVariable.getPeakMemoryUsage());
+ | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
+ | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
+ | $needToSort = false;
+ | }
+ |
+ | while ($limitNotReachedCond $sortedIterator.hasNext()) {
+ | UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next();
+ | ${consume(ctx, null, outputRow)}
+ | if (shouldStop()) return;
+ | }
 """.stripMargin.trim
 }
 
 override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
 s"""
- | ${row.code}
- | $sorterVariable.insertRow((UnsafeRow)${row.value});
+ | ${row.code}
+ | $sorterVariable.insertRow((UnsafeRow)${row.value});
 """.stripMargin
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
index bb3430b6c631f4dbe529cd764f48ab04657e9cbb..cca636913c4c98905c9000ceb2324ec7d50599a8 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
@@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.util.{MergeIterator, SparkMemoryUtils}
 import org.apache.spark.sql.execution.vectorized.OmniColumnVector
 import org.apache.spark.sql.vectorized.ColumnarBatch
-
+import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
 /**
 * Performs an inner hash join of two child relations. When the output RDD of this operator is
 * being constructed, a Spark job is asynchronously started to calculate the values for the
@@ -336,7 +336,12 @@ case class ColumnarBroadcastHashJoinExec(
 val projectListIndex = getProjectListIndex(projectExprIdList, prunedStreamedOutput, prunedBuildOutput)
 val lookupJoinType = OmniExpressionAdaptor.toOmniJoinType(joinType)
 val canShareBuildOp = (lookupJoinType != OMNI_JOIN_TYPE_RIGHT && lookupJoinType != OMNI_JOIN_TYPE_FULL)
-
+ val buildPlanId = buildPlan match {
+ case exec: BroadcastQueryStageExec =>
+ exec.plan.id
+ case _ =>
+ buildPlan.id
+ }
 streamedPlan.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) =>
 val filter: Optional[String] = condition match {
 case Some(expr) =>
@@ -356,7 +361,7 @@ case class ColumnarBroadcastHashJoinExec(
 buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen)
 if (isShared) {
- OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(buildPlan.id,
+ OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(buildPlanId,
 opFactory, op)
 }
 val deserializer = VecBatchSerializerFactory.create()
@@ -371,7 +376,7 @@ case class ColumnarBroadcastHashJoinExec(
 } catch {
 case e: Exception => {
 if (isShared) {
- OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id)
+ OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId)
 } else {
 op.close()
 opFactory.close()
@@ -388,7 +393,7 @@ case class ColumnarBroadcastHashJoinExec(
 if (enableShareBuildOp && canShareBuildOp) {
 OmniHashBuilderWithExprOperatorFactory.gLock.lock()
 try {
- buildOpFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildPlan.id)
+ buildOpFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildPlanId)
 if (buildOpFactory == null) {
 val (opFactory, op) = createBuildOpFactoryAndOp(true)
 buildOpFactory = opFactory
@@ -421,7 +426,7 @@ case class ColumnarBroadcastHashJoinExec(
 lookupOpFactory.close()
 if (enableShareBuildOp && canShareBuildOp) {
 OmniHashBuilderWithExprOperatorFactory.gLock.lock()
- OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlan.id)
+ OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId)
 OmniHashBuilderWithExprOperatorFactory.gLock.unlock()
 } else {
 buildOp.close()
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala
index 56814fb348dd47c0ff984deb9cbda7fd237a3155..0780d7ce6b23e043799dd8fd37226865ff43ca3e 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala
@@ -26,14 +26,7 @@ import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, isSimpleColumn, isSimpleColumnForAll}
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil
-import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{getExprIdForProjectList, getIndexArray, getProjectListIndex,pruneOutput, reorderOutputVecs, transColBatchToOmniVecs}
-import nova.hetu.omniruntime.constants.JoinType._
-import nova.hetu.omniruntime.`type`.DataType
-import nova.hetu.omniruntime.operator.OmniOperator
-import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig}
-import nova.hetu.omniruntime.operator.join.{OmniNestedLoopJoinBuildOperatorFactory, OmniNestedLoopJoinLookupOperatorFactory}
-import nova.hetu.omniruntime.vector.VecBatch
-import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory
+import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{getExprIdForProjectList, getIndexArray, getProjectListIndex, pruneOutput, reorderOutputVecs, transColBatchToOmniVecs}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -42,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
 import org.apache.spark.sql.execution.{CodegenSupport, ColumnarHashedRelation, ExplainUtils, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.util.{MergeIterator, SparkMemoryUtils}
@@ -182,6 +176,7 @@ case class ColumnarBroadcastNestedLoopJoinExec(
 }
 
 val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf
+ val enableShareBuildOp: Boolean = columnarConf.enableShareBroadcastJoinNestedTable
 val enableJoinBatchMerge: Boolean = columnarConf.enableJoinBatchMerge
 
 val projectExprIdList = getExprIdForProjectList(projectList)
@@ -216,14 +211,24 @@ case class ColumnarBroadcastNestedLoopJoinExec(
 case _ => Optional.empty()
 }
 
+ val buildPlanId = buildPlan match {
+ case exec: BroadcastQueryStageExec =>
+ exec.plan.id
+ case _ =>
+ buildPlan.id
+ }
 
- def createBuildOpFactoryAndOp(): (OmniNestedLoopJoinBuildOperatorFactory, OmniOperator) = {
+ def createBuildOpFactoryAndOp(isShared: Boolean): (OmniNestedLoopJoinBuildOperatorFactory, OmniOperator) = {
 val startBuildCodegen = System.nanoTime()
 val opFactory = new OmniNestedLoopJoinBuildOperatorFactory(buildTypes, buildOutputCols)
 val op = opFactory.createOperator()
 buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen)
 
+ if (isShared) {
+ OmniNestedLoopJoinBuildOperatorFactory.saveNestedLoopJoinBuilderOperatorAndFactory(buildPlanId,
+ opFactory, op)
+ }
 val deserializer = VecBatchSerializerFactory.create()
 relation.value.buildData.foreach { input =>
 val startBuildInput = System.nanoTime()
@@ -235,9 +240,13 @@ case class ColumnarBroadcastNestedLoopJoinExec(
 op.getOutput
 } catch {
 case e: Exception => {
- op.close()
- opFactory.close()
- throw new RuntimeException("NestedLoopJoinBuilder getOutput failed")
+ if (isShared) {
+ OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId)
+ } else {
+ op.close()
+ opFactory.close()
+ }
+ throw new RuntimeException("Nested loop join builder getOutput failed")
 }
 }
 buildGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildGetOp)
@@ -245,9 +254,27 @@ case class ColumnarBroadcastNestedLoopJoinExec(
 }
 var buildOp: OmniOperator = null
 var buildOpFactory: OmniNestedLoopJoinBuildOperatorFactory = null
- val (opFactory, op) = createBuildOpFactoryAndOp()
- buildOpFactory = opFactory
- buildOp = op
+ if (enableShareBuildOp) {
+ OmniNestedLoopJoinBuildOperatorFactory.gLock.lock()
+ try {
+ buildOpFactory = OmniNestedLoopJoinBuildOperatorFactory.getNestedLoopJoinBuilderOperatorFactory(buildPlanId)
+ if (buildOpFactory == null) {
+ val (opFactory, op) = createBuildOpFactoryAndOp(true)
+ buildOpFactory = opFactory
+ buildOp = op
+ }
+ } catch {
+ case e: Exception => {
+ throw new RuntimeException("nested loop build failed. errmsg:" + e.getMessage())
+ }
+ } finally {
+ OmniNestedLoopJoinBuildOperatorFactory.gLock.unlock()
+ }
+ } else {
+ val (opFactory, op) = createBuildOpFactoryAndOp(false)
+ buildOpFactory = opFactory
+ buildOp = op
+ }
 
 val startLookupCodegen = System.nanoTime()
@@ -261,9 +288,14 @@ case class ColumnarBroadcastNestedLoopJoinExec(
 SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
 lookupOp.close()
 lookupOpFactory.close()
- buildOp.close()
- buildOpFactory.close()
-
+ if (enableShareBuildOp) {
+ OmniNestedLoopJoinBuildOperatorFactory.gLock.lock()
+ OmniNestedLoopJoinBuildOperatorFactory.dereferenceHashBuilderOperatorAndFactory(buildPlanId)
+ OmniNestedLoopJoinBuildOperatorFactory.gLock.unlock()
+ } else {
+ buildOp.close()
+ buildOpFactory.close()
+ }
 })
 
 val resultSchema = this.schema
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala
index ded676538759023db54d956f4c4449ae427e76b6..03b953a77904fc2a8c60c9407cbb173d1c98cb68 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptorSuite.scala
@@ -311,5 +311,13 @@ class OmniExpressionAdaptorSuite extends SparkFunSuite {
 checkJsonKeyValueIgnoreKeySequence(coalesceExp, coalesceResult, coalesce)
 }
 
+ test("test double -0.0") {
+ val literal = Literal(-0.0d)
+ val result = rewriteToOmniJsonExpressionLiteral(literal, Map.empty)
+ val expected = "{\"exprType\":\"LITERAL\",\"dataType\":3,\"isNull\":false,\"value\":-0.0}"
+ checkJsonKeyValueIgnoreKeySequence(expected, result, literal)
+ }
+
+
 }
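
The new "test double -0.0" case pins the serialized literal because negative zero is easy to lose: it compares equal to +0.0, so only the sign bit distinguishes the two values. A standalone C++ illustration of that property (unrelated to the Scala test harness itself):

    #include <cassert>
    #include <cmath>

    int main() {
        double negZero = -0.0;
        assert(negZero == 0.0);         // == cannot tell the two zeros apart
        assert(std::signbit(negZero));  // but the sign bit is set
        assert(1.0 / negZero < 0.0);    // and it is observable: the quotient is -inf
        return 0;
    }
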