diff --git a/omnioperator/omniop-native-reader/cpp/config.h b/omnioperator/omniop-native-reader/cpp/config.h index 71d819b34c775afde138dfb1db023fb20408c46b..f2f406a4eb63af6dc9940a38a9e63b24319fef3d 100644 --- a/omnioperator/omniop-native-reader/cpp/config.h +++ b/omnioperator/omniop-native-reader/cpp/config.h @@ -1,20 +1,2 @@ -/* - * Copyright (C) 2023-2023. Huawei Technologies Co., Ltd. All rights reserved. - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//#cmakedefine DEBUG_RUNTIME -//#cmakedefine TRACE_RUNTIME \ No newline at end of file +/* #undef DEBUG_RUNTIME */ +/* #undef TRACE_RUNTIME */ diff --git a/omnioperator/omniop-spark-extension/cpp/config.h b/omnioperator/omniop-spark-extension/cpp/config.h index 9c9637a16d96737d75c128d3fe0bda6c87c82172..f2f406a4eb63af6dc9940a38a9e63b24319fef3d 100644 --- a/omnioperator/omniop-spark-extension/cpp/config.h +++ b/omnioperator/omniop-spark-extension/cpp/config.h @@ -1,20 +1,2 @@ -/* - * Copyright (C) 2020-2022. Huawei Technologies Co., Ltd. All rights reserved. - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//#cmakedefine DEBUG_RUNTIME -//#cmakedefine TRACE_RUNTIME \ No newline at end of file +/* #undef DEBUG_RUNTIME */ +/* #undef TRACE_RUNTIME */ diff --git a/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto b/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto index 725f9fa070aa1f8d188d85118df9765a63d299f3..869105748977fa573853d6f181fe5ccef2a2300a 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto +++ b/omnioperator/omniop-spark-extension/cpp/src/proto/vec_data.proto @@ -58,3 +58,11 @@ message VecType { } TimeUnit timeUnit = 6; } + +message ProtoRowBatch { + int32 rowCnt = 1; + int32 vecCnt = 2; + repeated VecType vecTypes = 3; + bytes rows = 4; + bytes offsets = 5; +} \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 4ceccb11e35833419cb35e3083673d3ce0f25d49..8256e36aa931a9add39a01602880e5a2d5d6d06a 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -20,14 +20,37 @@ #include "splitter.h" #include "utils.h" +using namespace omniruntime::vec; uint64_t SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD = UINT64_MAX; SplitOptions SplitOptions::Defaults() { return SplitOptions(); } +void Splitter::BuildPartition2Row(int32_t num_rows) +{ + row_offset_row_id_.resize(num_rows); + partition_row_offset_base_.resize(num_partitions_ + 1); + for (auto pid = 1; pid <= num_partitions_; ++pid) { + partition_row_offset_base_[pid] = partition_row_offset_base_[pid - 1] + partition_id_cnt_cur_[pid - 1]; + } + for (auto row = 0; row < num_rows; ++row) { + auto pid = partition_id_[row]; + row_offset_row_id_[partition_row_offset_base_[pid]++] = row; + } + for (auto pid = 0; pid < num_partitions_; ++pid) { + partition_row_offset_base_[pid] -= partition_id_cnt_cur_[pid]; + } + partition_used_.clear(); + for (auto pid = 0; pid != num_partitions_; ++pid) { + if (partition_id_cnt_cur_[pid] > 0) { + partition_used_.push_back(pid); + } + } +} + // 计算分区id,每个batch初始化 int Splitter::ComputeAndCountPartitionId(VectorBatch& vb) { auto num_rows = vb.GetRowCount(); - std::fill(std::begin(partition_id_cnt_cur_), std::end(partition_id_cnt_cur_), 0); + memset_s(partition_id_cnt_cur_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t)); partition_id_.resize(num_rows); if (singlePartitionFlag) { @@ -123,49 +146,45 @@ int Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t new_size) { int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { const auto num_rows = vb.GetRowCount(); for (uint col = 0; col < fixed_width_array_idx_.size(); ++col) { - std::fill(std::begin(partition_buffer_idx_offset_), - std::end(partition_buffer_idx_offset_), 0); + memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t)); auto col_idx_vb = fixed_width_array_idx_[col]; auto col_idx_schema = singlePartitionFlag ? col_idx_vb : (col_idx_vb - 1); const auto& dst_addrs = partition_fixed_width_value_addrs_[col]; if (vb.Get(col_idx_vb)->GetEncoding() == OMNI_DICTIONARY) { LogsDebug("Dictionary Columnar process!"); - auto ids_addr = VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb)); + auto ids_addr = static_cast(VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb))); auto src_addr = reinterpret_cast(VectorHelper::UnsafeGetDictionary(vb.Get(col_idx_vb))); + auto process = [&](const ShuffleTypeId shuffleTypeId) { + const auto shuffle_size = (1 << shuffleTypeId); + for (auto &pid: partition_used_) { + auto dstPidBase = reinterpret_cast(dst_addrs[pid]) + partition_buffer_idx_base_[pid]; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + auto count = end - pos; + for (; pos < end; ++pos) { + auto rowId = row_offset_row_id_[pos]; + *dstPidBase++ = reinterpret_cast(src_addr)[ids_addr[rowId]]; + } + partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size * count; + partition_buffer_idx_offset_[pid] += count; + } + }; switch (column_type_id_[col_idx_schema]) { -#define PROCESS(SHUFFLE_TYPE, CTYPE) \ - case SHUFFLE_TYPE: \ - for (auto row = 0; row < num_rows; ++row) { \ - auto pid = partition_id_[row]; \ - auto dst_offset = \ - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; \ - reinterpret_cast(dst_addrs[pid])[dst_offset] = \ - reinterpret_cast(src_addr)[reinterpret_cast(ids_addr)[row]]; \ - partition_fixed_width_buffers_[col][pid][1]->size_ += (1 << SHUFFLE_TYPE); \ - partition_buffer_idx_offset_[pid]++; \ - } \ - break; - PROCESS(SHUFFLE_1BYTE, uint8_t) - PROCESS(SHUFFLE_2BYTE, uint16_t) - PROCESS(SHUFFLE_4BYTE, uint32_t) - PROCESS(SHUFFLE_8BYTE, uint64_t) -#undef PROCESS + case SHUFFLE_1BYTE: + process.operator()(SHUFFLE_1BYTE); + break; + case SHUFFLE_2BYTE: + process.operator()(SHUFFLE_2BYTE); + break; + case SHUFFLE_4BYTE: + process.operator()(SHUFFLE_4BYTE); + break; + case SHUFFLE_8BYTE: + process.operator()(SHUFFLE_8BYTE); + break; case SHUFFLE_DECIMAL128: - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; - auto dst_offset = - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; - // 前64位取值、赋值 - reinterpret_cast(dst_addrs[pid])[dst_offset << 1] = - reinterpret_cast(src_addr)[reinterpret_cast(ids_addr)[row] << 1]; - // 后64位取值、赋值 - reinterpret_cast(dst_addrs[pid])[(dst_offset << 1) | 1] = - reinterpret_cast(src_addr)[(reinterpret_cast(ids_addr)[row] << 1) | 1]; - partition_fixed_width_buffers_[col][pid][1]->size_ += - (1 << SHUFFLE_DECIMAL128); //decimal128 16Bytes - partition_buffer_idx_offset_[pid]++; - } + process.operator()(SHUFFLE_DECIMAL128); break; default: { LogsError("SplitFixedWidthValueBuffer not match this type: %d", column_type_id_[col_idx_schema]); @@ -174,37 +193,37 @@ int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) { } } else { auto src_addr = reinterpret_cast(VectorHelper::UnsafeGetValues(vb.Get(col_idx_vb))); + auto process = [&](const ShuffleTypeId shuffleTypeId) { + const auto shuffle_size = (1 << shuffleTypeId); + for (auto &pid: partition_used_) { + auto dst_offset = partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; + auto dstPidBase = reinterpret_cast(dst_addrs[pid]) + dst_offset; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + auto count = end - pos; + for (; pos < end; ++pos) { + auto rowId = row_offset_row_id_[pos]; + *dstPidBase++ = reinterpret_cast(src_addr)[rowId]; + } + partition_fixed_width_buffers_[col][pid][1]->size_ += shuffle_size * count; + partition_buffer_idx_offset_[pid] += count; + } + }; switch (column_type_id_[col_idx_schema]) { -#define PROCESS(SHUFFLE_TYPE, CTYPE) \ - case SHUFFLE_TYPE: \ - for (auto row = 0; row < num_rows; ++row) { \ - auto pid = partition_id_[row]; \ - auto dst_offset = \ - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; \ - reinterpret_cast(dst_addrs[pid])[dst_offset] = \ - reinterpret_cast(src_addr)[row]; \ - partition_fixed_width_buffers_[col][pid][1]->size_ += (1 << SHUFFLE_TYPE); \ - partition_buffer_idx_offset_[pid]++; \ - } \ - break; - PROCESS(SHUFFLE_1BYTE, uint8_t) - PROCESS(SHUFFLE_2BYTE, uint16_t) - PROCESS(SHUFFLE_4BYTE, uint32_t) - PROCESS(SHUFFLE_8BYTE, uint64_t) -#undef PROCESS + case SHUFFLE_1BYTE: + process.operator()(SHUFFLE_1BYTE); + break; + case SHUFFLE_2BYTE: + process.operator()(SHUFFLE_2BYTE); + break; + case SHUFFLE_4BYTE: + process.operator()(SHUFFLE_4BYTE); + break; + case SHUFFLE_8BYTE: + process.operator()(SHUFFLE_8BYTE); + break; case SHUFFLE_DECIMAL128: - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; - auto dst_offset = - partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; - reinterpret_cast(dst_addrs[pid])[dst_offset << 1] = - reinterpret_cast(src_addr)[row << 1]; // 前64位取值、赋值 - reinterpret_cast(dst_addrs[pid])[(dst_offset << 1) | 1] = - reinterpret_cast(src_addr)[(row << 1) | 1]; // 后64位取值、赋值 - partition_fixed_width_buffers_[col][pid][1]->size_ += - (1 << SHUFFLE_DECIMAL128); //decimal128 16Bytes - partition_buffer_idx_offset_[pid]++; - } + process.operator()(SHUFFLE_DECIMAL128); break; default: { LogsError("ERROR: SplitFixedWidthValueBuffer not match this type: %d", column_type_id_[col_idx_schema]); @@ -229,73 +248,93 @@ void Splitter::SplitBinaryVector(BaseVector *varcharVector, int col_schema) { if (varcharVector->GetEncoding() == OMNI_DICTIONARY) { auto vc = reinterpret_cast> *>( varcharVector); - cached_vectorbatch_size_ += num_rows * (sizeof(bool) + sizeof(int32_t)); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; + cached_vectorbatch_size_ += num_rows * (sizeof(bool) + sizeof(int32_t)); + for (auto &pid: partition_used_) { uint8_t *dst = nullptr; uint32_t str_len = 0; - if (!vc->IsNull(row)) { - std::string_view value = vc->GetValue(row); - dst = reinterpret_cast(reinterpret_cast(value.data())); - str_len = static_cast(value.length()); - } - if constexpr (hasNull) { - is_null = vc->IsNull(row); - } - cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 - VCLocation cl((uint64_t) dst, str_len, is_null); - if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && - (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < - options_.spill_batch_row_num)) { - if constexpr(hasNull) { - HandleNull(vc_partition_array_buffers_[pid][col_schema].back(), is_null); + auto index = 0; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + for (; pos < end; ++pos, ++index) { + auto rowId = row_offset_row_id_[pos]; + if constexpr (hasNull) { + if (!vc->IsNull(rowId)) { + std::string_view value = vc->GetValue(rowId); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); + } + } else { + std::string_view value = vc->GetValue(rowId); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); } - vc_partition_array_buffers_[pid][col_schema].back().getVcList().push_back(cl); - vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; - } else { - VCBatchInfo svc(options_.spill_batch_row_num); - svc.getVcList().push_back(cl); - svc.vcb_total_len += str_len; if constexpr (hasNull) { - HandleNull(svc, is_null); + is_null = vc->IsNull(rowId); + } + cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 + if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && + (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < + options_.spill_batch_row_num)) { + if constexpr (hasNull) { + HandleNull(vc_partition_array_buffers_[pid][col_schema].back(), is_null); + } + vc_partition_array_buffers_[pid][col_schema].back().getVcList().emplace_back((uint64_t)dst, str_len, is_null); + vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; + } else { + VCBatchInfo svc(options_.spill_batch_row_num); + svc.getVcList().emplace_back((uint64_t)dst, str_len, is_null); + svc.vcb_total_len += str_len; + if constexpr (hasNull) { + HandleNull(svc, is_null); + } + vc_partition_array_buffers_[pid][col_schema].emplace_back(svc); } - vc_partition_array_buffers_[pid][col_schema].push_back(svc); } } } else { auto vc = reinterpret_cast> *>(varcharVector); - cached_vectorbatch_size_ += num_rows * (sizeof(bool) + sizeof(int32_t)) + sizeof(int32_t); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; + cached_vectorbatch_size_ += num_rows * (sizeof(bool) + sizeof(int32_t)) + sizeof(int32_t); + for (auto &pid: partition_used_) { + auto &vc_partition_array = vc_partition_array_buffers_[pid]; uint8_t *dst = nullptr; uint32_t str_len = 0; - if (!vc->IsNull(row)) { - std::string_view value = vc->GetValue(row); - dst = reinterpret_cast(reinterpret_cast(value.data())); - str_len = static_cast(value.length()); - } - - if constexpr (hasNull) { - is_null = vc->IsNull(row); - } - cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 - VCLocation cl((uint64_t) dst, str_len, is_null); - if ((vc_partition_array_buffers_[pid][col_schema].size() != 0) && - (vc_partition_array_buffers_[pid][col_schema].back().getVcList().size() < - options_.spill_batch_row_num)) { - if constexpr(hasNull) { - HandleNull(vc_partition_array_buffers_[pid][col_schema].back(), is_null); + auto index = 0; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + for (; pos < end; ++pos, ++index) { + auto rowId = row_offset_row_id_[pos]; + if constexpr (hasNull) { + if (!vc->IsNull(rowId)) { + std::string_view value = vc->GetValue(rowId); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); + } + } else { + std::string_view value = vc->GetValue(rowId); + dst = reinterpret_cast(reinterpret_cast(value.data())); + str_len = static_cast(value.length()); } - vc_partition_array_buffers_[pid][col_schema].back().getVcList().push_back(cl); - vc_partition_array_buffers_[pid][col_schema].back().vcb_total_len += str_len; - } else { - VCBatchInfo svc(options_.spill_batch_row_num); - svc.getVcList().push_back(cl); - if constexpr(hasNull) { - HandleNull(svc, is_null); + if constexpr (hasNull) { + is_null = vc->IsNull(rowId); + } + cached_vectorbatch_size_ += str_len; // 累计变长部分cache数据 + if ((vc_partition_array[col_schema].size() != 0) && + (vc_partition_array[col_schema].back().getVcList().size() < + options_.spill_batch_row_num)) { + if constexpr (hasNull) { + HandleNull(vc_partition_array[col_schema].back(), is_null); + } + vc_partition_array[col_schema].back().getVcList().emplace_back((uint64_t)dst, str_len, is_null); + vc_partition_array[col_schema].back().vcb_total_len += str_len; + } else { + VCBatchInfo svc(options_.spill_batch_row_num); + svc.getVcList().emplace_back((uint64_t)dst, str_len, is_null); + if constexpr (hasNull) { + HandleNull(svc, is_null); + } + svc.vcb_total_len += str_len; + vc_partition_array[col_schema].emplace_back(svc); } - svc.vcb_total_len += str_len; - vc_partition_array_buffers_[pid][col_schema].push_back(svc); } } } @@ -337,7 +376,9 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_id_cnt_cur_[pid] > 0 && dst_addrs[pid] == nullptr) { // init bitmap if it's null - auto new_size = partition_id_cnt_cur_[pid] > options_.buffer_size ? partition_id_cnt_cur_[pid] : options_.buffer_size; + auto new_size = partition_id_cnt_cur_[pid] > options_.buffer_size + ? partition_id_cnt_cur_[pid] + : options_.buffer_size; auto ptr_tmp = static_cast(options_.allocator->Alloc(new_size)); if (nullptr == ptr_tmp) { throw std::runtime_error("Allocator for ValidityBuffer Failed! "); @@ -352,16 +393,15 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){ } // 计算并填充数据 - auto src_addr = const_cast((uint8_t *)( - reinterpret_cast(omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx))))); - std::fill(std::begin(partition_buffer_idx_offset_), - std::end(partition_buffer_idx_offset_), 0); - const auto num_rows = vb.GetRowCount(); - for (auto row = 0; row < num_rows; ++row) { - auto pid = partition_id_[row]; - auto dst_offset = partition_buffer_idx_base_[pid] + partition_buffer_idx_offset_[pid]; - dst_addrs[pid][dst_offset] = omniruntime::BitUtil::IsBitSet(src_addr, row); - partition_buffer_idx_offset_[pid]++; + auto src_addr = unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx)); + for (auto &pid: partition_used_) { + auto dstPidBase = dst_addrs[pid] + partition_buffer_idx_base_[pid]; + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + for (; pos < end; ++pos) { + auto rowId = row_offset_row_id_[pos]; + *dstPidBase++ = omniruntime::BitUtil::IsBitSet(src_addr, rowId); + } } } } @@ -415,13 +455,6 @@ int Splitter::CacheVectorBatch(int32_t partition_id, bool reset_buffers) { } int Splitter::DoSplit(VectorBatch& vb) { - // for the first input record batch, scan binary arrays and large binary - // arrays to get their empirical sizes - - if (!first_vector_batch_) { - first_vector_batch_ = true; - } - // prepare partition buffers and spill if necessary for (auto pid = 0; pid < num_partitions_; ++pid) { if (fixed_width_array_idx_.size() > 0 && @@ -436,6 +469,8 @@ int Splitter::DoSplit(VectorBatch& vb) { } } } + BuildPartition2Row(vb.GetRowCount()); + SplitFixedWidthValueBuffer(vb); SplitFixedWidthValidityBuffer(vb); @@ -527,26 +562,29 @@ void Splitter::ToSplitterTypeId(int num_cols) void Splitter::CastOmniToShuffleType(DataTypeId omniType, ShuffleTypeId shuffleType) { - vector_batch_col_types_.push_back(omniType); + proto_col_types_.push_back(CastOmniTypeIdToProtoVecType(omniType)); column_type_id_.push_back(shuffleType); } int Splitter::Split_Init(){ num_row_splited_ = 0; cached_vectorbatch_size_ = 0; - partition_id_cnt_cur_.resize(num_partitions_); - partition_id_cnt_cache_.resize(num_partitions_); - partition_buffer_size_.resize(num_partitions_); - partition_buffer_idx_base_.resize(num_partitions_); - partition_buffer_idx_offset_.resize(num_partitions_); + + partition_id_cnt_cur_ = new int32_t[num_partitions_](); + partition_id_cnt_cache_ = new uint64_t[num_partitions_](); + partition_buffer_size_ = new int32_t[num_partitions_](); + partition_buffer_idx_base_ = new int32_t[num_partitions_](); + partition_buffer_idx_offset_ = new int32_t[num_partitions_](); + partition_serialization_size_ = new uint32_t[num_partitions_](); + partition_cached_vectorbatch_.resize(num_partitions_); - partition_serialization_size_.resize(num_partitions_); fixed_width_array_idx_.clear(); partition_lengths_.resize(num_partitions_); - fixed_valueBuffer_size_.resize(num_partitions_); - fixed_nullBuffer_size_.resize(num_partitions_); - //obtain configed dir from Environment Variables + fixed_valueBuffer_size_ = new uint32_t[num_partitions_](); + fixed_nullBuffer_size_ = new uint32_t[num_partitions_](); + + // obtain configed dir from Environment Variables configured_dirs_ = GetConfiguredLocalDirs(); sub_dir_selection_.assign(configured_dirs_.size(), 0); @@ -590,19 +628,102 @@ int Splitter::Split_Init(){ for (auto i = 0; i < num_partitions_; ++i) { vc_partition_array_buffers_[i].resize(column_type_id_.size()); } + partition_arena_.resize(num_partitions_); + partition_row_batch.resize(num_partitions_); + partition_row_batch_count.resize(num_partitions_); + std::fill(partition_row_batch_count.begin(), partition_row_batch_count.end(), 0); + partition_rows.resize(num_partitions_); return 0; } int Splitter::Split(VectorBatch& vb ) { - //计算vectorBatch分区信息 + // 计算vectorBatch分区信息 LogsTrace(" split vb row number: %d ", vb.GetRowCount()); TIME_NANO_OR_RAISE(total_compute_pid_time_, ComputeAndCountPartitionId(vb)); - //执行分区动作 + // 执行分区动作 DoSplit(vb); return 0; } +int Splitter::SplitByRow(VectorBatch *vecBatch) { + int32_t rowCount = vecBatch->GetRowCount(); + for (int pid = 0; pid < num_partitions_; ++pid) { + auto needCapacity = partition_rows[pid].size() + rowCount; + if (partition_rows[pid].capacity() < needCapacity) { + auto prepareCapacity = partition_rows[pid].capacity() * expansion; + auto newCapacity = prepareCapacity > needCapacity ? prepareCapacity : needCapacity; + partition_rows[pid].reserve(newCapacity); + } + } + + if (singlePartitionFlag) { + RowBatch *rowBatch = VectorHelper::TransRowBatchFromVectorBatch(vecBatch); + for (int i = 0; i < rowCount; ++i) { + RowInfo *rowInfo = rowBatch->Get(i); + partition_rows[0].emplace_back(rowInfo); + total_input_size += rowInfo->length; + } + } else { + auto pidVec = reinterpret_cast *>(vecBatch->Get(0)); + auto tmpVectorBatch = new VectorBatch(rowCount); + partition_id_.resize(rowCount); + memset_s(partition_id_cnt_cur_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t)); + for (int i = 0; i < rowCount; ++i) { + auto pid = pidVec->GetValue(i); + if (pid >= num_partitions_) { + LogsError(" Illegal pid Value: %d >= partition number %d .", pid, num_partitions_); + throw std::runtime_error("Shuffle pidVec Illegal pid Value!"); + } + partition_id_[i] = pid; + partition_id_cnt_cur_[pid]++; + } + BuildPartition2Row(rowCount); + for (int i = 1; i < vecBatch->GetVectorCount(); ++i) { + tmpVectorBatch->Append(vecBatch->Get(i)); + } + vecBatch->ResizeVectorCount(1); + std::vector typeIds; + std::vector encodings; + int32_t vecCount = tmpVectorBatch->GetVectorCount(); + for (int i = 0; i < vecCount; i++) { + typeIds.push_back(tmpVectorBatch->Get(i)->GetTypeId()); + encodings.push_back(tmpVectorBatch->Get(i)->GetEncoding()); + } + auto rowBuffer = std::make_unique(typeIds, encodings, typeIds.size() - 1); + + for (auto &pid: partition_used_) { + auto pos = partition_row_offset_base_[pid]; + auto end = partition_row_offset_base_[pid + 1]; + for (; pos < end; ++pos) { + rowBuffer->TransValueFromVectorBatch(tmpVectorBatch, static_cast(row_offset_row_id_[pos])); + auto oneRowLen = rowBuffer->FillBuffer(partition_arena_[pid]); + partition_rows[pid].emplace_back(new RowInfo(rowBuffer->TakeRowBuffer(), oneRowLen)); + total_input_size += oneRowLen; + } + } + + delete vecBatch; + delete tmpVectorBatch; + } + + // spill + // process level: If the memory usage of the current executor exceeds the threshold, spill is triggered. + if (num_row_splited_ >= SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD) { + TIME_NANO_OR_RAISE(total_spill_time_, SpillToTmpFileByRow()); + total_input_size = 0; + isSpill = true; + } + + // task level: If the memory usage of the current task exceeds the threshold, spill is triggered. + if (cached_vectorbatch_size_ + current_fixed_alloc_buffer_size_ >= options_.spill_mem_threshold) { + TIME_NANO_OR_RAISE(total_spill_time_, SpillToTmpFileByRow()); + total_input_size = 0; + isSpill = true; + } + return 0; +} + std::shared_ptr Splitter::CaculateSpilledTmpFilePartitionOffsets() { void *ptr_tmp = static_cast(options_.allocator->Alloc((num_partitions_ + 1) * sizeof(uint64_t))); if (nullptr == ptr_tmp) { @@ -622,8 +743,8 @@ std::shared_ptr Splitter::CaculateSpilledTmpFilePartitionOffsets() { return ptrPartitionOffsets; } -spark::VecType::VecTypeId CastShuffleTypeIdToVecType(int32_t tmpType) { - switch (tmpType) { +spark::VecType::VecTypeId Splitter::CastOmniTypeIdToProtoVecType(int32_t omniType) { + switch (omniType) { case OMNI_NONE: return spark::VecType::VEC_TYPE_NONE; case OMNI_INT: @@ -663,7 +784,7 @@ spark::VecType::VecTypeId CastShuffleTypeIdToVecType(int32_t tmpType) { case DataTypeId::OMNI_INVALID: return spark::VecType::VEC_TYPE_INVALID; default: { - throw std::runtime_error("castShuffleTypeIdToVecType() unexpected ShuffleTypeId"); + throw std::runtime_error("CastOmniTypeIdToProtoVecType() unexpected OmniTypeId"); } } }; @@ -815,13 +936,13 @@ int32_t Splitter::ProtoWritePartition(int32_t partition_id, std::unique_ptrset_veccnt(column_type_id_.size()); int fixColIndexTmp = 0; for (size_t indexSchema = 0; indexSchema < column_type_id_.size(); indexSchema++) { - spark::Vec * vec = vecBatchProto->add_vecs(); + spark::Vec *vec = vecBatchProto->add_vecs(); switch (column_type_id_[indexSchema]) { case ShuffleTypeId::SHUFFLE_1BYTE: case ShuffleTypeId::SHUFFLE_2BYTE: case ShuffleTypeId::SHUFFLE_4BYTE: case ShuffleTypeId::SHUFFLE_8BYTE: - case ShuffleTypeId::SHUFFLE_DECIMAL128:{ + case ShuffleTypeId::SHUFFLE_DECIMAL128: { SerializingFixedColumns(partition_id, *vec, fixColIndexTmp, &splitRowInfoTmp); fixColIndexTmp++; // 定长序列化数量++ break; @@ -835,13 +956,13 @@ int32_t Splitter::ProtoWritePartition(int32_t partition_id, std::unique_ptrmutable_vectype(); - vt->set_typeid_(CastShuffleTypeIdToVecType(vector_batch_col_types_[indexSchema])); - LogsDebug("precision[indexSchema %d]: %d , scale[indexSchema %d]: %d ", - indexSchema, input_col_types.inputDataPrecisions[indexSchema], - indexSchema, input_col_types.inputDataScales[indexSchema]); + vt->set_typeid_(proto_col_types_[indexSchema]); if(vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64){ vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]); vt->set_scale(input_col_types.inputDataScales[indexSchema]); + LogsDebug("precision[indexSchema %d]: %d , scale[indexSchema %d]: %d ", + indexSchema, input_col_types.inputDataPrecisions[indexSchema], + indexSchema, input_col_types.inputDataScales[indexSchema]); } } curBatch++; @@ -852,7 +973,7 @@ int32_t Splitter::ProtoWritePartition(int32_t partition_id, std::unique_ptr(vecBatchProto->ByteSizeLong())); if (bufferStream->Next(&bufferOut, &sizeOut)) { memcpy_s(bufferOut, sizeof(vecBatchProtoSize), &vecBatchProtoSize, sizeof(vecBatchProtoSize)); - if (sizeof(vecBatchProtoSize) < sizeOut) { + if (sizeof(vecBatchProtoSize) < static_cast(sizeOut)) { bufferStream->BackUp(sizeOut - sizeof(vecBatchProtoSize)); } } @@ -875,7 +996,73 @@ int32_t Splitter::ProtoWritePartition(int32_t partition_id, std::unique_ptr &bufferStream, void *bufferOut, int32_t &sizeOut) { + uint64_t rowCount = partition_rows[partition_id].size(); + uint64_t onceCopyRow = 0; + uint32_t batchCount = 0; + while (0 < rowCount) { + if (options_.spill_batch_row_num < rowCount) { + onceCopyRow = options_.spill_batch_row_num; + } else { + onceCopyRow = rowCount; + } + + protoRowBatch->set_rowcnt(onceCopyRow); + protoRowBatch->set_veccnt(proto_col_types_.size()); + for (uint32_t i = 0; i < proto_col_types_.size(); ++i) { + spark::VecType *vt = protoRowBatch->add_vectypes(); + vt->set_typeid_(proto_col_types_[i]); + if(vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64){ + vt->set_precision(input_col_types.inputDataPrecisions[i]); + vt->set_scale(input_col_types.inputDataScales[i]); + LogsDebug("precision[indexSchema %d]: %d , scale[indexSchema %d]: %d ", + i, input_col_types.inputDataPrecisions[i], + i, input_col_types.inputDataScales[i]); + } + } + int64_t offset = batchCount * options_.spill_batch_row_num; + std::vector offset_vec(onceCopyRow + 1, 0); + auto rowInfoPtr = partition_rows[partition_id].data() + offset; + for (uint64_t i = 0; i < onceCopyRow; ++i) { + RowInfo *rowInfo = rowInfoPtr[i]; + offset_vec[i + 1] = offset_vec[i] + rowInfo->length; + } + std::string rows; + rows.reserve(offset_vec[onceCopyRow]); + for (uint64_t i = 0; i < onceCopyRow; ++i) { + RowInfo *rowInfo = rowInfoPtr[i]; + rows.append(reinterpret_cast(rowInfo->row), rowInfo->length); + } + protoRowBatch->set_rows(std::move(rows)); + protoRowBatch->set_offsets(reinterpret_cast(offset_vec.data()), onceCopyRow * sizeof(int32_t)); + + auto byteSizeLong = protoRowBatch->ByteSizeLong(); + if (byteSizeLong > UINT32_MAX) { + throw std::runtime_error("Unsafe static_cast long to uint_32t."); + } + uint32_t protoRowBatchSize = reversebytes_uint32t(static_cast(byteSizeLong)); + if (bufferStream->Next(&bufferOut, &sizeOut)) { + memcpy_s(bufferOut, sizeof(protoRowBatchSize), &protoRowBatchSize, sizeof(protoRowBatchSize)); + if (sizeof(protoRowBatchSize) < static_cast(sizeOut)) { + bufferStream->BackUp(sizeOut - sizeof(protoRowBatchSize)); + } + } + + protoRowBatch->SerializeToZeroCopyStream(bufferStream.get()); + rowCount -= onceCopyRow; + batchCount++; + protoRowBatch->Clear(); + } + partition_arena_[partition_id].Reset(); + uint64_t partitionBatchSize = bufferStream->flush(); + total_bytes_written_ += partitionBatchSize; + partition_lengths_[partition_id] += partitionBatchSize; + partition_rows[partition_id].clear(); + LogsDebug(" partitionBatch write length: %lu", partitionBatchSize); + return 0; } int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream) { @@ -901,13 +1088,13 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptrset_veccnt(column_type_id_.size()); int fixColIndexTmp = 0; for (size_t indexSchema = 0; indexSchema < column_type_id_.size(); indexSchema++) { - spark::Vec * vec = vecBatchProto->add_vecs(); + spark::Vec *vec = vecBatchProto->add_vecs(); switch (column_type_id_[indexSchema]) { case ShuffleTypeId::SHUFFLE_1BYTE: case ShuffleTypeId::SHUFFLE_2BYTE: case ShuffleTypeId::SHUFFLE_4BYTE: case ShuffleTypeId::SHUFFLE_8BYTE: - case ShuffleTypeId::SHUFFLE_DECIMAL128:{ + case ShuffleTypeId::SHUFFLE_DECIMAL128: { SerializingFixedColumns(partition_id, *vec, fixColIndexTmp, &splitRowInfoTmp); fixColIndexTmp++; // 定长序列化数量++ break; @@ -921,13 +1108,13 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptrmutable_vectype(); - vt->set_typeid_(CastShuffleTypeIdToVecType(vector_batch_col_types_[indexSchema])); - LogsDebug("precision[indexSchema %d]: %d , scale[indexSchema %d]: %d ", - indexSchema, input_col_types.inputDataPrecisions[indexSchema], - indexSchema, input_col_types.inputDataScales[indexSchema]); + vt->set_typeid_(proto_col_types_[indexSchema]); if(vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64){ vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]); vt->set_scale(input_col_types.inputDataScales[indexSchema]); + LogsDebug("precision[indexSchema %d]: %d , scale[indexSchema %d]: %d ", + indexSchema, input_col_types.inputDataPrecisions[indexSchema], + indexSchema, input_col_types.inputDataScales[indexSchema]); } } curBatch++; @@ -967,6 +1154,77 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream) { + uint64_t rowCount = partition_rows[partition_id].size(); + total_spill_row_num_ += rowCount; + + uint64_t onceCopyRow = 0; + uint32_t batchCount = 0; + while (0 < rowCount) { + if (options_.spill_batch_row_num < rowCount) { + onceCopyRow = options_.spill_batch_row_num; + } else { + onceCopyRow = rowCount; + } + + protoRowBatch->set_rowcnt(onceCopyRow); + protoRowBatch->set_veccnt(proto_col_types_.size()); + for (uint32_t i = 0; i < proto_col_types_.size(); ++i) { + spark::VecType *vt = protoRowBatch->add_vectypes(); + vt->set_typeid_(proto_col_types_[i]); + if(vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64){ + vt->set_precision(input_col_types.inputDataPrecisions[i]); + vt->set_scale(input_col_types.inputDataScales[i]); + LogsDebug("precision[indexSchema %d]: %d , scale[indexSchema %d]: %d ", + i, input_col_types.inputDataPrecisions[i], + i, input_col_types.inputDataScales[i]); + } + } + + int64_t offset = batchCount * options_.spill_batch_row_num; + std::vector offset_vec(onceCopyRow + 1, 0); + auto rowInfoPtr = partition_rows[partition_id].data() + offset; + for (uint64_t i = 0; i < onceCopyRow; ++i) { + RowInfo *rowInfo = rowInfoPtr[i]; + offset_vec[i + 1] = offset_vec[i] + rowInfo->length; + } + std::string rows; + rows.reserve(offset_vec[onceCopyRow]); + for (uint64_t i = 0; i < onceCopyRow; ++i) { + RowInfo *rowInfo = rowInfoPtr[i]; + rows.append(reinterpret_cast(rowInfo->row), rowInfo->length); + } + protoRowBatch->set_rows(std::move(rows)); + protoRowBatch->set_offsets(reinterpret_cast(offset_vec.data()), onceCopyRow * sizeof(int32_t)); + + auto byteSizeLong = protoRowBatch->ByteSizeLong(); + if (byteSizeLong > UINT32_MAX) { + throw std::runtime_error("Unsafe static_cast long to uint_32t."); + } + uint32_t protoRowBatchSize = reversebytes_uint32t(static_cast(byteSizeLong)); + void *buffer = nullptr; + if (!bufferStream->NextNBytes(&buffer, sizeof(protoRowBatchSize))) { + throw std::runtime_error("Allocate Memory Failed: Flush Spilled Data, Next failed."); + } + // set serizalized bytes to stream + memcpy_s(buffer, sizeof(protoRowBatchSize), &protoRowBatchSize, sizeof(protoRowBatchSize)); + LogsDebug(" A Slice Of vecBatchProtoSize: %d ", reversebytes_uint32t(protoRowBatchSize)); + + protoRowBatch->SerializeToZeroCopyStream(bufferStream.get()); + rowCount -= onceCopyRow; + batchCount++; + protoRowBatch->Clear(); + } + partition_arena_[partition_id].Reset(); + + uint64_t partitionBatchSize = bufferStream->flush(); + total_bytes_spilled_ += partitionBatchSize; + partition_serialization_size_[partition_id] = partitionBatchSize; + partition_rows[partition_id].clear(); + LogsDebug(" partitionBatch write length: %lu", partitionBatchSize); + return 0; +} + int Splitter::WriteDataFileProto() { LogsDebug(" spill DataFile: %s ", (options_.next_spilled_file_dir + ".data").c_str()); std::unique_ptr outStream = writeLocalFile(options_.next_spilled_file_dir + ".data"); @@ -979,7 +1237,23 @@ int Splitter::WriteDataFileProto() { for (auto pid = 0; pid < num_partitions_; ++pid) { protoSpillPartition(pid, bufferStream); } - std::fill(std::begin(partition_id_cnt_cache_), std::end(partition_id_cnt_cache_), 0); + memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t)); + outStream->close(); + return 0; +} + +int Splitter::WriteDataFileProtoByRow() { + LogsDebug(" spill DataFile: %s ", (options_.next_spilled_file_dir + ".data").c_str()); + std::unique_ptr outStream = writeLocalFile(options_.next_spilled_file_dir + ".data"); + WriterOptions options; + // tmp spilled file no need compression + options.setCompression(CompressionKind_NONE); + std::unique_ptr streamsFactory = createStreamsFactory(options, outStream.get()); + std::unique_ptr bufferStream = streamsFactory->createStream(); + // 顺序写入每个partition的offset + for (auto pid = 0; pid < num_partitions_; ++pid) { + protoSpillPartitionByRow(pid, bufferStream); + } outStream->close(); return 0; } @@ -987,7 +1261,7 @@ int Splitter::WriteDataFileProto() { void Splitter::MergeSpilled() { for (auto pid = 0; pid < num_partitions_; ++pid) { CacheVectorBatch(pid, true); - partition_buffer_size_[pid] = 0; //溢写之后将其清零,条件溢写需要重新分配内存 + partition_buffer_size_[pid] = 0; // 溢写之后将其清零,条件溢写需要重新分配内存 } std::unique_ptr outStream = writeLocalFile(options_.data_file); @@ -997,13 +1271,13 @@ void Splitter::MergeSpilled() { options.setCompressionBlockSize(options_.compress_block_size); options.setCompressionStrategy(CompressionStrategy_COMPRESSION); std::unique_ptr streamsFactory = createStreamsFactory(options, outStream.get()); - std::unique_ptr bufferOutPutStream = streamsFactory->createStream(); + std::unique_ptr bufferOutPutStream = streamsFactory->createStream(); void* bufferOut = nullptr; int sizeOut = 0; for (int pid = 0; pid < num_partitions_; pid++) { ProtoWritePartition(pid, bufferOutPutStream, bufferOut, sizeOut); - LogsDebug(" MergeSplled traversal partition( %d ) ",pid); + LogsDebug(" MergeSpilled traversal partition( %d ) ", pid); for (auto &pair : spilled_tmp_files_info_) { auto tmpDataFilePath = pair.first + ".data"; auto tmpPartitionOffset = reinterpret_cast(pair.second->data_)[pid]; @@ -1015,11 +1289,11 @@ void Splitter::MergeSpilled() { uint64_t seekPosit = tmpPartitionOffset; uint64_t onceReadLen = 0; while ((targetLen > 0) && bufferOutPutStream->Next(&bufferOut, &sizeOut)) { - onceReadLen = targetLen > sizeOut ? sizeOut : targetLen; + onceReadLen = targetLen > static_cast(sizeOut) ? sizeOut : targetLen; inputStream->read(bufferOut, onceReadLen, seekPosit); targetLen -= onceReadLen; seekPosit += onceReadLen; - if (onceReadLen < sizeOut) { + if (onceReadLen < static_cast(sizeOut)) { // Reached END. bufferOutPutStream->BackUp(sizeOut - onceReadLen); break; @@ -1033,17 +1307,63 @@ void Splitter::MergeSpilled() { } } - std::fill(std::begin(partition_id_cnt_cache_), std::end(partition_id_cnt_cache_), 0); + memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t)); ReleaseVarcharVector(); num_row_splited_ = 0; cached_vectorbatch_size_ = 0; outStream->close(); } +void Splitter::MergeSpilledByRow() { + std::unique_ptr outStream = writeLocalFile(options_.data_file); + LogsDebug(" Merge Spilled Tmp File: %s ", options_.data_file.c_str()); + WriterOptions options; + options.setCompression(options_.compression_type); + options.setCompressionBlockSize(options_.compress_block_size); + options.setCompressionStrategy(CompressionStrategy_COMPRESSION); + std::unique_ptr streamsFactory = createStreamsFactory(options, outStream.get()); + std::unique_ptr bufferOutPutStream = streamsFactory->createStream(); + + void* bufferOut = nullptr; + int sizeOut = 0; + for (int pid = 0; pid < num_partitions_; pid++) { + ProtoWritePartitionByRow(pid, bufferOutPutStream, bufferOut, sizeOut); + LogsDebug(" MergeSpilled traversal partition( %d ) ", pid); + for (auto &pair : spilled_tmp_files_info_) { + auto tmpDataFilePath = pair.first + ".data"; + auto tmpPartitionOffset = reinterpret_cast(pair.second->data_)[pid]; + auto tmpPartitionSize = reinterpret_cast(pair.second->data_)[pid + 1] - reinterpret_cast(pair.second->data_)[pid]; + LogsDebug(" get Partition Stream...tmpPartitionOffset %d tmpPartitionSize %d path %s", + tmpPartitionOffset, tmpPartitionSize, tmpDataFilePath.c_str()); + std::unique_ptr inputStream = readLocalFile(tmpDataFilePath); + uint64_t targetLen = tmpPartitionSize; + uint64_t seekPosit = tmpPartitionOffset; + uint64_t onceReadLen = 0; + while ((targetLen > 0) && bufferOutPutStream->Next(&bufferOut, &sizeOut)) { + onceReadLen = targetLen > static_cast(sizeOut) ? sizeOut : targetLen; + inputStream->read(bufferOut, onceReadLen, seekPosit); + targetLen -= onceReadLen; + seekPosit += onceReadLen; + if (onceReadLen < static_cast(sizeOut)) { + // Reached END. + bufferOutPutStream->BackUp(sizeOut - onceReadLen); + break; + } + } + + uint64_t flushSize = bufferOutPutStream->flush(); + total_bytes_written_ += flushSize; + LogsDebug(" Merge Flush Partition[%d] flushSize: %ld ", pid, flushSize); + partition_lengths_[pid] += flushSize; + } + } + outStream->close(); +} + void Splitter::WriteSplit() { for (auto pid = 0; pid < num_partitions_; ++pid) { CacheVectorBatch(pid, true); - partition_buffer_size_[pid] = 0; //溢写之后将其清零,条件溢写需要重新分配内存 + partition_buffer_size_[pid] = 0; // 溢写之后将其清零,条件溢写需要重新分配内存 } std::unique_ptr outStream = writeLocalFile(options_.data_file); @@ -1052,21 +1372,38 @@ void Splitter::WriteSplit() { options.setCompressionBlockSize(options_.compress_block_size); options.setCompressionStrategy(CompressionStrategy_COMPRESSION); std::unique_ptr streamsFactory = createStreamsFactory(options, outStream.get()); - std::unique_ptr bufferOutPutStream = streamsFactory->createStream(); + std::unique_ptr bufferOutPutStream = streamsFactory->createStream(); void* bufferOut = nullptr; int32_t sizeOut = 0; - for (auto pid = 0; pid < num_partitions_; ++ pid) { + for (auto pid = 0; pid < num_partitions_; ++pid) { ProtoWritePartition(pid, bufferOutPutStream, bufferOut, sizeOut); } - std::fill(std::begin(partition_id_cnt_cache_), std::end(partition_id_cnt_cache_), 0); + memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t)); ReleaseVarcharVector(); num_row_splited_ = 0; cached_vectorbatch_size_ = 0; outStream->close(); } +void Splitter::WriteSplitByRow() { + std::unique_ptr outStream = writeLocalFile(options_.data_file); + WriterOptions options; + options.setCompression(options_.compression_type); + options.setCompressionBlockSize(options_.compress_block_size); + options.setCompressionStrategy(CompressionStrategy_COMPRESSION); + std::unique_ptr streamsFactory = createStreamsFactory(options, outStream.get()); + std::unique_ptr bufferOutPutStream = streamsFactory->createStream(); + + void* bufferOut = nullptr; + int32_t sizeOut = 0; + for (auto pid = 0; pid < num_partitions_; ++pid) { + ProtoWritePartitionByRow(pid, bufferOutPutStream, bufferOut, sizeOut); + } + outStream->close(); +} + int Splitter::DeleteSpilledTmpFile() { for (auto &pair : spilled_tmp_files_info_) { auto tmpDataFilePath = pair.first + ".data"; @@ -1085,7 +1422,7 @@ int Splitter::DeleteSpilledTmpFile() { int Splitter::SpillToTmpFile() { for (auto pid = 0; pid < num_partitions_; ++pid) { CacheVectorBatch(pid, true); - partition_buffer_size_[pid] = 0; //溢写之后将其清零,条件溢写需要重新分配内存 + partition_buffer_size_[pid] = 0; // 溢写之后将其清零,条件溢写需要重新分配内存 } options_.next_spilled_file_dir = CreateTempShuffleFile(NextSpilledFileDir()); @@ -1098,12 +1435,20 @@ int Splitter::SpillToTmpFile() { return 0; } +int Splitter::SpillToTmpFileByRow() { + options_.next_spilled_file_dir = CreateTempShuffleFile(NextSpilledFileDir()); + WriteDataFileProtoByRow(); + std::shared_ptr ptrTmp = CaculateSpilledTmpFilePartitionOffsets(); + spilled_tmp_files_info_[options_.next_spilled_file_dir] = ptrTmp; + return 0; +} + Splitter::Splitter(InputDataTypes inputDataTypes, int32_t num_cols, int32_t num_partitions, SplitOptions options, bool flag) - : input_col_types(inputDataTypes), - singlePartitionFlag(flag), + : singlePartitionFlag(flag), num_partitions_(num_partitions), options_(std::move(options)), - num_fields_(num_cols) + num_fields_(num_cols), + input_col_types(inputDataTypes) { LogsDebug("Input Schema colNum: %d", num_cols); ToSplitterTypeId(num_cols); @@ -1158,3 +1503,17 @@ int Splitter::Stop() { } return 0; } + +int Splitter::StopByRow() { + if (isSpill) { + TIME_NANO_OR_RAISE(total_write_time_, MergeSpilledByRow()); + TIME_NANO_OR_RAISE(total_write_time_, DeleteSpilledTmpFile()); + LogsDebug(" Spill For Splitter Stopped. total_spill_row_num_: %ld ", total_spill_row_num_); + } else { + TIME_NANO_OR_RAISE(total_write_time_, WriteSplitByRow()); + } + if (nullptr == protoRowBatch) { + throw std::runtime_error("delete nullptr error for free protobuf rowBatch memory"); + } + return 0; +} diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h index b0a6386dcbe205334070905e4d636196a4bf13f3..baad48877f213d21487315b5fb7bd430304e7645 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h @@ -35,6 +35,7 @@ #include "../common/common.h" #include "vec_data.pb.h" #include "google/protobuf/io/zero_copy_stream_impl.h" +#include "vector/omni_row.h" using namespace std; using namespace spark; @@ -51,13 +52,16 @@ struct SplitRowInfo { }; class Splitter { - virtual int DoSplit(VectorBatch& vb); int WriteDataFileProto(); + int WriteDataFileProtoByRow(); + std::shared_ptr CaculateSpilledTmpFilePartitionOffsets(); + spark::VecType::VecTypeId CastOmniTypeIdToProtoVecType(int32_t omniType); + void SerializingFixedColumns(int32_t partitionId, spark::Vec& vec, int fixColIndexTmp, @@ -70,8 +74,12 @@ class Splitter { int protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream); + int protoSpillPartitionByRow(int32_t partition_id, std::unique_ptr &bufferStream); + int32_t ProtoWritePartition(int32_t partition_id, std::unique_ptr &bufferStream, void *bufferOut, int32_t &sizeOut); + int32_t ProtoWritePartitionByRow(int32_t partition_id, std::unique_ptr &bufferStream, void *bufferOut, int32_t &sizeOut); + int ComputeAndCountPartitionId(VectorBatch& vb); int AllocatePartitionBuffers(int32_t partition_id, int32_t new_size); @@ -93,38 +101,53 @@ class Splitter { void MergeSpilled(); + void MergeSpilledByRow(); + void WriteSplit(); + void WriteSplitByRow(); + + // Common structures for row formats and col formats bool isSpill = false; + int64_t total_bytes_written_ = 0; + int64_t total_bytes_spilled_ = 0; + int64_t total_write_time_ = 0; + int64_t total_spill_time_ = 0; + int64_t total_spill_row_num_ = 0; + + // configured local dirs for spilled file + int32_t dir_selection_ = 0; + std::vector sub_dir_selection_; + std::vector configured_dirs_; + + // Data structures required to handle col formats + int64_t total_compute_pid_time_ = 0; + std::vector partition_lengths_; std::vector partition_id_; // 记录当前vb每一行的pid - std::vector partition_id_cnt_cur_; // 统计不同partition记录的行数(当前处理中的vb) - std::vector partition_id_cnt_cache_; // 统计不同partition记录的行数,cache住的 + int32_t *partition_id_cnt_cur_; // 统计不同partition记录的行数(当前处理中的vb) + uint64_t *partition_id_cnt_cache_; // 统计不同partition记录的行数,cache住的 + std::vector row_offset_row_id_; + std::vector partition_used_; + std::vector partition_row_offset_base_; + std::vector partition_arena_; // column number uint32_t num_row_splited_; // cached row number uint64_t cached_vectorbatch_size_; // cache total vectorbatch size in bytes uint64_t current_fixed_alloc_buffer_size_ = 0; - std::vector fixed_valueBuffer_size_; // 当前定长omniAlloc已经分配value内存大小byte - std::vector fixed_nullBuffer_size_; // 当前定长omniAlloc已分配null内存大小byte + uint32_t *fixed_valueBuffer_size_; // 当前定长omniAlloc已经分配value内存大小byte + uint32_t *fixed_nullBuffer_size_; // 当前定长omniAlloc已分配null内存大小byte // int32_t num_cache_vector_; std::vector column_type_id_; // 各列映射SHUFFLE类型,schema列id序列 std::vector> partition_fixed_width_validity_addrs_; std::vector> partition_fixed_width_value_addrs_; // std::vector>>> partition_fixed_width_buffers_; std::vector>> partition_binary_builders_; - std::vector partition_buffer_size_; // 各分区的buffer大小 std::vector fixed_width_array_idx_; // 记录各定长类型列的序号,VB 列id序列 std::vector binary_array_idx_; //记录各变长类型列序号 - std::vector partition_buffer_idx_base_; //当前已缓存的各partition行数据记录,用于定位缓冲buffer当前可用位置 - std::vector partition_buffer_idx_offset_; //split定长列时用于统计offset的临时变量 - std::vector partition_serialization_size_; // 记录序列化后的各partition大小,用于stop返回partition偏移 in bytes - - std::vector input_fixed_width_has_null_; // 定长列是否含有null标志数组 - - // configured local dirs for spilled file - int32_t dir_selection_ = 0; - std::vector sub_dir_selection_; - std::vector configured_dirs_; - + int32_t *partition_buffer_size_; // 各分区的buffer大小 + int32_t *partition_buffer_idx_base_; //当前已缓存的各partition行数据记录,用于定位缓冲buffer当前可用位置 + int32_t *partition_buffer_idx_offset_; //split定长列时用于统计offset的临时变量 + uint32_t *partition_serialization_size_; // 记录序列化后的各partition大小,用于stop返回partition偏移 in bytes std::vector>>>> partition_cached_vectorbatch_; /* * varchar buffers: @@ -132,16 +155,20 @@ class Splitter { * */ std::vector>> vc_partition_array_buffers_; + spark::VecBatch *vecBatchProto = new VecBatch(); // protobuf 序列化对象结构 - int64_t total_bytes_written_ = 0; - int64_t total_bytes_spilled_ = 0; - int64_t total_write_time_ = 0; - int64_t total_spill_time_ = 0; - int64_t total_compute_pid_time_ = 0; - int64_t total_spill_row_num_ = 0; - std::vector partition_lengths_; + // Data structures required to handle row formats + std::vector> partition_rows; // pid : std::vector + RowBatch *array_partition_rows; + std::vector> partition_row_batch; + std::vector partition_row_batch_count; + uint64_t total_input_size = 0; // total row size in bytes + uint32_t expansion = 2; // expansion coefficient + spark::ProtoRowBatch *protoRowBatch = new ProtoRowBatch(); private: + void BuildPartition2Row(int32_t row_count); + void ReleaseVarcharVector() { std::set::iterator it; @@ -169,31 +196,34 @@ private: delete vb; } + // Data structures required to handle col formats std::set varcharVectorCache; - bool first_vector_batch_ = false; - std::vector vector_batch_col_types_; - InputDataTypes input_col_types; - std::vector binary_array_empirical_size_; - omniruntime::vec::VectorBatch *inputVecBatch = nullptr; public: + // Common structures for row formats and col formats bool singlePartitionFlag = false; int32_t num_partitions_; SplitOptions options_; // 分区数 int32_t num_fields_; - + InputDataTypes input_col_types; + std::vector proto_col_types_; // Avoid repeated type conversion during the split process. + omniruntime::vec::VectorBatch *inputVecBatch = nullptr; std::map> spilled_tmp_files_info_; - spark::VecBatch *vecBatchProto = new VecBatch(); // protobuf 序列化对象结构 - virtual int Split_Init(); virtual int Split(VectorBatch& vb); + virtual int SplitByRow(VectorBatch* vb); + int Stop(); + int StopByRow(); + int SpillToTmpFile(); + int SpillToTmpFileByRow(); + Splitter(InputDataTypes inputDataTypes, int32_t num_cols, int32_t num_partitions, @@ -225,7 +255,16 @@ public: virtual ~Splitter() { - delete vecBatchProto; //free protobuf vecBatch memory + delete vecBatchProto; //free protobuf vecBatch memory + delete protoRowBatch; //free protobuf rowBatch memory + delete[] partition_id_cnt_cur_; + delete[] partition_id_cnt_cache_; + delete[] partition_buffer_size_; + delete[] partition_buffer_idx_base_; + delete[] partition_buffer_idx_offset_; + delete[] partition_serialization_size_; + delete[] fixed_valueBuffer_size_; + delete[] fixed_nullBuffer_size_; partition_fixed_width_buffers_.clear(); partition_binary_builders_.clear(); partition_cached_vectorbatch_.clear(); diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h b/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h index 04d90130dea30a83651fff3526c08dc0992f9928..4672fd39b0ca5d44a777a648e345e93ec8069de8 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/type.h @@ -28,6 +28,8 @@ using namespace omniruntime::mem; static constexpr int32_t kDefaultSplitterBufferSize = 4096; static constexpr int32_t kDefaultNumSubDirs = 64; +static constexpr double spillThreshold = 0.9; +static int64_t offHeapSize = MemoryManager::GetGlobalMemoryLimit(); struct SplitOptions { int32_t buffer_size = kDefaultSplitterBufferSize;