diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp index b5c1d712dd179e362837347e76e11b3684a9af2f..dcbb67d8fcebbeee64189a19d2c61557ba8acbda 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp @@ -25,9 +25,6 @@ using namespace omniruntime::vec; namespace omniruntime::reader { - ParquetPlainBooleanDecoder::ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr) - : ParquetDecoderImpl(descr, ::parquet::Encoding::PLAIN) {} - void ParquetPlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; bit_reader_ = std::make_unique<::arrow::bit_util::BitReader>(data, len); @@ -61,12 +58,12 @@ namespace omniruntime::reader { } template <> - void ParquetDictDecoderImpl<::parquet::BooleanType>::SetDict(ParquetTypedDecoder<::parquet::BooleanType>* dictionary) { + void ParquetDictDecoder<::parquet::BooleanType>::SetDict(ParquetTypedDecoder<::parquet::BooleanType>* dictionary) { ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); } template <> - void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + void ParquetDictDecoder::SetDict(ParquetTypedDecoder* dictionary) { DecodeDict(dictionary); auto dict_values = reinterpret_cast(dictionary_->mutable_data()); @@ -95,7 +92,7 @@ namespace omniruntime::reader { } template <> - inline void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + inline void ParquetDictDecoder::SetDict(ParquetTypedDecoder* dictionary) { DecodeDict(dictionary); auto dict_values = reinterpret_cast(dictionary_->mutable_data()); diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h index a36c2e2acb430d15e32f1a1da1be6c83700ecd7d..d6540288e5d0e2714d39cc0e631d258dd73a636a 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h @@ -31,8 +31,18 @@ using namespace omniruntime::vec; using namespace arrow; namespace omniruntime::reader { + class Decoder { + public: + virtual ~Decoder() = default; + + virtual void SetData(int num_values, const uint8_t* data, int len) = 0; + + virtual int values_left() const = 0; + + virtual ::parquet::Encoding::type encoding() const = 0; + }; - class ParquetDecoderImpl : virtual public ::parquet::Decoder { + class ParquetDecoderImpl : virtual public Decoder { public: void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; @@ -78,7 +88,7 @@ namespace omniruntime::reader { } template - class ParquetTypedDecoder : virtual public ::parquet::TypedDecoder { + class ParquetTypedDecoder : virtual public Decoder { public: using T = typename DType::c_type; @@ -97,46 +107,20 @@ namespace omniruntime::reader { } } - int Decode(T* buffer, int num_values) override { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for Decode"); - } + virtual int Decode(T* buffer, int max_values) = 0; - virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrowNonNull"); - } + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) = 0; virtual int DecodeArrow(int num_values, int null_count, bool* nulls, - int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrow"); - } + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) = 0; }; - template - class ParquetDictDecoder : virtual public ParquetTypedDecoder { - public: - using T = typename DType::c_type; - - virtual void SetDict(ParquetTypedDecoder* dictionary) = 0; - - virtual void InsertDictionary(::arrow::ArrayBuilder* builder) = 0; - - virtual int DecodeIndicesSpaced(int num_values, int null_count, - const uint8_t* valid_bits, int64_t valid_bits_offset, - ::arrow::ArrayBuilder* builder) = 0; - - virtual int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) = 0; - - virtual int DecodeIndices(int num_values, int32_t* indices) = 0; - - virtual void GetDictionary(const T** dictionary, int32_t* dictionary_length) = 0; - }; - template - class ParquetDictDecoderImpl : public ParquetDecoderImpl, virtual public ParquetDictDecoder { + class ParquetDictDecoder : public ParquetDecoderImpl, virtual public ParquetTypedDecoder { public: typedef typename Type::c_type T; - explicit ParquetDictDecoderImpl(const ::parquet::ColumnDescriptor* descr, + explicit ParquetDictDecoder(const ::parquet::ColumnDescriptor* descr, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) : ParquetDecoderImpl(descr, ::parquet::Encoding::RLE_DICTIONARY), dictionary_(::parquet::AllocateBuffer(pool, 0)), @@ -144,7 +128,7 @@ namespace omniruntime::reader { byte_array_data_(::parquet::AllocateBuffer(pool, 0)), byte_array_offsets_(::parquet::AllocateBuffer(pool, 0)) {} - void SetDict(ParquetTypedDecoder* dictionary) override; + void SetDict(ParquetTypedDecoder* dictionary); void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; @@ -172,65 +156,13 @@ namespace omniruntime::reader { return num_values; } - int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset) override { - num_values = std::min(num_values, num_values_); - if (num_values != idx_decoder_.GetBatchWithDictSpaced( - reinterpret_cast(dictionary_->data()), - dictionary_length_, buffer, num_values, null_count, valid_bits, - valid_bits_offset)) { - ::parquet::ParquetException::EofException(); - } - num_values_ -= num_values; - return num_values; - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::Accumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow(Accumulator) for OmniDictDecoderImpl"); - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::DictAccumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow(DictAccumulator) for OmniDictDecoderImpl"); - } - - void InsertDictionary(::arrow::ArrayBuilder* builder) override { - ::parquet::ParquetException::NYI("InsertDictionary ArrayBuilder"); - } - - int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::ArrayBuilder* builder) override { - ::parquet::ParquetException::NYI("DecodeIndicesSpaced ArrayBuilder"); - } - - int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override { - ::parquet::ParquetException::NYI("DecodeIndices ArrayBuilder"); - } - - int DecodeIndices(int num_values, int32_t* indices) override { - if (num_values != idx_decoder_.GetBatch(indices, num_values)) { - ::parquet::ParquetException::EofException(); - } - num_values_ -= num_values; - return num_values; - } - - void GetDictionary(const T** dictionary, int32_t* dictionary_length) override { - *dictionary_length = dictionary_length_; - *dictionary = reinterpret_cast(dictionary_->mutable_data()); - } - virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrowNonNull"); + ::parquet::ParquetException::NYI("ParquetDictDecoder for DecodeArrowNonNull"); } virtual int DecodeArrow(int num_values, int null_count, bool* nulls, int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { - ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrow"); + ::parquet::ParquetException::NYI("ParquetDictDecoder for DecodeArrow"); } protected: @@ -241,7 +173,7 @@ namespace omniruntime::reader { return Status::Invalid("Index not in dictionary bounds"); } - inline void DecodeDict(::parquet::TypedDecoder* dictionary) { + inline void DecodeDict(ParquetTypedDecoder* dictionary) { dictionary_length_ = static_cast(dictionary->values_left()); PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T), /*shrink_to_fit=*/false)); @@ -260,14 +192,14 @@ namespace omniruntime::reader { }; template - void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + void ParquetDictDecoder::SetDict(ParquetTypedDecoder* dictionary) { DecodeDict(dictionary); } - class OmniDictByteArrayDecoderImpl : public ParquetDictDecoderImpl<::parquet::ByteArrayType> { + class OmniDictByteArrayDecoderImpl : public ParquetDictDecoder<::parquet::ByteArrayType> { public: - using BASE = ParquetDictDecoderImpl<::parquet::ByteArrayType>; - using BASE::ParquetDictDecoderImpl; + using BASE = ParquetDictDecoder<::parquet::ByteArrayType>; + using BASE::ParquetDictDecoder; int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) override { int result = 0; @@ -360,16 +292,13 @@ namespace omniruntime::reader { int Decode(T* buffer, int max_values) override; - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::Accumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow(Accumulator) for ParquetPlainDecoder"); + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { + ::parquet::ParquetException::NYI("ParquetPlainDecoder for DecodeArrowNonNull"); } - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits::DictAccumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow(DictAccumulator) for ParquetPlainDecoder"); + virtual int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { + ::parquet::ParquetException::NYI("ParquetPlainDecoder for DecodeArrow"); } }; @@ -538,41 +467,41 @@ namespace omniruntime::reader { } }; - class ParquetBooleanDecoder : virtual public ParquetTypedDecoder<::parquet::BooleanType> { + class ParquetBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetTypedDecoder<::parquet::BooleanType> { public: - using ParquetTypedDecoder<::parquet::BooleanType>::Decode; + explicit ParquetBooleanDecoder(const ::parquet::ColumnDescriptor* descr, ::parquet::Encoding::type encoding) + : ParquetDecoderImpl(descr, encoding) {} + virtual int Decode(uint8_t* buffer, int max_values) = 0; + + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { + ::parquet::ParquetException::NYI("ParquetBooleanDecoder for DecodeArrowNonNull"); + } + + virtual int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { + ::parquet::ParquetException::NYI("ParquetBooleanDecoder for DecodeArrow"); + } }; - class ParquetPlainBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetBooleanDecoder { + class ParquetPlainBooleanDecoder : virtual public ParquetBooleanDecoder { public: - explicit ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr); + explicit ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr) + : ParquetBooleanDecoder(descr, ::parquet::Encoding::PLAIN) {} + void SetData(int num_values, const uint8_t* data, int len) override; int Decode(uint8_t* buffer, int max_values) override; int Decode(bool* buffer, int max_values) override; - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::Accumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow for ParquetPlainBooleanDecoder"); - } - - int DecodeArrow( - int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::DictAccumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow for ParquetPlainBooleanDecoder"); - } - private: std::unique_ptr<::arrow::bit_util::BitReader> bit_reader_; }; - class ParquetRleBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetBooleanDecoder { + class ParquetRleBooleanDecoder : virtual public ParquetBooleanDecoder { public: explicit ParquetRleBooleanDecoder(const ::parquet::ColumnDescriptor* descr) - : ParquetDecoderImpl(descr, ::parquet::Encoding::RLE) {} + : ParquetBooleanDecoder(descr, ::parquet::Encoding::RLE) {} void SetData(int num_values, const uint8_t* data, int len) override { num_values_ = num_values; @@ -609,24 +538,11 @@ namespace omniruntime::reader { ::parquet::ParquetException::NYI("Decode(uint8_t*, int) for RleBooleanDecoder"); } - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::Accumulator* out) override { - ::parquet::ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); - } - - int DecodeArrow( - int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - typename ::parquet::EncodingTraits<::parquet::BooleanType>::DictAccumulator* builder) override { - ::parquet::ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); - } - private: std::shared_ptr<::arrow::util::RleDecoder> decoder_; }; - class ParquetPlainFLBADecoder : public ParquetPlainDecoder<::parquet::FLBAType>, virtual public ::parquet::FLBADecoder { + class ParquetPlainFLBADecoder : public ParquetPlainDecoder<::parquet::FLBAType> { public: using Base = ParquetPlainDecoder<::parquet::FLBAType>; using Base::ParquetPlainDecoder; diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp index 6251044a85da44926b049f313bef92813e69552a..d02806cd719f0d796eb2bedf780125dd09c9a5a7 100644 --- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp +++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp @@ -29,7 +29,7 @@ namespace omniruntime::reader { constexpr int32_t DECIMAL64_LEN = 8; -::parquet::Decoder* MakeOmniParquetDecoder(::parquet::Type::type type_num, ::parquet::Encoding::type encoding, +Decoder* MakeOmniParquetDecoder(::parquet::Type::type type_num, ::parquet::Encoding::type encoding, const ColumnDescriptor* descr) { if (encoding == ::parquet::Encoding::PLAIN) { switch (type_num) { @@ -61,21 +61,21 @@ constexpr int32_t DECIMAL64_LEN = 8; } -::parquet::Decoder* MakeOmniDictDecoder(::parquet::Type::type type_num, +Decoder* MakeOmniDictDecoder(::parquet::Type::type type_num, const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) { switch (type_num) { case ::parquet::Type::BOOLEAN: ::parquet::ParquetException::NYI("Dictionary BOOLEAN encoding not implemented for boolean type"); case ::parquet::Type::INT32: - return new ParquetDictDecoderImpl<::parquet::Int32Type>(descr, pool); + return new ParquetDictDecoder<::parquet::Int32Type>(descr, pool); case ::parquet::Type::INT64: - return new ParquetDictDecoderImpl<::parquet::Int64Type>(descr, pool); + return new ParquetDictDecoder<::parquet::Int64Type>(descr, pool); case ::parquet::Type::DOUBLE: - return new ParquetDictDecoderImpl<::parquet::DoubleType>(descr, pool); + return new ParquetDictDecoder<::parquet::DoubleType>(descr, pool); case ::parquet::Type::BYTE_ARRAY: return new OmniDictByteArrayDecoderImpl(descr, pool); case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: - return new ParquetDictDecoderImpl<::parquet::FLBAType>(descr, pool); + return new ParquetDictDecoder<::parquet::FLBAType>(descr, pool); default: ::parquet::ParquetException::NYI("Not supported dictionary decoder type: " + type_num); }