diff --git a/omnioperator/omniop-spark-extension/cpp/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/CMakeLists.txt index dd0b79dbabc3b9c65ff8edf8b6c34853f51a0f63..491cfb7086037229608f2963cf6c278ca132b198 100644 --- a/omnioperator/omniop-spark-extension/cpp/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/CMakeLists.txt @@ -5,7 +5,7 @@ project(spark-thestral-plugin) cmake_minimum_required(VERSION 3.10) # configure cmake -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_COMPILER "g++") set(root_directory ${PROJECT_BINARY_DIR}) diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp index 2c6b9fab89ec31c7df596cc4e9b14e3f869a12b2..f33d5c4c9df9695c2464b622587dea9e3546c39c 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.cpp @@ -76,21 +76,4 @@ spark::CompressionKind GetCompressionType(const std::string& name) { int IsFileExist(const std::string path) { return !access(path.c_str(), F_OK); -} - -void ReleaseVectorBatch(omniruntime::vec::VectorBatch& vb) -{ - int tmpVectorNum = vb.GetVectorCount(); - std::set vectorBatchAddresses; - vectorBatchAddresses.clear(); - for (int vecIndex = 0; vecIndex < tmpVectorNum; ++vecIndex) { - vectorBatchAddresses.insert(vb.GetVector(vecIndex)); - } - for (Vector * tmpAddress : vectorBatchAddresses) { - if (nullptr == tmpAddress) { - throw std::runtime_error("delete nullptr error for release vectorBatch"); - } - delete tmpAddress; - } - vectorBatchAddresses.clear(); } \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.h b/omnioperator/omniop-spark-extension/cpp/src/common/common.h index fdc3b10e692e3944eeee9cf70f96ed47262a5e77..733dac920727489b205727d32300252bd32626c5 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/common/common.h +++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.h @@ -45,6 +45,4 @@ spark::CompressionKind GetCompressionType(const std::string& name); int IsFileExist(const std::string path); -void ReleaseVectorBatch(omniruntime::vec::VectorBatch& vb); - #endif //CPP_COMMON_H \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp index 453c85bfe7c4709594d14630553a4351a2c6f91d..4d83eca9821a6ac3b0b91a94d898c02aa5760e01 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.cpp @@ -21,6 +21,7 @@ #include "jni_common.h" using namespace omniruntime::vec; +using namespace omniruntime::type; using namespace std; using namespace orc; @@ -348,38 +349,37 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_OrcColumnarBatchJniRe template uint64_t copyFixwidth(orc::ColumnVectorBatch *field) { - VectorAllocator *allocator = omniruntime::vec::GetProcessGlobalVecAllocator(); using T = typename NativeType::type; ORC_TYPE *lvb = dynamic_cast(field); - FixedWidthVector *originalVector = new FixedWidthVector(allocator, lvb->numElements); + auto originalVector = std::make_unique>(lvb->numElements); for (uint i = 0; i < lvb->numElements; i++) { if (lvb->notNull.data()[i]) { originalVector->SetValue(i, (T)(lvb->data.data()[i])); } else { - originalVector->SetValueNull(i); + originalVector->SetNull(i); } } - return (uint64_t)originalVector; + return reinterpret_cast(originalVector.release()); } -uint64_t copyVarwidth(int maxLen, orc::ColumnVectorBatch *field, int vcType) +uint64_t copyVarwidth(orc::ColumnVectorBatch *field, int vcType) { - VectorAllocator *allocator = omniruntime::vec::GetProcessGlobalVecAllocator(); orc::StringVectorBatch *lvb = dynamic_cast(field); - VarcharVector *originalVector = new VarcharVector(allocator, lvb->numElements); + auto originalVector = std::make_unique>>(lvb->numElements); for (uint i = 0; i < lvb->numElements; i++) { if (lvb->notNull.data()[i]) { string tmpStr(reinterpret_cast(lvb->data.data()[i]), lvb->length.data()[i]); if (vcType == orc::TypeKind::CHAR && tmpStr.back() == ' ') { tmpStr.erase(tmpStr.find_last_not_of(" ") + 1); } - originalVector->SetValue(i, reinterpret_cast(tmpStr.data()), tmpStr.length()); + auto data = std::string_view(tmpStr.data(), tmpStr.length()); + originalVector->SetValue(i, data); } else { - originalVector->SetValueNull(i); + originalVector->SetNull(i); } } - return (uint64_t)originalVector; + return reinterpret_cast(originalVector.release()); } int copyToOmniVec(orc::TypeKind vcType, int &omniTypeId, uint64_t &omniVecId, orc::ColumnVectorBatch *field, ...) @@ -419,10 +419,7 @@ int copyToOmniVec(orc::TypeKind vcType, int &omniTypeId, uint64_t &omniVecId, or case orc::TypeKind::STRING: case orc::TypeKind::VARCHAR: { omniTypeId = static_cast(OMNI_VARCHAR); - va_list args; - va_start(args, field); - omniVecId = (uint64_t)copyVarwidth(va_arg(args, int), field, vcType); - va_end(args); + omniVecId = copyVarwidth(field, vcType); break; } default: { @@ -434,12 +431,10 @@ int copyToOmniVec(orc::TypeKind vcType, int &omniTypeId, uint64_t &omniVecId, or int copyToOmniDecimalVec(int precision, int &omniTypeId, uint64_t &omniVecId, orc::ColumnVectorBatch *field) { - VectorAllocator *allocator = VectorAllocator::GetGlobalAllocator(); if (precision > 18) { omniTypeId = static_cast(OMNI_DECIMAL128); orc::Decimal128VectorBatch *lvb = dynamic_cast(field); - FixedWidthVector *originalVector = - new FixedWidthVector(allocator, lvb->numElements); + auto originalVector = std::make_unique>(lvb->numElements); for (uint i = 0; i < lvb->numElements; i++) { if (lvb->notNull.data()[i]) { int64_t highbits = lvb->values.data()[i].getHighBits(); @@ -455,22 +450,22 @@ int copyToOmniDecimalVec(int precision, int &omniTypeId, uint64_t &omniVecId, or Decimal128 d128(highbits, lowbits); originalVector->SetValue(i, d128); } else { - originalVector->SetValueNull(i); + originalVector->SetNull(i); } } - omniVecId = (uint64_t)originalVector; + omniVecId = reinterpret_cast(originalVector.release()); } else { omniTypeId = static_cast(OMNI_DECIMAL64); orc::Decimal64VectorBatch *lvb = dynamic_cast(field); - FixedWidthVector *originalVector = new FixedWidthVector(allocator, lvb->numElements); + auto originalVector = std::make_unique>(lvb->numElements); for (uint i = 0; i < lvb->numElements; i++) { if (lvb->notNull.data()[i]) { originalVector->SetValue(i, (int64_t)(lvb->values.data()[i])); } else { - originalVector->SetValueNull(i); + originalVector->SetNull(i); } } - omniVecId = (uint64_t)originalVector; + omniVecId = reinterpret_cast(originalVector.release()); } return 1; } diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h index 975de176f9c99f5bb78001a3beb88db5d43d9298..f23a940c6745acede09bcef86fc57880c6aba42d 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/OrcColumnarBatchJniReader.h @@ -146,8 +146,6 @@ int copyToOmniVec(orc::TypeKind vcType, int &omniTypeId, uint64_t &omniVecId, or int copyToOmniDecimalVec(int precision, int &omniTypeId, uint64_t &omniVecId, orc::ColumnVectorBatch *field); -int copyToOmniDecimalVec(int precision, int &omniTypeId, uint64_t &omniVecId, orc::ColumnVectorBatch *field); - #ifdef __cplusplus } #endif diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp index 2f75c23a770b8d40d61ec575b035f899ed22decb..9d357afb51bfc2b1352339e47ced45e651edb677 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp @@ -89,17 +89,17 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake( DataTypes inputVecTypes = Deserialize(inputTypeCharPtr); const int32_t *inputVecTypeIds = inputVecTypes.GetIds(); // - std::vector inputDataTpyes = inputVecTypes.Get(); - int32_t size = inputDataTpyes.size(); + std::vector inputDataTypes = inputVecTypes.Get(); + int32_t size = inputDataTypes.size(); uint32_t *inputDataPrecisions = new uint32_t[size]; uint32_t *inputDataScales = new uint32_t[size]; for (int i = 0; i < size; ++i) { - if(inputDataTpyes[i]->GetId() == OMNI_DECIMAL64 || inputDataTpyes[i]->GetId() == OMNI_DECIMAL128) { - inputDataScales[i] = std::dynamic_pointer_cast(inputDataTpyes[i])->GetScale(); - inputDataPrecisions[i] = std::dynamic_pointer_cast(inputDataTpyes[i])->GetPrecision(); + if (inputDataTypes[i]->GetId() == OMNI_DECIMAL64 || inputDataTypes[i]->GetId() == OMNI_DECIMAL128) { + inputDataScales[i] = std::dynamic_pointer_cast(inputDataTypes[i])->GetScale(); + inputDataPrecisions[i] = std::dynamic_pointer_cast(inputDataTypes[i])->GetPrecision(); } } - inputDataTpyes.clear(); + inputDataTypes.clear(); InputDataTypes inputDataTypesTmp; inputDataTypesTmp.inputVecTypeIds = (int32_t *)inputVecTypeIds; diff --git a/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto b/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto index c40472020171692ea7b0acde2dd873efeda691f4..725f9fa070aa1f8d188d85118df9765a63d299f3 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto +++ b/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto @@ -57,4 +57,4 @@ message VecType { NANOSEC = 3; } TimeUnit timeUnit = 6; -} \ No newline at end of file +} diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 2eba4b92930591e97c1a264c40cd5e4a110ec0af..e1152c1da7adaef7315505d10b984b8673163cc4 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -37,10 +37,10 @@ int Splitter::ComputeAndCountPartitionId(VectorBatch& vb) { partition_id_[i] = 0; } } else { - IntVector* hashVct = static_cast(vb.GetVector(0)); + auto hash_vct = reinterpret_cast *>(vb.Get(0)); for (auto i = 0; i < num_rows; ++i) { // positive mod - int32_t pid = hashVct->GetValue(i); + int32_t pid = hash_vct->GetValue(i); if (pid >= num_partitions_) { LogsError(" Illegal pid Value: %d >= partition number %d .", pid, num_partitions_); throw std::runtime_error("Shuffle pidVec Illegal pid Value!"); @@ -76,7 +76,7 @@ int Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t new_size) { case SHUFFLE_8BYTE: case SHUFFLE_DECIMAL128: default: { - void *ptr_tmp = static_cast(options_.allocator->alloc(new_size * (1 << column_type_id_[i]))); + void *ptr_tmp = static_cast(options_.allocator->Alloc(new_size * (1 << column_type_id_[i]))); fixed_valueBuffer_size_[partition_id] = new_size * (1 << column_type_id_[i]); if (nullptr == ptr_tmp) { throw std::runtime_error("Allocator for AllocatePartitionBuffers Failed! "); @@ -128,15 +128,12 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { auto col_idx_vb = fixed_width_array_idx_[col]; auto col_idx_schema = singlePartitionFlag ? col_idx_vb : (col_idx_vb - 1); const auto& dst_addrs = partition_fixed_width_value_addrs_[col]; - if (vb.GetVector(col_idx_vb)->GetEncoding() == OMNI_VEC_ENCODING_DICTIONARY) { + if (vb.Get(col_idx_vb)->GetEncoding() == OMNI_DICTIONARY) { LogsDebug("Dictionary Columnar process!"); - auto ids_tmp = static_cast(options_.allocator->alloc(num_rows * sizeof(int32_t))); - Buffer *ids (new Buffer((uint8_t*)ids_tmp, 0, num_rows * sizeof(int32_t))); - if (ids->data_ == nullptr) { - throw std::runtime_error("Allocator for SplitFixedWidthValueBuffer ids Failed! "); - } - auto dictionaryTmp = ((DictionaryVector *)(vb.GetVector(col_idx_vb)))->ExtractDictionaryAndIds(0, num_rows, (int32_t *)(ids->data_)); - auto src_addr = VectorHelper::GetValuesAddr(dictionaryTmp); + + DataTypeId type_id = vector_batch_col_types_.at(col_idx_schema); + auto ids_addr = VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb), type_id); + auto src_addr = reinterpret_cast(VectorHelper::UnsafeGetDictionary(vb.Get(col_idx_vb), type_id)); switch (column_type_id_[col_idx_schema]) { #define PROCESS(SHUFFLE_TYPE, CTYPE) \ case SHUFFLE_TYPE: \ @@ -145,8 +142,8 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { auto dst_offset = \ partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; \ reinterpret_cast(dst_addrs[pid])[dst_offset] = \ - reinterpret_cast(src_addr)[reinterpret_cast(ids->data_)[row]]; \ - partition_fixed_width_buffers_[col][pid][1]->size_ += (1 << SHUFFLE_TYPE); \ + reinterpret_cast(src_addr)[reinterpret_cast(ids_addr)[row]]; \ + partition_fixed_width_buffers_[col][pid][1]->size_ += (1 << SHUFFLE_TYPE); \ partition_buffer_idx_offset_[pid]++; \ } \ break; @@ -160,10 +157,12 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { auto pid = partition_id_[row]; auto dst_offset = partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; + // 前64位取值、赋值 reinterpret_cast(dst_addrs[pid])[dst_offset << 1] = - reinterpret_cast(src_addr)[reinterpret_cast(ids->data_)[row] << 1]; // 前64位取值、赋值 - reinterpret_cast(dst_addrs[pid])[dst_offset << 1 | 1] = - reinterpret_cast(src_addr)[reinterpret_cast(ids->data_)[row] << 1 | 1]; // 后64位取值、赋值 + reinterpret_cast(src_addr)[reinterpret_cast(ids_addr)[row] << 1]; + // 后64位取值、赋值 + reinterpret_cast(dst_addrs[pid])[(dst_offset << 1) | 1] = + reinterpret_cast(src_addr)[(reinterpret_cast(ids_addr)[row] << 1) | 1]; partition_fixed_width_buffers_[col][pid][1]->size_ += (1 << SHUFFLE_DECIMAL128); //decimal128 16Bytes partition_buffer_idx_offset_[pid]++; @@ -174,13 +173,9 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { throw std::runtime_error("SplitFixedWidthValueBuffer not match this type: " + column_type_id_[col_idx_schema]); } } - options_.allocator->free(ids->data_, ids->capacity_); - if (nullptr == ids) { - throw std::runtime_error("delete nullptr error for ids"); - } - delete ids; } else { - auto src_addr = VectorHelper::GetValuesAddr(vb.GetVector(col_idx_vb)); + DataTypeId type_id = vector_batch_col_types_.at(col_idx_schema); + auto src_addr = reinterpret_cast(VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb), type_id)); switch (column_type_id_[col_idx_schema]) { #define PROCESS(SHUFFLE_TYPE, CTYPE) \ case SHUFFLE_TYPE: \ @@ -225,54 +220,65 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { int Splitter::SplitBinaryArray(VectorBatch& vb) { - const auto numRows = vb.GetRowCount(); - auto vecCntVb = vb.GetVectorCount(); - auto vecCntSchema = singlePartitionFlag ? vecCntVb : vecCntVb - 1; - for (auto colSchema = 0; colSchema < vecCntSchema; ++colSchema) { - switch (column_type_id_[colSchema]) { + const auto num_rows = vb.GetRowCount(); + auto vec_cnt_vb = vb.GetVectorCount(); + auto vec_cnt_schema = singlePartitionFlag ? vec_cnt_vb : vec_cnt_vb - 1; + for (auto col_schema = 0; col_schema < vec_cnt_schema; ++col_schema) { + switch (column_type_id_[col_schema]) { case SHUFFLE_BINARY: { - auto colVb = singlePartitionFlag ? colSchema : colSchema + 1; - varcharVectorCache.insert(vb.GetVector(colVb)); // record varchar vector for release - if (vb.GetVector(colVb)->GetEncoding() == OMNI_VEC_ENCODING_DICTIONARY) { - for (auto row = 0; row < numRows; ++row) { + auto col_vb = singlePartitionFlag ? col_schema : col_schema + 1; + varcharVectorCache.insert(vb.Get(col_vb)); + if (vb.Get(col_vb)->GetEncoding() == OMNI_DICTIONARY) { + auto vc = reinterpret_cast> *>( + vb.Get(col_vb)); + for (auto row = 0; row < num_rows; ++row) { auto pid = partition_id_[row]; uint8_t *dst = nullptr; - auto str_len = ((DictionaryVector *)(vb.GetVector(colVb)))->GetVarchar(row, &dst); - bool isnull = ((DictionaryVector *)(vb.GetVector(colVb)))->IsValueNull(row); + uint32_t str_len = 0; + if (!vc->IsNull(row)) { + std::string_view value = vc->GetValue(row); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); + } + bool is_null = vc->IsNull(row); cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 - VCLocation cl((uint64_t) dst, str_len, isnull); - if ((vc_partition_array_buffers_[pid][colSchema].size() != 0) && - (vc_partition_array_buffers_[pid][colSchema].back().getVcList().size() < + VCLocation cl((uint64_t) dst, str_len, is_null); + if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && + (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < options_.spill_batch_row_num)) { - vc_partition_array_buffers_[pid][colSchema].back().getVcList().push_back(cl); - vc_partition_array_buffers_[pid][colSchema].back().vcb_total_len += str_len; + vc_partition_array_buffers_[pid][col_schema].back().getVcList().push_back(cl); + vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; } else { VCBatchInfo svc(options_.spill_batch_row_num); svc.getVcList().push_back(cl); svc.vcb_total_len += str_len; - vc_partition_array_buffers_[pid][colSchema].push_back(svc); + vc_partition_array_buffers_[pid][col_schema].push_back(svc); } } } else { - VarcharVector *vc = nullptr; - vc = static_cast(vb.GetVector(colVb)); - for (auto row = 0; row < numRows; ++row) { + auto vc = reinterpret_cast> *>(vb.Get(col_vb)); + for (auto row = 0; row < num_rows; ++row) { auto pid = partition_id_[row]; uint8_t *dst = nullptr; - int str_len = vc->GetValue(row, &dst); - bool isnull = vc->IsValueNull(row); + uint32_t str_len = 0; + if (!vc->IsNull(row)) { + std::string_view value = vc->GetValue(row); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); + } + bool is_null = vc->IsNull(row); cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 - VCLocation cl((uint64_t) dst, str_len, isnull); - if ((vc_partition_array_buffers_[pid][colSchema].size() != 0) && - (vc_partition_array_buffers_[pid][colSchema].back().getVcList().size() < + VCLocation cl((uint64_t) dst, str_len, is_null); + if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && + (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < options_.spill_batch_row_num)) { - vc_partition_array_buffers_[pid][colSchema].back().getVcList().push_back(cl); - vc_partition_array_buffers_[pid][colSchema].back().vcb_total_len += str_len; + vc_partition_array_buffers_[pid][col_schema].back().getVcList().push_back(cl); + vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; } else { VCBatchInfo svc(options_.spill_batch_row_num); svc.getVcList().push_back(cl); svc.vcb_total_len += str_len; - vc_partition_array_buffers_[pid][colSchema].push_back(svc); + vc_partition_array_buffers_[pid][col_schema].push_back(svc); } } } @@ -297,7 +303,7 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ if (partition_id_cnt_cur_[pid] > 0 && dst_addrs[pid] == nullptr) { // init bitmap if it's null auto new_size = partition_id_cnt_cur_[pid] > options_.buffer_size ? partition_id_cnt_cur_[pid] : options_.buffer_size; - auto ptr_tmp = static_cast(options_.allocator->alloc(new_size)); + auto ptr_tmp = static_cast(options_.allocator->Alloc(new_size)); if (nullptr == ptr_tmp) { throw std::runtime_error("Allocator for ValidityBuffer Failed! "); } @@ -310,7 +316,8 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ } // 计算并填充数据 - auto src_addr = const_cast((uint8_t*)(VectorHelper::GetNullsAddr(vb.GetVector(col_idx)))); + auto src_addr = const_cast((uint8_t *)( + reinterpret_cast(omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx))))); std::fill(std::begin(partition_buffer_idx_offset_), std::end(partition_buffer_idx_offset_), 0); const auto num_rows = vb.GetRowCount(); @@ -550,7 +557,7 @@ int Splitter::Split(VectorBatch& vb ) } std::shared_ptr Splitter::CaculateSpilledTmpFilePartitionOffsets() { - void *ptr_tmp = static_cast(options_.allocator->alloc((num_partitions_ + 1) * sizeof(uint64_t))); + void *ptr_tmp = static_cast(options_.allocator->Alloc((num_partitions_ + 1) * sizeof(uint64_t))); if (nullptr == ptr_tmp) { throw std::runtime_error("Allocator for partitionOffsets Failed! "); } @@ -606,7 +613,7 @@ spark::VecType::VecTypeId CastShuffleTypeIdToVecType(int32_t tmpType) { return spark::VecType::VEC_TYPE_CHAR; case OMNI_CONTAINER: return spark::VecType::VEC_TYPE_CONTAINER; - case OMNI_INVALID: + case DataTypeId::OMNI_INVALID: return spark::VecType::VEC_TYPE_INVALID; default: { throw std::runtime_error("castShuffleTypeIdToVecType() unexpected ShuffleTypeId"); @@ -625,9 +632,9 @@ void Splitter::SerializingFixedColumns(int32_t partitionId, colIndexTmpSchema = singlePartitionFlag ? fixed_width_array_idx_[fixColIndexTmp] : fixed_width_array_idx_[fixColIndexTmp] - 1; auto onceCopyLen = splitRowInfoTmp->onceCopyRow * (1 << column_type_id_[colIndexTmpSchema]); // 临时内存,拷贝拼接onceCopyRow批,用完释放 - void *ptr_value_tmp = static_cast(options_.allocator->alloc(onceCopyLen)); + void *ptr_value_tmp = static_cast(options_.allocator->Alloc(onceCopyLen)); std::shared_ptr ptr_value (new Buffer((uint8_t*)ptr_value_tmp, 0, onceCopyLen)); - void *ptr_validity_tmp = static_cast(options_.allocator->alloc(splitRowInfoTmp->onceCopyRow)); + void *ptr_validity_tmp = static_cast(options_.allocator->Alloc(splitRowInfoTmp->onceCopyRow)); std::shared_ptr ptr_validity (new Buffer((uint8_t*)ptr_validity_tmp, 0, splitRowInfoTmp->onceCopyRow)); if (nullptr == ptr_value->data_ || nullptr == ptr_validity->data_) { throw std::runtime_error("Allocator for tmp buffer Failed! "); @@ -659,9 +666,9 @@ void Splitter::SerializingFixedColumns(int32_t partitionId, partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ + (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])), memCopyLen / (1 << column_type_id_[colIndexTmpSchema])); // 释放内存 - options_.allocator->free(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_, + options_.allocator->Free(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_, partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->capacity_); - options_.allocator->free(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_, + options_.allocator->Free(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_, partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->capacity_); destCopyedLength += memCopyLen; splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp] += 1; // cacheBatchIndex下标后移 @@ -688,8 +695,8 @@ void Splitter::SerializingFixedColumns(int32_t partitionId, vec.set_values(ptr_value->data_, onceCopyLen); vec.set_nulls(ptr_validity->data_, splitRowInfoTmp->onceCopyRow); // 临时内存,拷贝拼接onceCopyRow批,用完释放 - options_.allocator->free(ptr_value->data_, ptr_value->capacity_); - options_.allocator->free(ptr_validity->data_, ptr_validity->capacity_); + options_.allocator->Free(ptr_value->data_, ptr_value->capacity_); + options_.allocator->Free(ptr_validity->data_, ptr_validity->capacity_); } // partition_cached_vectorbatch_[partition_id][cache_index][col][0]代表ByteMap, // partition_cached_vectorbatch_[partition_id][cache_index][col][1]代表value @@ -869,7 +876,7 @@ int Splitter::DeleteSpilledTmpFile() { for (auto &pair : spilled_tmp_files_info_) { auto tmpDataFilePath = pair.first + ".data"; // 释放存储有各个临时文件的偏移数据内存 - options_.allocator->free(pair.second->data_, pair.second->capacity_); + options_.allocator->Free(pair.second->data_, pair.second->capacity_); if (IsFileExist(tmpDataFilePath)) { remove(tmpDataFilePath.c_str()); } @@ -957,7 +964,4 @@ int Splitter::Stop() { } delete vecBatchProto; //free protobuf vecBatch memory return 0; -} - - - +} \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h index 0ef1989968a764eb67bb5c7aa35853e71a2fbe06..a57f868a335ebbf711b03a00329a882a82ee21f0 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h @@ -41,7 +41,6 @@ using namespace spark; using namespace google::protobuf::io; using namespace omniruntime::vec; using namespace omniruntime::type; -using namespace omniruntime::mem; struct SplitRowInfo { uint32_t copyedRow = 0; @@ -137,7 +136,7 @@ class Splitter { private: void ReleaseVarcharVector() { - std::set::iterator it; + std::set::iterator it; for (it = varcharVectorCache.begin(); it != varcharVectorCache.end(); it++) { delete *it; } @@ -147,9 +146,9 @@ private: void ReleaseVectorBatch(VectorBatch *vb) { int vectorCnt = vb->GetVectorCount(); - std::set vectorAddress; // vector deduplication + std::set vectorAddress; // vector deduplication for (int vecIndex = 0; vecIndex < vectorCnt; vecIndex++) { - Vector *vector = vb->GetVector(vecIndex); + BaseVector *vector = vb->Get(vecIndex); // not varchar vector can be released; if (varcharVectorCache.find(vector) == varcharVectorCache.end() && vectorAddress.find(vector) == vectorAddress.end()) { @@ -161,7 +160,7 @@ private: delete vb; } - std::set varcharVectorCache; + std::set varcharVectorCache; bool first_vector_batch_ = false; std::vector vector_batch_col_types_; InputDataTypes input_col_types; @@ -176,7 +175,7 @@ public: std::map> spilled_tmp_files_info_; - VecBatch *vecBatchProto = new VecBatch(); //protobuf 序列化对象结构 + spark::VecBatch *vecBatchProto = new VecBatch(); // protobuf 序列化对象结构 virtual int Split_Init(); diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h b/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h index 446cedc5f89988f115aedb7d9b3bc9b7c1c0a177..04d90130dea30a83651fff3526c08dc0992f9928 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h @@ -40,7 +40,7 @@ struct SplitOptions { int64_t thread_id = -1; int64_t task_attempt_id = -1; - BaseAllocator *allocator = omniruntime::mem::GetProcessRootAllocator(); + Allocator *allocator = Allocator::GetAllocator(); uint64_t spill_batch_row_num = 4096; // default value uint64_t spill_mem_threshold = 1024 * 1024 * 1024; // default value diff --git a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp index 1834345d54466d8e65f34eaea4ba2c99396440e0..c7a55759558e0713f3c3f265c052e5fcce94aa1c 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp @@ -242,7 +242,7 @@ TEST_F (ShuffleTest, Split_Short_10WRows) { 0, tmpTestingDir); for (uint64_t j = 0; j < 100; j++) { - VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 1000, OMNI_SHORT); + VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 1000, ShortType()); Test_splitter_split(splitterId, vb); } Test_splitter_stop(splitterId); @@ -270,7 +270,7 @@ TEST_F (ShuffleTest, Split_Boolean_10WRows) { 0, tmpTestingDir); for (uint64_t j = 0; j < 100; j++) { - VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 1000, OMNI_BOOLEAN); + VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 1000, BooleanType()); Test_splitter_split(splitterId, vb); } Test_splitter_stop(splitterId); @@ -298,7 +298,7 @@ TEST_F (ShuffleTest, Split_Long_100WRows) { 0, tmpTestingDir); for (uint64_t j = 0; j < 100; j++) { - VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 10000, OMNI_LONG); + VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 10000, LongType()); Test_splitter_split(splitterId, vb); } Test_splitter_stop(splitterId); diff --git a/omnioperator/omniop-spark-extension/cpp/test/tablescan/scan_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/tablescan/scan_test.cpp index f8a6a6b7f2776f212d7ba6b1c9ee8d9260509116..bd552e817b4fd44df9340e35d91b329da5fd043f 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/tablescan/scan_test.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/tablescan/scan_test.cpp @@ -158,7 +158,7 @@ TEST_F(ScanTest, test_copy_intVec) // int type copyToOmniVec(orc::TypeKind::INT, omniType, omniVecId, root->fields[0]); ASSERT_EQ(omniType, omniruntime::type::OMNI_INT); - omniruntime::vec::IntVector *olbInt = (omniruntime::vec::IntVector *)(omniVecId); + auto *olbInt = (omniruntime::vec::Vector *)(omniVecId); ASSERT_EQ(olbInt->GetValue(0), 10); delete olbInt; } @@ -170,10 +170,9 @@ TEST_F(ScanTest, test_copy_varCharVec) // varchar type copyToOmniVec(orc::TypeKind::VARCHAR, omniType, omniVecId, root->fields[1], 60); ASSERT_EQ(omniType, omniruntime::type::OMNI_VARCHAR); - uint8_t *actualChar = nullptr; - omniruntime::vec::VarcharVector *olbVc = (omniruntime::vec::VarcharVector *)(omniVecId); - int len = olbVc->GetValue(0, &actualChar); - std::string actualStr(reinterpret_cast(actualChar), 0, len); + auto *olbVc = (omniruntime::vec::Vector> *)( + omniVecId); + std::string_view actualStr = olbVc->GetValue(0); ASSERT_EQ(actualStr, "varchar_1"); delete olbVc; } @@ -182,14 +181,13 @@ TEST_F(ScanTest, test_copy_stringVec) { int omniType = 0; uint64_t omniVecId = 0; - uint8_t *actualChar = nullptr; // string type copyToOmniVec(orc::TypeKind::STRING, omniType, omniVecId, root->fields[2]); ASSERT_EQ(omniType, omniruntime::type::OMNI_VARCHAR); - omniruntime::vec::VarcharVector *olbStr = (omniruntime::vec::VarcharVector *)(omniVecId); - int len = olbStr->GetValue(0, &actualChar); - std::string actualStr2(reinterpret_cast(actualChar), 0, len); - ASSERT_EQ(actualStr2, "string_type_1"); + auto *olbStr = (omniruntime::vec::Vector> *)( + omniVecId); + std::string_view actualStr = olbStr->GetValue(0); + ASSERT_EQ(actualStr, "string_type_1"); delete olbStr; } @@ -200,7 +198,7 @@ TEST_F(ScanTest, test_copy_longVec) // bigint type copyToOmniVec(orc::TypeKind::LONG, omniType, omniVecId, root->fields[3]); ASSERT_EQ(omniType, omniruntime::type::OMNI_LONG); - omniruntime::vec::LongVector *olbLong = (omniruntime::vec::LongVector *)(omniVecId); + auto *olbLong = (omniruntime::vec::Vector *)(omniVecId); ASSERT_EQ(olbLong->GetValue(0), 10000); delete olbLong; } @@ -209,15 +207,14 @@ TEST_F(ScanTest, test_copy_charVec) { int omniType = 0; uint64_t omniVecId = 0; - uint8_t *actualChar = nullptr; // char type copyToOmniVec(orc::TypeKind::CHAR, omniType, omniVecId, root->fields[4], 40); ASSERT_EQ(omniType, omniruntime::type::OMNI_VARCHAR); - omniruntime::vec::VarcharVector *olbChar40 = (omniruntime::vec::VarcharVector *)(omniVecId); - int len = olbChar40->GetValue(0, &actualChar); - std::string actualStr3(reinterpret_cast(actualChar), 0, len); - ASSERT_EQ(actualStr3, "char_1"); - delete olbChar40; + auto *olbChar = (omniruntime::vec::Vector> *)( + omniVecId); + std::string_view actualStr = olbChar->GetValue(0); + ASSERT_EQ(actualStr, "char_1"); + delete olbChar; } TEST_F(ScanTest, test_copy_doubleVec) @@ -227,7 +224,7 @@ TEST_F(ScanTest, test_copy_doubleVec) // double type copyToOmniVec(orc::TypeKind::DOUBLE, omniType, omniVecId, root->fields[6]); ASSERT_EQ(omniType, omniruntime::type::OMNI_DOUBLE); - omniruntime::vec::DoubleVector *olbDouble = (omniruntime::vec::DoubleVector *)(omniVecId); + auto *olbDouble = (omniruntime::vec::Vector *)(omniVecId); ASSERT_EQ(olbDouble->GetValue(0), 1111.1111); delete olbDouble; } @@ -239,7 +236,7 @@ TEST_F(ScanTest, test_copy_booleanVec) // boolean type copyToOmniVec(orc::TypeKind::BOOLEAN, omniType, omniVecId, root->fields[9]); ASSERT_EQ(omniType, omniruntime::type::OMNI_BOOLEAN); - omniruntime::vec::BooleanVector *olbBoolean = (omniruntime::vec::BooleanVector *)(omniVecId); + auto *olbBoolean = (omniruntime::vec::Vector *)(omniVecId); ASSERT_EQ(olbBoolean->GetValue(0), true); delete olbBoolean; } @@ -251,7 +248,7 @@ TEST_F(ScanTest, test_copy_shortVec) // short type copyToOmniVec(orc::TypeKind::SHORT, omniType, omniVecId, root->fields[10]); ASSERT_EQ(omniType, omniruntime::type::OMNI_SHORT); - omniruntime::vec::ShortVector *olbShort = (omniruntime::vec::ShortVector *)(omniVecId); + auto *olbShort = (omniruntime::vec::Vector *)(omniVecId); ASSERT_EQ(olbShort->GetValue(0), 11); delete olbShort; } diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp index 586f4bbdb95721b22422d715f645eb502dc1a894..d70a62003645893af12df8f8980c9195bbd6d389 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp @@ -21,199 +21,33 @@ using namespace omniruntime::vec; -void ToVectorTypes(const int32_t *dataTypeIds, int32_t dataTypeCount, std::vector &dataTypes) -{ - for (int i = 0; i < dataTypeCount; ++i) { - if (dataTypeIds[i] == OMNI_VARCHAR) { - dataTypes.push_back(std::make_shared(50)); - continue; - } else if (dataTypeIds[i] == OMNI_CHAR) { - dataTypes.push_back(std::make_shared(50)); - continue; - } - dataTypes.push_back(std::make_shared(dataTypeIds[i])); - } -} - -VectorBatch* CreateInputData(const int32_t numRows, - const int32_t numCols, - int32_t* inputTypeIds, - int64_t* allData) -{ - auto *vecBatch = new VectorBatch(numCols, numRows); - vector inputTypes; - ToVectorTypes(inputTypeIds, numCols, inputTypes); - vecBatch->NewVectors(omniruntime::vec::GetProcessGlobalVecAllocator(), inputTypes); - for (int i = 0; i < numCols; ++i) { - switch (inputTypeIds[i]) { - case OMNI_BOOLEAN: - ((BooleanVector *)vecBatch->GetVector(i))->SetValues(0, (int32_t *)allData[i], numRows); - break; - case OMNI_INT: - ((IntVector *)vecBatch->GetVector(i))->SetValues(0, (int32_t *)allData[i], numRows); - break; - case OMNI_LONG: - ((LongVector *)vecBatch->GetVector(i))->SetValues(0, (int64_t *)allData[i], numRows); - break; - case OMNI_DOUBLE: - ((DoubleVector *)vecBatch->GetVector(i))->SetValues(0, (double *)allData[i], numRows); - break; - case OMNI_SHORT: - ((ShortVector *)vecBatch->GetVector(i))->SetValues(0, (int16_t *)allData[i], numRows); - break; - case OMNI_VARCHAR: - case OMNI_CHAR: { - for (int j = 0; j < numRows; ++j) { - int64_t addr = (reinterpret_cast(allData[i]))[j]; - std::string s (reinterpret_cast(addr)); - ((VarcharVector *)vecBatch->GetVector(i))->SetValue(j, (uint8_t *)(s.c_str()), s.length()); - } - break; - } - case OMNI_DECIMAL128: - ((Decimal128Vector *)vecBatch->GetVector(i))->SetValues(0, (int64_t *) allData[i], numRows); - break; - default:{ - LogError("No such data type %d", inputTypeIds[i]); - } - } - } - return vecBatch; -} - -VarcharVector *CreateVarcharVector(VarcharDataType type, std::string *values, int32_t length) -{ - VectorAllocator * vecAllocator = omniruntime::vec::GetProcessGlobalVecAllocator(); - uint32_t width = type.GetWidth(); - VarcharVector *vector = std::make_unique(vecAllocator, length * width, length).release(); - for (int32_t i = 0; i < length; i++) { - vector->SetValue(i, reinterpret_cast(values[i].c_str()), values[i].length()); - } - return vector; -} - -Decimal128Vector *CreateDecimal128Vector(Decimal128 *values, int32_t length) -{ - VectorAllocator *vecAllocator = omniruntime::vec::GetProcessGlobalVecAllocator(); - Decimal128Vector *vector = std::make_unique(vecAllocator, length).release(); - for (int32_t i = 0; i < length; i++) { - vector->SetValue(i, values[i]); - } - return vector; -} - -Vector *CreateVector(DataType &vecType, int32_t rowCount, va_list &args) -{ - switch (vecType.GetId()) { - case OMNI_INT: - case OMNI_DATE32: - return CreateVector(va_arg(args, int32_t *), rowCount); - case OMNI_LONG: - case OMNI_DECIMAL64: - return CreateVector(va_arg(args, int64_t *), rowCount); - case OMNI_DOUBLE: - return CreateVector(va_arg(args, double *), rowCount); - case OMNI_BOOLEAN: - return CreateVector(va_arg(args, bool *), rowCount); - case OMNI_VARCHAR: - case OMNI_CHAR: - return CreateVarcharVector(static_cast(vecType), va_arg(args, std::string *), rowCount); - case OMNI_DECIMAL128: - return CreateDecimal128Vector(va_arg(args, Decimal128 *), rowCount); - default: - std::cerr << "Unsupported type : " << vecType.GetId() << std::endl; - return nullptr; - } -} - -DictionaryVector *CreateDictionaryVector(DataType &dataType, int32_t rowCount, int32_t *ids, int32_t idsCount, ...) +VectorBatch *CreateVectorBatch(const DataTypes &types, int32_t rowCount, ...) { + int32_t typesCount = types.GetSize(); + auto *vectorBatch = new VectorBatch(rowCount); va_list args; - va_start(args, idsCount); - Vector *dictionary = CreateVector(dataType, rowCount, args); + va_start(args, rowCount); + for (int32_t i = 0; i < typesCount; i++) { + DataTypePtr type = types.GetType(i); + vectorBatch->Append(CreateVector(*type, rowCount, args).release()); + } va_end(args); - auto vec = new DictionaryVector(dictionary, ids, idsCount); - delete dictionary; - return vec; + return vectorBatch; } -Vector *buildVector(const DataType &aggType, int32_t rowNumber) +std::unique_ptr CreateVector(DataType &dataType, int32_t rowCount, va_list &args) { - VectorAllocator *vecAllocator = omniruntime::vec::GetProcessGlobalVecAllocator(); - switch (aggType.GetId()) { - case OMNI_NONE: { - LongVector *col = new LongVector(vecAllocator, rowNumber); - for (int32_t j = 0; j < rowNumber; ++j) { - col->SetValueNull(j); - } - return col; - } - case OMNI_INT: - case OMNI_DATE32: { - IntVector *col = new IntVector(vecAllocator, rowNumber); - for (int32_t j = 0; j < rowNumber; ++j) { - col->SetValue(j, 1); - } - return col; - } - case OMNI_LONG: - case OMNI_DECIMAL64: { - LongVector *col = new LongVector(vecAllocator, rowNumber); - for (int32_t j = 0; j < rowNumber; ++j) { - col->SetValue(j, 1); - } - return col; - } - case OMNI_DOUBLE: { - DoubleVector *col = new DoubleVector(vecAllocator, rowNumber); - for (int32_t j = 0; j < rowNumber; ++j) { - col->SetValue(j, 1); - } - return col; - } - case OMNI_BOOLEAN: { - BooleanVector *col = new BooleanVector(vecAllocator, rowNumber); - for (int32_t j = 0; j < rowNumber; ++j) { - col->SetValue(j, 1); - } - return col; - } - case OMNI_DECIMAL128: { - Decimal128Vector *col = new Decimal128Vector(vecAllocator, rowNumber); - for (int32_t j = 0; j < rowNumber; ++j) { - col->SetValue(j, Decimal128(0, 1)); - } - return col; - } - case OMNI_VARCHAR: - case OMNI_CHAR: { - VarcharDataType charType = (VarcharDataType &)aggType; - VarcharVector *col = new VarcharVector(vecAllocator, charType.GetWidth() * rowNumber, rowNumber); - for (int32_t j = 0; j < rowNumber; ++j) { - std::string str = std::to_string(j); - col->SetValue(j, reinterpret_cast(str.c_str()), str.size()); - } - return col; - } - default: { - LogError("No such %d type support", aggType.GetId()); - return nullptr; - } - } + return DYNAMIC_TYPE_DISPATCH(CreateFlatVector, dataType.GetId(), rowCount, args); } -VectorBatch *CreateVectorBatch(const DataTypes &types, int32_t rowCount, ...) +std::unique_ptr CreateDictionaryVector(DataType &dataType, int32_t rowCount, int32_t *ids, int32_t idsCount, + ...) { - int32_t typesCount = types.GetSize(); - auto *vectorBatch = new VectorBatch(typesCount, rowCount); va_list args; - va_start(args, rowCount); - for (int32_t i = 0; i < typesCount; i++) { - DataTypePtr type = types.GetType(i); - vectorBatch->SetVector(i, CreateVector(*type, rowCount, args)); - } + va_start(args, idsCount); + std::unique_ptr dictionary = CreateVector(dataType, rowCount, args); va_end(args); - return vectorBatch; + return DYNAMIC_TYPE_DISPATCH(CreateDictionary, dataType.GetId(), dictionary.get(), ids, idsCount); } /** @@ -225,24 +59,16 @@ VectorBatch *CreateVectorBatch(const DataTypes &types, int32_t rowCount, ...) */ VectorBatch* CreateVectorBatch_1row_varchar_withPid(int pid, std::string inputString) { // gen vectorBatch - const int32_t numCols = 2; - int32_t* inputTypes = new int32_t[numCols]; - inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_VARCHAR; + DataTypes inputTypes(std::vector({ IntType(), VarcharType() })); const int32_t numRows = 1; auto* col1 = new int32_t[numRows]; col1[0] = pid; - auto* col2 = new int64_t[numRows]; - std::string* strTmp = new std::string(inputString); - col2[0] = (int64_t)(strTmp->c_str()); + auto* col2 = new std::string[numRows]; + col2[0] = std::move(inputString); - int64_t allData[numCols] = {reinterpret_cast(col1), - reinterpret_cast(col2)}; - VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); - delete[] inputTypes; + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col1, col2); delete[] col1; delete[] col2; - delete strTmp; return in; } @@ -255,224 +81,144 @@ VectorBatch* CreateVectorBatch_1row_varchar_withPid(int pid, std::string inputSt */ VectorBatch* CreateVectorBatch_4col_withPid(int parNum, int rowNum) { int partitionNum = parNum; - const int32_t numCols = 5; - int32_t* inputTypes = new int32_t[numCols]; - inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_INT; - inputTypes[2] = OMNI_LONG; - inputTypes[3] = OMNI_DOUBLE; - inputTypes[4] = OMNI_VARCHAR; + DataTypes inputTypes(std::vector({ IntType(), IntType(), LongType(), DoubleType(), VarcharType() })); const int32_t numRows = rowNum; auto* col0 = new int32_t[numRows]; auto* col1 = new int32_t[numRows]; auto* col2 = new int64_t[numRows]; auto* col3 = new double[numRows]; - auto* col4 = new int64_t[numRows]; - string startStr = "_START_"; - string endStr = "_END_"; + auto* col4 = new std::string[numRows]; + std::string startStr = "_START_"; + std::string endStr = "_END_"; std::vector string_cache_test_; for (int i = 0; i < numRows; i++) { - col0[i] = (i+1) % partitionNum; + col0[i] = (i + 1) % partitionNum; col1[i] = i + 1; col2[i] = i + 1; col3[i] = i + 1; - std::string* strTmp = new std::string(startStr + to_string(i + 1) + endStr); - string_cache_test_.push_back(strTmp); - col4[i] = (int64_t)((*strTmp).c_str()); + std::string strTmp = std::string(startStr + to_string(i + 1) + endStr); + col4[i] = std::move(strTmp); } - int64_t allData[numCols] = {reinterpret_cast(col0), - reinterpret_cast(col1), - reinterpret_cast(col2), - reinterpret_cast(col3), - reinterpret_cast(col4)}; - VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); - delete[] inputTypes; + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1, col2, col3, col4); delete[] col0; delete[] col1; delete[] col2; delete[] col3; delete[] col4; - - for (uint p = 0; p < string_cache_test_.size(); p++) { - delete string_cache_test_[p]; // release memory - } return in; } -VectorBatch* CreateVectorBatch_1FixCol_withPid(int parNum, int rowNum, int32_t fixColType) { +VectorBatch* CreateVectorBatch_1FixCol_withPid(int parNum, int rowNum, DataTypePtr fixColType) { int partitionNum = parNum; - const int32_t numCols = 2; - int32_t* inputTypes = new int32_t[numCols]; - inputTypes[0] = OMNI_INT; - inputTypes[1] = fixColType; + DataTypes inputTypes(std::vector({ IntType(), std::move(fixColType) })); const int32_t numRows = rowNum; auto* col0 = new int32_t[numRows]; auto* col1 = new int64_t[numRows]; for (int i = 0; i < numRows; i++) { - col0[i] = (i+1) % partitionNum; + col0[i] = (i + 1) % partitionNum; col1[i] = i + 1; } - int64_t allData[numCols] = {reinterpret_cast(col0), - reinterpret_cast(col1)}; - VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); - delete[] inputTypes; + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1); delete[] col0; delete[] col1; return in; } VectorBatch* CreateVectorBatch_2column_1row_withPid(int pid, std::string strVar, int intVar) { - const int32_t numCols = 3; - int32_t* inputTypes = new int32_t[numCols]; - inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_VARCHAR; - inputTypes[2] = OMNI_INT; + DataTypes inputTypes(std::vector({ IntType(), VarcharType(), IntType() })); const int32_t numRows = 1; auto* col0 = new int32_t[numRows]; - auto* col1 = new int64_t[numRows]; + auto* col1 = new std::string[numRows]; auto* col2 = new int32_t[numRows]; col0[0] = pid; - std::string* strTmp = new std::string(strVar); - col1[0] = (int64_t)(strTmp->c_str()); + col1[0] = std::move(strVar); col2[0] = intVar; - int64_t allData[numCols] = {reinterpret_cast(col0), - reinterpret_cast(col1), - reinterpret_cast(col2)}; - VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); - delete[] inputTypes; + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1, col2); delete[] col0; delete[] col1; delete[] col2; - delete strTmp; return in; } VectorBatch* CreateVectorBatch_4varcharCols_withPid(int parNum, int rowNum) { int partitionNum = parNum; - const int32_t numCols = 5; - int32_t* inputTypes = new int32_t[numCols]; - inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_VARCHAR; - inputTypes[2] = OMNI_VARCHAR; - inputTypes[3] = OMNI_VARCHAR; - inputTypes[4] = OMNI_VARCHAR; + DataTypes inputTypes( + std::vector({ IntType(), VarcharType(), VarcharType(), VarcharType(), VarcharType() })); const int32_t numRows = rowNum; auto* col0 = new int32_t[numRows]; - auto* col1 = new int64_t[numRows]; - auto* col2 = new int64_t[numRows]; - auto* col3 = new int64_t[numRows]; - auto* col4 = new int64_t[numRows]; + auto* col1 = new std::string[numRows]; + auto* col2 = new std::string[numRows]; + auto* col3 = new std::string[numRows]; + auto* col4 = new std::string[numRows]; - std::vector string_cache_test_; for (int i = 0; i < numRows; i++) { - col0[i] = (i+1) % partitionNum; - std::string* strTmp1 = new std::string("Col1_START_" + to_string(i + 1) + "_END_"); - col1[i] = (int64_t)((*strTmp1).c_str()); - std::string* strTmp2 = new std::string("Col2_START_" + to_string(i + 1) + "_END_"); - col2[i] = (int64_t)((*strTmp2).c_str()); - std::string* strTmp3 = new std::string("Col3_START_" + to_string(i + 1) + "_END_"); - col3[i] = (int64_t)((*strTmp3).c_str()); - std::string* strTmp4 = new std::string("Col4_START_" + to_string(i + 1) + "_END_"); - col4[i] = (int64_t)((*strTmp4).c_str()); - string_cache_test_.push_back(strTmp1); - string_cache_test_.push_back(strTmp2); - string_cache_test_.push_back(strTmp3); - string_cache_test_.push_back(strTmp4); + col0[i] = (i + 1) % partitionNum; + std::string strTmp1 = std::string("Col1_START_" + to_string(i + 1) + "_END_"); + col1[i] = std::move(strTmp1); + std::string strTmp2 = std::string("Col2_START_" + to_string(i + 1) + "_END_"); + col2[i] = std::move(strTmp2); + std::string strTmp3 = std::string("Col3_START_" + to_string(i + 1) + "_END_"); + col3[i] = std::move(strTmp3); + std::string strTmp4 = std::string("Col4_START_" + to_string(i + 1) + "_END_"); + col4[i] = std::move(strTmp4); } - int64_t allData[numCols] = {reinterpret_cast(col0), - reinterpret_cast(col1), - reinterpret_cast(col2), - reinterpret_cast(col3), - reinterpret_cast(col4)}; - VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); - delete[] inputTypes; + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1, col2, col3, col4); delete[] col0; delete[] col1; delete[] col2; delete[] col3; delete[] col4; - - for (uint p = 0; p < string_cache_test_.size(); p++) { - delete string_cache_test_[p]; // release memory - } return in; } VectorBatch* CreateVectorBatch_4charCols_withPid(int parNum, int rowNum) { int partitionNum = parNum; - const int32_t numCols = 5; - int32_t* inputTypes = new int32_t[numCols]; - inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_CHAR; - inputTypes[2] = OMNI_CHAR; - inputTypes[3] = OMNI_CHAR; - inputTypes[4] = OMNI_CHAR; + DataTypes inputTypes(std::vector({ IntType(), CharType(), CharType(), CharType(), CharType() })); const int32_t numRows = rowNum; auto* col0 = new int32_t[numRows]; - auto* col1 = new int64_t[numRows]; - auto* col2 = new int64_t[numRows]; - auto* col3 = new int64_t[numRows]; - auto* col4 = new int64_t[numRows]; + auto* col1 = new std::string[numRows]; + auto* col2 = new std::string[numRows]; + auto* col3 = new std::string[numRows]; + auto* col4 = new std::string[numRows]; std::vector string_cache_test_; for (int i = 0; i < numRows; i++) { - col0[i] = (i+1) % partitionNum; - std::string* strTmp1 = new std::string("Col1_CHAR_" + to_string(i + 1) + "_END_"); - col1[i] = (int64_t)((*strTmp1).c_str()); - std::string* strTmp2 = new std::string("Col2_CHAR_" + to_string(i + 1) + "_END_"); - col2[i] = (int64_t)((*strTmp2).c_str()); - std::string* strTmp3 = new std::string("Col3_CHAR_" + to_string(i + 1) + "_END_"); - col3[i] = (int64_t)((*strTmp3).c_str()); - std::string* strTmp4 = new std::string("Col4_CHAR_" + to_string(i + 1) + "_END_"); - col4[i] = (int64_t)((*strTmp4).c_str()); - string_cache_test_.push_back(strTmp1); - string_cache_test_.push_back(strTmp2); - string_cache_test_.push_back(strTmp3); - string_cache_test_.push_back(strTmp4); + col0[i] = (i + 1) % partitionNum; + std::string strTmp1 = std::string("Col1_CHAR_" + to_string(i + 1) + "_END_"); + col1[i] = std::move(strTmp1); + std::string strTmp2 = std::string("Col2_CHAR_" + to_string(i + 1) + "_END_"); + col2[i] = std::move(strTmp2); + std::string strTmp3 = std::string("Col3_CHAR_" + to_string(i + 1) + "_END_"); + col3[i] = std::move(strTmp3); + std::string strTmp4 = std::string("Col4_CHAR_" + to_string(i + 1) + "_END_"); + col4[i] = std::move(strTmp4); } - int64_t allData[numCols] = {reinterpret_cast(col0), - reinterpret_cast(col1), - reinterpret_cast(col2), - reinterpret_cast(col3), - reinterpret_cast(col4)}; - VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); - delete[] inputTypes; + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1, col2, col3, col4); delete[] col0; delete[] col1; delete[] col2; delete[] col3; delete[] col4; - - for (uint p = 0; p < string_cache_test_.size(); p++) { - delete string_cache_test_[p]; // release memory - } return in; } VectorBatch* CreateVectorBatch_5fixedCols_withPid(int parNum, int rowNum) { int partitionNum = parNum; - // gen vectorBatch - const int32_t numCols = 6; - int32_t* inputTypes = new int32_t[numCols]; - inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_BOOLEAN; - inputTypes[2] = OMNI_SHORT; - inputTypes[3] = OMNI_INT; - inputTypes[4] = OMNI_LONG; - inputTypes[5] = OMNI_DOUBLE; + DataTypes inputTypes( + std::vector({ IntType(), BooleanType(), ShortType(), IntType(), LongType(), DoubleType() })); const int32_t numRows = rowNum; auto* col0 = new int32_t[numRows]; @@ -490,14 +236,7 @@ VectorBatch* CreateVectorBatch_5fixedCols_withPid(int parNum, int rowNum) { col5[i] = i + 1; } - int64_t allData[numCols] = {reinterpret_cast(col0), - reinterpret_cast(col1), - reinterpret_cast(col2), - reinterpret_cast(col3), - reinterpret_cast(col4), - reinterpret_cast(col5)}; - VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); - delete[] inputTypes; + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1, col2, col3, col4, col5); delete[] col0; delete[] col1; delete[] col2; @@ -512,71 +251,85 @@ VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum) { // construct input data const int32_t dataSize = 6; // prepare data - int32_t data0[dataSize] = {111, 112, 113, 114, 115, 116}; - int64_t data1[dataSize] = {221, 222, 223, 224, 225, 226}; - void *datas[2] = {data0, data1}; - DataTypes sourceTypes(std::vector({ std::make_unique(), std::make_unique()})); - int32_t ids[] = {0, 1, 2, 3, 4, 5}; - VectorBatch *vectorBatch = new VectorBatch(3, dataSize); - VectorAllocator *allocator = omniruntime::vec::GetProcessGlobalVecAllocator(); - IntVector *intVectorTmp = new IntVector(allocator, 6); - for (int i = 0; i < intVectorTmp->GetSize(); i++) { - intVectorTmp->SetValue(i, (i+1) % partitionNum); - } - for (int32_t i = 0; i < 3; i ++) { - if (i == 0) { - vectorBatch->SetVector(i, intVectorTmp); - } else { - omniruntime::vec::DataType dataType = *(sourceTypes.Get()[i - 1]); - vectorBatch->SetVector(i, CreateDictionaryVector(dataType, dataSize, ids, dataSize, datas[i - 1])); - } + auto *col0 = new int32_t[dataSize]; + for (int32_t i = 0; i< dataSize; i++) { + col0[i] = (i + 1) % partitionNum; } + int32_t col1[dataSize] = {111, 112, 113, 114, 115, 116}; + int64_t col2[dataSize] = {221, 222, 223, 224, 225, 226}; + void *datas[2] = {col1, col2}; + DataTypes sourceTypes(std::vector({ IntType(), LongType() })); + int32_t ids[] = {0, 1, 2, 3, 4, 5}; + + VectorBatch *vectorBatch = new VectorBatch(dataSize); + auto Vec0 = CreateVector(dataSize, col0); + vectorBatch->Append(Vec0.release()); + auto dicVec0 = CreateDictionaryVector(*sourceTypes.GetType(0), dataSize, ids, dataSize, datas[0]); + auto dicVec1 = CreateDictionaryVector(*sourceTypes.GetType(1), dataSize, ids, dataSize, datas[1]); + vectorBatch->Append(dicVec0.release()); + vectorBatch->Append(dicVec1.release()); + + delete[] col0; return vectorBatch; } VectorBatch* CreateVectorBatch_1decimal128Col_withPid(int partitionNum, int rowNum) { - auto decimal128InputVec = buildVector(Decimal128DataType(38, 2), rowNum); - VectorAllocator *allocator = VectorAllocator::GetGlobalAllocator(); - IntVector *intVectorPid = new IntVector(allocator, rowNum); - for (int i = 0; i < intVectorPid->GetSize(); i++) { - intVectorPid->SetValue(i, (i+1) % partitionNum); + const int32_t numRows = rowNum; + DataTypes inputTypes(std::vector({ IntType(), Decimal128Type(38, 2) })); + + auto *col0 = new int32_t[numRows]; + auto *col1 = new Decimal128[numRows]; + for (int32_t i = 0; i < numRows; i++) { + col0[i] = (i + 1) % partitionNum; + col1[i] = Decimal128(0, 1); } - VectorBatch *vecBatch = new VectorBatch(2); - vecBatch->SetVector(0, intVectorPid); - vecBatch->SetVector(1, decimal128InputVec); - return vecBatch; + + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1); + delete[] col0; + delete[] col1; + return in; } VectorBatch* CreateVectorBatch_1decimal64Col_withPid(int partitionNum, int rowNum) { - auto decimal64InputVec = buildVector(Decimal64DataType(7, 2), rowNum); - VectorAllocator *allocator = VectorAllocator::GetGlobalAllocator(); - IntVector *intVectorPid = new IntVector(allocator, rowNum); - for (int i = 0; i < intVectorPid->GetSize(); i++) { - intVectorPid->SetValue(i, (i+1) % partitionNum); + const int32_t numRows = rowNum; + DataTypes inputTypes(std::vector({ IntType(), Decimal64Type(7, 2) })); + + auto *col0 = new int32_t[numRows]; + auto *col1 = new int64_t[numRows]; + for (int32_t i = 0; i < numRows; i++) { + col0[i] = (i + 1) % partitionNum; + col1[i] = 1; } - VectorBatch *vecBatch = new VectorBatch(2); - vecBatch->SetVector(0, intVectorPid); - vecBatch->SetVector(1, decimal64InputVec); - return vecBatch; + + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1); + delete[] col0; + delete[] col1; + return in; } VectorBatch* CreateVectorBatch_2decimalCol_withPid(int partitionNum, int rowNum) { - auto decimal64InputVec = buildVector(Decimal64DataType(7, 2), rowNum); - auto decimal128InputVec = buildVector(Decimal128DataType(38, 2), rowNum); - VectorAllocator *allocator = VectorAllocator::GetGlobalAllocator(); - IntVector *intVectorPid = new IntVector(allocator, rowNum); - for (int i = 0; i < intVectorPid->GetSize(); i++) { - intVectorPid->SetValue(i, (i+1) % partitionNum); + const int32_t numRows = rowNum; + DataTypes inputTypes(std::vector({ IntType(), Decimal64Type(7, 2), Decimal128Type(38, 2) })); + + auto *col0 = new int32_t[numRows]; + auto *col1 = new int64_t[numRows]; + auto *col2 = new Decimal128[numRows]; + for (int32_t i = 0; i < numRows; i++) { + col0[i] = (i + 1) % partitionNum; + col1[i] = 1; + col2[i] = Decimal128(0, 1); } - VectorBatch *vecBatch = new VectorBatch(3); - vecBatch->SetVector(0, intVectorPid); - vecBatch->SetVector(1, decimal64InputVec); - vecBatch->SetVector(2, decimal128InputVec); - return vecBatch; + + VectorBatch* in = CreateVectorBatch(inputTypes, numRows, col0, col1, col2); + delete[] col0; + delete[] col1; + delete[] col2; + return in; } VectorBatch* CreateVectorBatch_someNullRow_vectorBatch() { const int32_t numRows = 6; + const int32_t numCols = 6; bool data0[numRows] = {true, false, true, false, true, false}; int16_t data1[numRows] = {0, 1, 2, 3, 4, 6}; int32_t data2[numRows] = {0, 1, 2, 0, 1, 2}; @@ -584,50 +337,32 @@ VectorBatch* CreateVectorBatch_someNullRow_vectorBatch() { double data4[numRows] = {0.0, 1.1, 2.2, 3.3, 4.4, 5.5}; std::string data5[numRows] = {"abcde", "fghij", "klmno", "pqrst", "", ""}; - auto vec0 = CreateVector(data0, numRows); - auto vec1 = CreateVector(data1, numRows); - auto vec2 = CreateVector(data2, numRows); - auto vec3 = CreateVector(data3, numRows); - auto vec4 = CreateVector(data4, numRows); - auto vec5 = CreateVarcharVector(VarcharDataType(5), data5, numRows); - for (int i = 0; i < numRows; i = i + 2) { - vec0->SetValueNull(i); - vec1->SetValueNull(i); - vec2->SetValueNull(i); - vec3->SetValueNull(i); - vec4->SetValueNull(i); - vec5->SetValueNull(i); + DataTypes inputTypes( + std::vector({ BooleanType(), ShortType(), IntType(), LongType(), DoubleType(), VarcharType(5) })); + VectorBatch* vecBatch = CreateVectorBatch(inputTypes, numRows, data0, data1, data2, data3, data4, data5); + for (int32_t i = 0; i < numCols; i++) { + for (int32_t j = 0; j < numRows; j = j + 2) { + vecBatch->Get(i)->SetNull(j); + } } - VectorBatch *vecBatch = new VectorBatch(6); - vecBatch->SetVector(0, vec0); - vecBatch->SetVector(1, vec1); - vecBatch->SetVector(2, vec2); - vecBatch->SetVector(3, vec3); - vecBatch->SetVector(4, vec4); - vecBatch->SetVector(5, vec5); return vecBatch; } VectorBatch* CreateVectorBatch_someNullCol_vectorBatch() { const int32_t numRows = 6; + const int32_t numCols = 4; int32_t data1[numRows] = {0, 1, 2, 0, 1, 2}; int64_t data2[numRows] = {0, 1, 2, 3, 4, 5}; double data3[numRows] = {0.0, 1.1, 2.2, 3.3, 4.4, 5.5}; std::string data4[numRows] = {"abcde", "fghij", "klmno", "pqrst", "", ""}; - auto vec0 = CreateVector(data1, numRows); - auto vec1 = CreateVector(data2, numRows); - auto vec2 = CreateVector(data3, numRows); - auto vec3 = CreateVarcharVector(VarcharDataType(5), data4, numRows); - for (int i = 0; i < numRows; i = i + 1) { - vec1->SetValueNull(i); - vec3->SetValueNull(i); + DataTypes inputTypes(std::vector({ IntType(), LongType(), DoubleType(), VarcharType(5) })); + VectorBatch* vecBatch = CreateVectorBatch(inputTypes, numRows, data1, data2, data3, data4); + for (int32_t i = 0; i < numCols; i = i + 2) { + for (int32_t j = 0; j < numRows; j++) { + vecBatch->Get(i)->SetNull(j); + } } - VectorBatch *vecBatch = new VectorBatch(4); - vecBatch->SetVector(0, vec0); - vecBatch->SetVector(1, vec1); - vecBatch->SetVector(2, vec2); - vecBatch->SetVector(3, vec3); return vecBatch; } diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h index 496a4cc6fc6d0a8834a95db72ccccb5376fe02b6..aad8ca49fb3ded5cdcfc44ee53f7b18d52389efa 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h +++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h @@ -32,15 +32,62 @@ static ConcurrentMap> shuffle_splitter_holder_; static std::string s_shuffle_tests_dir = "/tmp/shuffleTests"; -VectorBatch* CreateInputData(const int32_t numRows, const int32_t numCols, int32_t* inputTypeIds, int64_t* allData); +VectorBatch *CreateVectorBatch(const DataTypes &types, int32_t rowCount, ...); -Vector *buildVector(const DataType &aggType, int32_t rowNumber); +std::unique_ptr CreateVector(DataType &dataType, int32_t rowCount, va_list &args); + +template std::unique_ptr CreateVector(int32_t length, T *values) +{ + std::unique_ptr> vector = std::make_unique>(length); + for (int32_t i = 0; i < length; i++) { + vector->SetValue(i, values[i]); + } + return vector; +} + +template +std::unique_ptr CreateFlatVector(int32_t length, va_list &args) +{ + using namespace omniruntime::type; + using T = typename NativeType::type; + using VarcharVector = Vector>; + if constexpr (std::is_same_v || std::is_same_v) { + std::unique_ptr vector = std::make_unique(length); + std::string *str = va_arg(args, std::string *); + for (int32_t i = 0; i < length; i++) { + std::string_view value(str[i].data(), str[i].length()); + vector->SetValue(i, value); + } + return vector; + } else { + std::unique_ptr> vector = std::make_unique>(length); + T *value = va_arg(args, T *); + for (int32_t i = 0; i < length; i++) { + vector->SetValue(i, value[i]); + } + return vector; + } +} + +std::unique_ptr CreateDictionaryVector(DataType &dataType, int32_t rowCount, int32_t *ids, int32_t idsCount, + ...); + +template +std::unique_ptr CreateDictionary(BaseVector *vector, int32_t *ids, int32_t size) +{ + using T = typename NativeType::type; + if constexpr (std::is_same_v || std::is_same_v) { + return VectorHelper::CreateStringDictionary(ids, size, + reinterpret_cast> *>(vector)); + } + return VectorHelper::CreateDictionary(ids, size, reinterpret_cast *>(vector)); +} VectorBatch* CreateVectorBatch_1row_varchar_withPid(int pid, std::string inputChar); VectorBatch* CreateVectorBatch_4col_withPid(int parNum, int rowNum); -VectorBatch* CreateVectorBatch_1FixCol_withPid(int parNum, int rowNum, int32_t fixColType); +VectorBatch* CreateVectorBatch_1FixCol_withPid(int parNum, int rowNum, DataTypePtr fixColType); VectorBatch* CreateVectorBatch_2column_1row_withPid(int pid, std::string strVar, int intVar); @@ -79,14 +126,6 @@ void Test_splitter_stop(long splitter_id); void Test_splitter_close(long splitter_id); -template T *CreateVector(V *values, int32_t length) -{ - VectorAllocator *vecAllocator = omniruntime::vec::GetProcessGlobalVecAllocator(); - auto vector = new T(vecAllocator, length); - vector->SetValues(0, values, length); - return vector; -} - void GetFilePath(const char *path, const char *filename, char *filepath); void DeletePathAll(const char* path); diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java index 1e4d1c7bb053489559359408c5460b59adc36a4b..d80a236533c6b2b3305b2f443b759877239d6089 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/OrcColumnarBatchJniReader.java @@ -19,7 +19,6 @@ package com.huawei.boostkit.spark.jni; import nova.hetu.omniruntime.type.DataType; -import nova.hetu.omniruntime.type.Decimal128DataType; import nova.hetu.omniruntime.vector.*; import org.apache.spark.sql.catalyst.util.RebaseDateTime; @@ -273,7 +272,7 @@ public class OrcColumnarBatchJniReader { break; } case OMNI_DECIMAL128: { - vecList[i] = new Decimal128Vec(vecNativeIds[nativeGetId], Decimal128DataType.DECIMAL128); + vecList[i] = new Decimal128Vec(vecNativeIds[nativeGetId]); break; } default: { diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java index 808f96e1fb666def4ff9fc224f01020a81a5baf7..5379fd7c9501762279f4fa0279263c9658e4d827 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java @@ -194,32 +194,32 @@ public class OmniColumnVector extends WritableColumnVector { @Override public boolean hasNull() { if (dictionaryData != null) { - return dictionaryData.hasNullValue(); + return dictionaryData.hasNull(); } if (type instanceof BooleanType) { - return booleanDataVec.hasNullValue(); + return booleanDataVec.hasNull(); } else if (type instanceof ByteType) { - return charsTypeDataVec.hasNullValue(); + return charsTypeDataVec.hasNull(); } else if (type instanceof ShortType) { - return shortDataVec.hasNullValue(); + return shortDataVec.hasNull(); } else if (type instanceof IntegerType) { - return intDataVec.hasNullValue(); + return intDataVec.hasNull(); } else if (type instanceof DecimalType) { if (DecimalType.is64BitDecimalType(type)) { - return longDataVec.hasNullValue(); + return longDataVec.hasNull(); } else { - return decimal128DataVec.hasNullValue(); + return decimal128DataVec.hasNull(); } } else if (type instanceof LongType || DecimalType.is64BitDecimalType(type)) { - return longDataVec.hasNullValue(); + return longDataVec.hasNull(); } else if (type instanceof FloatType) { return false; } else if (type instanceof DoubleType) { - return doubleDataVec.hasNullValue(); + return doubleDataVec.hasNull(); } else if (type instanceof StringType) { - return charsTypeDataVec.hasNullValue(); + return charsTypeDataVec.hasNull(); } else if (type instanceof DateType) { - return intDataVec.hasNullValue(); + return intDataVec.hasNull(); } throw new UnsupportedOperationException("hasNull is not supported for type:" + type); } @@ -806,7 +806,7 @@ public class OmniColumnVector extends WritableColumnVector { if (type instanceof BooleanType) { booleanDataVec = new BooleanVec(newCapacity); } else if (type instanceof ByteType) { - charsTypeDataVec = new VarcharVec(newCapacity * 4, newCapacity); + charsTypeDataVec = new VarcharVec(newCapacity); } else if (type instanceof ShortType) { shortDataVec = new ShortVec(newCapacity); } else if (type instanceof IntegerType) { @@ -825,7 +825,7 @@ public class OmniColumnVector extends WritableColumnVector { doubleDataVec = new DoubleVec(newCapacity); } else if (type instanceof StringType) { // need to set with real column size, suppose char(200) utf8 - charsTypeDataVec = new VarcharVec(newCapacity * 4 * 200, newCapacity); + charsTypeDataVec = new VarcharVec(newCapacity); } else if (type instanceof DateType) { intDataVec = new IntVec(newCapacity); } else { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala index 6886a6f667b5188245d9fffe6b387de3fa01fe8b..ed99f6b4311a48492438095a87d450f7d9d89a5a 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala @@ -123,7 +123,7 @@ object OmniAdaptorUtil { } offsets(i + 1) = totalSize } - val vec = new VarcharVec(totalSize, columnSize) + val vec = new VarcharVec(columnSize) val values = new Array[Byte](totalSize) for (i <- 0 until columnSize) { if (null != columnVector.getUTF8String(i)) { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index a68c8d020b6ddaccf0af743881632016ad6bc70a..d1ce868dfc0b16360ec5a23826575236d97e6bf9 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -436,9 +436,11 @@ case class ColumnarBroadcastHashJoinExec( index += 1 } } - numOutputRows += result.getRowCount + val rowCnt: Int = result.getRowCount + numOutputRows += rowCnt numOutputVecBatchs += 1 - new ColumnarBatch(vecs.toArray, result.getRowCount) + result.close() + new ColumnarBatch(vecs.toArray, rowCnt) } } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index 2fe9a1475ab729ceececa7c2b533006e2ee97e03..8f22135f5aea14db255295ff64d364df4e2225dc 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -331,9 +331,11 @@ case class ColumnarShuffledHashJoinExec( index += 1 } } - numOutputRows += result.getRowCount + val rowCnt: Int = result.getRowCount + numOutputRows += rowCnt numOutputVecBatchs += 1 - new ColumnarBatch(vecs.toArray, result.getRowCount) + result.close() + new ColumnarBatch(vecs.toArray, rowCnt) } } if ("FULL OUTER" == joinType.sql) { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala index bfec7121a7fab2963cf8c25dec38cdccad9420f4..f811608c716bbff066d2f469ab4d59887f4e58d2 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala @@ -152,7 +152,7 @@ case class ColumnarSortMergeJoinExec( val joinCondStr = if (condition.isDefined) { s"${condition.get}${condition.get.dataType}" } else "None" - + s""" |$formattedNodeName |$simpleStringWithNodeId @@ -437,7 +437,7 @@ case class ColumnarSortMergeJoinExec( case DataType.DataTypeId.OMNI_BOOLEAN => new BooleanVec(0) case DataType.DataTypeId.OMNI_CHAR | DataType.DataTypeId.OMNI_VARCHAR => - new VarcharVec(0, 0) + new VarcharVec(0) case DataType.DataTypeId.OMNI_DECIMAL128 => new Decimal128Vec(0) case DataType.DataTypeId.OMNI_SHORT => diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/MergeIterator.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/MergeIterator.scala index c57ce668f4500a1f836459185439be133e53047c..93ec7d89b01fc4fee52c4abeb77b5618f1c48481 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/MergeIterator.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/MergeIterator.scala @@ -57,7 +57,7 @@ class MergeIterator(iter: Iterator[ColumnarBatch], localSchema: StructType, vecs(index) = new BooleanVec(columnSize) case StringType => val vecType: DataType = sparkTypeToOmniType(field.dataType, field.metadata) - vecs(index) = new VarcharVec(VarcharVec.INIT_CAPACITY_IN_BYTES, columnSize) + vecs(index) = new VarcharVec(columnSize) case dt: DecimalType => if (DecimalType.is64BitDecimalType(dt)) { vecs(index) = new LongVec(columnSize) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/SparkMemoryUtils.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/SparkMemoryUtils.scala index 6012da931bb3b93ef8a3e6690d42ba3d1e4949e0..946c90a9baf346dc4e47253ced50a53def22374b 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/SparkMemoryUtils.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/util/SparkMemoryUtils.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.execution.util -import nova.hetu.omniruntime.vector.VecAllocator - +import nova.hetu.omniruntime.memory +import nova.hetu.omniruntime.memory.MemoryManager import org.apache.spark.{SparkEnv, TaskContext} object SparkMemoryUtils { private val max: Long = SparkEnv.get.conf.getSizeAsBytes("spark.memory.offHeap.size", "1g") - VecAllocator.setRootAllocatorLimit(max) + MemoryManager.setGlobalMemoryLimit(max) def init(): Unit = {} diff --git a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleTest.java b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleTest.java index 74fccca66fad64dac9c96ae5f60591de40e92012..8be5702dfbabc5bc847e4ebe547d1d4dfa243e6f 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleTest.java +++ b/omnioperator/omniop-spark-extension/java/src/test/java/com/huawei/boostkit/spark/ColumnShuffleTest.java @@ -141,7 +141,7 @@ abstract class ColumnShuffleTest { } case OMNI_VARCHAR: case OMNI_CHAR: { - tmpVec = new VarcharVec(rowNum * 16, rowNum); + tmpVec = new VarcharVec(rowNum); for (int j = 0; j < rowNum; j++) { ((VarcharVec)tmpVec).set(j, ("VAR_" + (j + 1) + "_END").getBytes(StandardCharsets.UTF_8)); if (mixHalfNull && (j % 2) == 0) { @@ -196,7 +196,7 @@ abstract class ColumnShuffleTest { public List buildValChar(int pid, String varChar) { IntVec c0 = new IntVec(1); - VarcharVec c1 = new VarcharVec(8, 1); + VarcharVec c1 = new VarcharVec(1); c0.set(0, pid); c1.set(0, varChar.getBytes(StandardCharsets.UTF_8)); List columns = new ArrayList<>(); diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala index 00adf145979e33f7dd7b1c49873fd72cdff18756..998791c8c0b11499a6cd92ca3ffdffc1a7f9b0fc 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala @@ -328,7 +328,7 @@ object ColumnarShuffleWriterSuite { def initOmniColumnVarcharVector(values: Array[java.lang.String]): OmniColumnVector = { val length = values.length - val vecTmp = new VarcharVec(1024, length) + val vecTmp = new VarcharVec(length) (0 until length).foreach { i => if (values(i) != null) { vecTmp.set(i, values(i).getBytes())