From 488db53da3f5f3581015f7311d429234c7b73c12 Mon Sep 17 00:00:00 2001 From: zengdeyong Date: Mon, 24 Apr 2023 20:31:59 +0800 Subject: [PATCH] shuffle merge file optimization --- .../cpp/src/shuffle/splitter.cpp | 29 ++++++++++++++----- .../cpp/src/shuffle/splitter.h | 2 +- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 2eba4b929..0476de44f 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -714,7 +714,7 @@ void Splitter::SerializingBinaryColumns(int32_t partitionId, spark::Vec& vec, in vec.set_offset(OffsetsByte.get(), (itemsTotalLen + 1) * sizeof(int32_t)); } -int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream) { +uint64_t Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream) { SplitRowInfo splitRowInfoTmp; splitRowInfoTmp.copyedRow = 0; splitRowInfoTmp.remainCopyRow = partition_id_cnt_cache_[partition_id]; @@ -773,12 +773,14 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr(vecBatchProto->ByteSizeLong())); void *buffer = nullptr; - if (!bufferStream->NextNBytes(&buffer, sizeof(vecBatchProtoSize))) { + int32_t outSize = 0; + if (!bufferStream->Next(&buffer, &outSize)) { LogsError("Allocate Memory Failed: Flush Spilled Data, Next failed."); throw std::runtime_error("Allocate Memory Failed: Flush Spilled Data, Next failed."); } // set serialized bytes to stream memcpy(buffer, &vecBatchProtoSize, sizeof(vecBatchProtoSize)); + bufferStream->BackUp(outSize - sizeof(vecBatchProtoSize)); LogsDebug(" A Slice Of vecBatchProtoSize: %d ", reversebytes_uint32t(vecBatchProtoSize)); vecBatchProto->SerializeToZeroCopyStream(bufferStream.get()); @@ -800,7 +802,7 @@ int Splitter::protoSpillPartition(int32_t partition_id, 
std::unique_ptr(pair.second->data_)[pid]; @@ -856,12 +859,24 @@ void Splitter::MergeSpilled() { } } - uint64_t flushSize = bufferOutPutStream->flush(); - total_bytes_written_ += flushSize; + flushSize += bufferOutPutStream->flush(); LogsDebug(" Merge Flush Partition[%d] flushSize: %ld ", pid, flushSize); - partition_lengths_[pid] += flushSize; } + + // flush partition data in memory + CacheVectorBatch(pid, true); + partition_buffer_size_[pid] = 0; // reset to zero after spilling; a conditional spill must re-allocate the buffer + + flushSize += protoSpillPartition(pid, bufferOutPutStream); + total_bytes_written_ += flushSize; + partition_lengths_[pid] += flushSize; } + + // release resource + ReleaseVarcharVector(); + num_row_splited_ = 0; + cached_vectorbatch_size_ = 0; + std::fill(std::begin(partition_id_cnt_cache_), std::end(partition_id_cnt_cache_), 0); outStream->close(); } @@ -948,7 +963,7 @@ std::string Splitter::NextSpilledFileDir() { } int Splitter::Stop() { - TIME_NANO_OR_RAISE(total_spill_time_, SpillToTmpFile()); +// TIME_NANO_OR_RAISE(total_spill_time_, SpillToTmpFile()); TIME_NANO_OR_RAISE(total_write_time_, MergeSpilled()); TIME_NANO_OR_RAISE(total_write_time_, DeleteSpilledTmpFile()); LogsDebug(" Spill For Splitter Stopped. total_spill_row_num_: %ld ", total_spill_row_num_); diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h index 0ef198996..ba95e8f34 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h @@ -69,7 +69,7 @@ class Splitter { int colIndex, int curBatch); - int protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream); + uint64_t protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream); int ComputeAndCountPartitionId(VectorBatch& vb); -- Gitee