From ad9f194084682ae4d8e1b6ec5f23bb5253bdb6c9 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Thu, 1 Feb 2024 10:32:37 +0800 Subject: [PATCH 1/3] refactor TypeDecoder --- .../cpp/src/parquet/ParquetDecoder.h | 85 ++++--------------- .../src/parquet/ParquetTypedRecordReader.cpp | 4 +- 2 files changed, 17 insertions(+), 72 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h index a36c2e2ac..ae7f7bf2b 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h @@ -31,8 +31,18 @@ using namespace omniruntime::vec; using namespace arrow; namespace omniruntime::reader { + class Decoder { + public: + virtual ~Decoder() = default; + + virtual void SetData(int num_values, const uint8_t* data, int len) = 0; + + virtual int values_left() const = 0; + + virtual ::parquet::Encoding::type encoding() const = 0; + }; - class ParquetDecoderImpl : virtual public ::parquet::Decoder { + class ParquetDecoderImpl : virtual public Decoder { public: void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; @@ -78,7 +88,7 @@ namespace omniruntime::reader { } template - class ParquetTypedDecoder : virtual public ::parquet::TypedDecoder { + class ParquetTypedDecoder : virtual public Decoder { public: using T = typename DType::c_type; @@ -97,9 +107,7 @@ namespace omniruntime::reader { } } - int Decode(T* buffer, int num_values) override { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for Decode"); - } + virtual int Decode(T* buffer, int max_values) = 0; virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrowNonNull"); @@ -172,31 +180,6 @@ namespace omniruntime::reader { return num_values; } - int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset) override { - num_values = std::min(num_values, num_values_); - if (num_values != idx_decoder_.GetBatchWithDictSpaced( - reinterpret_cast(dictionary_->data()), - dictionary_length_, buffer, num_values, null_count, valid_bits, - valid_bits_offset)) { - ::parquet::ParquetException::EofException(); - } - num_values_ -= num_values; - return num_values; - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::Accumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow(Accumulator) for OmniDictDecoderImpl"); - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::DictAccumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow(DictAccumulator) for OmniDictDecoderImpl"); - } - void InsertDictionary(::arrow::ArrayBuilder* builder) override { ::parquet::ParquetException::NYI("InsertDictionary ArrayBuilder"); } @@ -241,7 +224,7 @@ namespace omniruntime::reader { return Status::Invalid("Index not in dictionary bounds"); } - inline void DecodeDict(::parquet::TypedDecoder* dictionary) { + inline void DecodeDict(ParquetTypedDecoder* dictionary) { dictionary_length_ = static_cast(dictionary->values_left()); PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T), /*shrink_to_fit=*/false)); @@ -359,18 +342,6 @@ namespace omniruntime::reader { explicit ParquetPlainDecoder(const ::parquet::ColumnDescriptor* descr); int Decode(T* buffer, int max_values) override; - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::Accumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow(Accumulator) for ParquetPlainDecoder"); - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::DictAccumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow(DictAccumulator) for ParquetPlainDecoder"); - } }; template @@ -552,19 +523,6 @@ namespace omniruntime::reader { int Decode(uint8_t* buffer, int max_values) override; int Decode(bool* buffer, int max_values) override; - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::Accumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow for ParquetPlainBooleanDecoder"); - } - - int DecodeArrow( - int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::DictAccumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow for ParquetPlainBooleanDecoder"); - } - private: std::unique_ptr<::arrow::bit_util::BitReader> bit_reader_; }; @@ -609,24 +567,11 @@ namespace omniruntime::reader { ::parquet::ParquetException::NYI("Decode(uint8_t*, int) for RleBooleanDecoder"); } - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::Accumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); - } - - int DecodeArrow( - int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::DictAccumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); - } - private: std::shared_ptr<::arrow::util::RleDecoder> decoder_; }; - class ParquetPlainFLBADecoder : public ParquetPlainDecoder<::parquet::FLBAType>, virtual public ::parquet::FLBADecoder { + class ParquetPlainFLBADecoder : public ParquetPlainDecoder<::parquet::FLBAType> { public: using Base = ParquetPlainDecoder<::parquet::FLBAType>; using Base::ParquetPlainDecoder; diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp index 6251044a8..1f8cc7122 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp @@ -29,7 +29,7 @@ namespace omniruntime::reader { constexpr int32_t DECIMAL64_LEN = 8; -::parquet::Decoder* MakeOmniParquetDecoder(::parquet::Type::type type_num, ::parquet::Encoding::type encoding, +Decoder* MakeOmniParquetDecoder(::parquet::Type::type type_num, ::parquet::Encoding::type encoding, const ColumnDescriptor* descr) { if (encoding == ::parquet::Encoding::PLAIN) { switch (type_num) { @@ -61,7 +61,7 @@ constexpr int32_t DECIMAL64_LEN = 8; } -::parquet::Decoder* MakeOmniDictDecoder(::parquet::Type::type type_num, +Decoder* MakeOmniDictDecoder(::parquet::Type::type type_num, const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) { switch (type_num) { case ::parquet::Type::BOOLEAN: -- Gitee From 25a542054ad94f0858cb7f9298a5625a31523e90 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Thu, 1 Feb 2024 10:40:28 +0800 Subject: [PATCH 2/3] refactor DictDecoder --- .../cpp/src/parquet/ParquetDecoder.cpp | 6 +- .../cpp/src/parquet/ParquetDecoder.h | 91 ++++++------------- .../src/parquet/ParquetTypedRecordReader.cpp | 8 +- 3 files changed, 36 insertions(+), 69 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp index b5c1d712d..80e8c12e1 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp @@ -61,12 +61,12 @@ namespace omniruntime::reader { } template <> - void ParquetDictDecoderImpl<::parquet::BooleanType>::SetDict(ParquetTypedDecoder<::parquet::BooleanType>* dictionary) { + void ParquetDictDecoder<::parquet::BooleanType>::SetDict(ParquetTypedDecoder<::parquet::BooleanType>* dictionary) { ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); } template <> - void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + void ParquetDictDecoder::SetDict(ParquetTypedDecoder* dictionary) { DecodeDict(dictionary); auto dict_values = reinterpret_cast(dictionary_->mutable_data()); @@ -95,7 +95,7 @@ namespace omniruntime::reader { } template <> - inline void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + inline void ParquetDictDecoder::SetDict(ParquetTypedDecoder* dictionary) { DecodeDict(dictionary); auto dict_values = reinterpret_cast(dictionary_->mutable_data()); diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h index ae7f7bf2b..e3d03beb2 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h @@ -109,42 +109,18 @@ namespace omniruntime::reader { virtual int Decode(T* buffer, int max_values) = 0; - virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrowNonNull"); - } + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) = 0; virtual int DecodeArrow(int num_values, int null_count, bool* nulls, - int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrow"); - } + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) = 0; }; - template - class ParquetDictDecoder : virtual public ParquetTypedDecoder { - public: - using T = typename DType::c_type; - - virtual void SetDict(ParquetTypedDecoder* dictionary) = 0; - - virtual void InsertDictionary(::arrow::ArrayBuilder* builder) = 0; - - virtual int DecodeIndicesSpaced(int num_values, int null_count, - const uint8_t* valid_bits, int64_t valid_bits_offset, - ::arrow::ArrayBuilder* builder) = 0; - - virtual int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) = 0; - - virtual int DecodeIndices(int num_values, int32_t* indices) = 0; - - virtual void GetDictionary(const T** dictionary, int32_t* dictionary_length) = 0; - }; - template - class ParquetDictDecoderImpl : public ParquetDecoderImpl, virtual public ParquetDictDecoder { + class ParquetDictDecoder : public ParquetDecoderImpl, virtual public ParquetTypedDecoder { public: typedef typename Type::c_type T; - explicit ParquetDictDecoderImpl(const ::parquet::ColumnDescriptor* descr, + explicit ParquetDictDecoder(const ::parquet::ColumnDescriptor* descr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : ParquetDecoderImpl(descr, ::parquet::Encoding::RLE_DICTIONARY), dictionary_(::parquet::AllocateBuffer(pool, 0)), @@ -152,7 +128,7 @@ namespace omniruntime::reader { byte_array_data_(::parquet::AllocateBuffer(pool, 0)), byte_array_offsets_(::parquet::AllocateBuffer(pool, 0)) {} - void SetDict(ParquetTypedDecoder* dictionary) override; + void SetDict(ParquetTypedDecoder* dictionary); void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; @@ -180,40 +156,13 @@ namespace omniruntime::reader { return num_values; } - void InsertDictionary(::arrow::ArrayBuilder* builder) override { - ::parquet::ParquetException::NYI("InsertDictionary ArrayBuilder"); - } - - int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::ArrayBuilder* builder) override { - ::parquet::ParquetException::NYI("DecodeIndicesSpaced ArrayBuilder"); - } - - int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override { - ::parquet::ParquetException::NYI("DecodeIndices ArrayBuilder"); - } - - int DecodeIndices(int num_values, int32_t* indices) override { - if (num_values != idx_decoder_.GetBatch(indices, num_values)) { - ::parquet::ParquetException::EofException(); - } - num_values_ -= num_values; - return num_values; - } - - void GetDictionary(const T** dictionary, int32_t* dictionary_length) override { - *dictionary_length = dictionary_length_; - *dictionary = reinterpret_cast(dictionary_->mutable_data()); - } - virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrowNonNull"); + ::parquet::ParquetException::NYI("ParquetDictDecoder for DecodeArrowNonNull"); } virtual int DecodeArrow(int num_values, int null_count, bool* nulls, int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrow"); + ::parquet::ParquetException::NYI("ParquetDictDecoder for DecodeArrow"); } protected: @@ -243,14 +192,14 @@ namespace omniruntime::reader { }; template - void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + void ParquetDictDecoder::SetDict(ParquetTypedDecoder* dictionary) { DecodeDict(dictionary); } - class OmniDictByteArrayDecoderImpl : public ParquetDictDecoderImpl<::parquet::ByteArrayType> { + class OmniDictByteArrayDecoderImpl : public ParquetDictDecoder<::parquet::ByteArrayType> { public: - using BASE = ParquetDictDecoderImpl<::parquet::ByteArrayType>; - using BASE::ParquetDictDecoderImpl; + using BASE = ParquetDictDecoder<::parquet::ByteArrayType>; + using BASE::ParquetDictDecoder; int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) override { int result = 0; @@ -342,6 +291,15 @@ namespace omniruntime::reader { explicit ParquetPlainDecoder(const ::parquet::ColumnDescriptor* descr); int Decode(T* buffer, int max_values) override; + + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { + ::parquet::ParquetException::NYI("ParquetPlainDecoder for DecodeArrowNonNull"); + } + + virtual int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { + ::parquet::ParquetException::NYI("ParquetPlainDecoder for DecodeArrow"); + } }; template @@ -513,6 +471,15 @@ namespace omniruntime::reader { public: using ParquetTypedDecoder<::parquet::BooleanType>::Decode; virtual int Decode(uint8_t* buffer, int max_values) = 0; + + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { + ::parquet::ParquetException::NYI("ParquetBooleanDecoder for DecodeArrowNonNull"); + } + + virtual int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { + ::parquet::ParquetException::NYI("ParquetBooleanDecoder for DecodeArrow"); + } }; class ParquetPlainBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetBooleanDecoder { diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp index 1f8cc7122..d02806cd7 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp @@ -67,15 +67,15 @@ Decoder* MakeOmniDictDecoder(::parquet::Type::type type_num, case ::parquet::Type::BOOLEAN: ::parquet::ParquetException::NYI("Dictionary BOOLEAN encoding not implemented for boolean type"); case ::parquet::Type::INT32: - return new ParquetDictDecoderImpl<::parquet::Int32Type>(descr, pool); + return new ParquetDictDecoder<::parquet::Int32Type>(descr, pool); case ::parquet::Type::INT64: - return new ParquetDictDecoderImpl<::parquet::Int64Type>(descr, pool); + return new ParquetDictDecoder<::parquet::Int64Type>(descr, pool); case ::parquet::Type::DOUBLE: - return new ParquetDictDecoderImpl<::parquet::DoubleType>(descr, pool); + return new ParquetDictDecoder<::parquet::DoubleType>(descr, pool); case ::parquet::Type::BYTE_ARRAY: return new OmniDictByteArrayDecoderImpl(descr, pool); case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: - return new ParquetDictDecoderImpl<::parquet::FLBAType>(descr, pool); + return new ParquetDictDecoder<::parquet::FLBAType>(descr, pool); default: ::parquet::ParquetException::NYI("Not supported dictionary decoder type: " + type_num); } -- Gitee From ee3eec1d1da673aaeab7f49fc44c93c10e7eec90 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Thu, 1 Feb 2024 10:47:22 +0800 Subject: [PATCH 3/3] refactor BooleanDecoder --- .../cpp/src/parquet/ParquetDecoder.cpp | 3 --- .../cpp/src/parquet/ParquetDecoder.h | 16 ++++++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp index 80e8c12e1..dcbb67d8f 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp @@ -25,9 +25,6 @@ using namespace omniruntime::vec; namespace omniruntime::reader { - ParquetPlainBooleanDecoder::ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr) - : ParquetDecoderImpl(descr, ::parquet::Encoding::PLAIN) {} - void ParquetPlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; bit_reader_ = std::make_unique<::arrow::bit_util::BitReader>(data, len); diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h index e3d03beb2..d6540288e 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h @@ -467,9 +467,11 @@ namespace omniruntime::reader { } }; - class ParquetBooleanDecoder : virtual public ParquetTypedDecoder<::parquet::BooleanType> { + class ParquetBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetTypedDecoder<::parquet::BooleanType> { public: - using ParquetTypedDecoder<::parquet::BooleanType>::Decode; + explicit ParquetBooleanDecoder(const ::parquet::ColumnDescriptor* descr, ::parquet::Encoding::type encoding) + : ParquetDecoderImpl(descr, encoding) {} + virtual int Decode(uint8_t* buffer, int max_values) = 0; virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { @@ -482,9 +484,11 @@ namespace omniruntime::reader { } }; - class ParquetPlainBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetBooleanDecoder { + class ParquetPlainBooleanDecoder : virtual public ParquetBooleanDecoder { public: - explicit ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr); + explicit ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr) + : ParquetBooleanDecoder(descr, ::parquet::Encoding::PLAIN) {} + void SetData(int num_values, const uint8_t* data, int len) override; int Decode(uint8_t* buffer, int max_values) override; @@ -494,10 +498,10 @@ namespace omniruntime::reader { std::unique_ptr<::arrow::bit_util::BitReader> bit_reader_; }; - class ParquetRleBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetBooleanDecoder { + class ParquetRleBooleanDecoder : virtual public ParquetBooleanDecoder { public: explicit ParquetRleBooleanDecoder(const ::parquet::ColumnDescriptor* descr) - : ParquetDecoderImpl(descr, ::parquet::Encoding::RLE) {} + : ParquetBooleanDecoder(descr, ::parquet::Encoding::RLE) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; -- Gitee