diff --git a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt index 45780185a3c4dad66193e71bc6b13e506be34591..31c0ddbfbcc55a9f77e24a171328906287cff66f 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt +++ b/omnioperator/omniop-spark-extension/cpp/src/CMakeLists.txt @@ -20,7 +20,11 @@ set (SOURCE_FILES jni/jni_common.cpp jni/ParquetColumnarBatchJniReader.cpp tablescan/ParquetReader.cpp - io/ParquetObsFile.cc) + io/ParquetObsFile.cc + tablescan/ParquetColumnReader.cpp + tablescan/ParquetTypedRecordReader.cpp + tablescan/ParquetDecoder.cpp + ) #Find required protobuf package find_package(Protobuf REQUIRED) diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp index e24bff186a5ac7ad36076fd0b4346c79bbc18eb5..29a6a1b5e022112fb4fee4fdd4d35d9f50a1cbd9 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.cpp @@ -21,11 +21,6 @@ #include "jni_common.h" #include "tablescan/ParquetReader.h" -using namespace omniruntime::vec; -using namespace omniruntime::type; -using namespace std; -using namespace arrow; -using namespace parquet::arrow; using namespace spark::reader; std::vector GetIndices(JNIEnv *env, jobject jsonObj, const char* name) @@ -110,36 +105,28 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_ParquetColumnarBatchJ } JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_ParquetColumnarBatchJniReader_recordReaderNext(JNIEnv *env, - jobject jObj, jlong reader, jintArray typeId, jlongArray vecNativeId) + jobject jObj, jlong reader, jlongArray vecNativeId) { JNI_FUNC_START ParquetReader *pReader = (ParquetReader *)reader; - std::shared_ptr recordBatchPtr; - auto state = pReader->ReadNextBatch(&recordBatchPtr); + std::vector recordBatch(pReader->columnReaders.size()); + long batchRowSize = 0; + auto state = pReader->ReadNextBatch(recordBatch, &batchRowSize); if (state != Status::OK()) { env->ThrowNew(runtimeExceptionClass, state.ToString().c_str()); return 0; } - int vecCnt = 0; - long batchRowSize = 0; - if (recordBatchPtr != NULL) { - batchRowSize = recordBatchPtr->num_rows(); - vecCnt = recordBatchPtr->num_columns(); - std::vector> fields = recordBatchPtr->schema()->fields(); - - for (int colIdx = 0; colIdx < vecCnt; colIdx++) { - std::shared_ptr array = recordBatchPtr->column(colIdx); - // One array in current batch - std::shared_ptr data = array->data(); - int omniTypeId = 0; - uint64_t omniVecId = 0; - spark::reader::CopyToOmniVec(data->type, omniTypeId, omniVecId, array); - - env->SetIntArrayRegion(typeId, colIdx, 1, &omniTypeId); - jlong omniVec = static_cast(omniVecId); - env->SetLongArrayRegion(vecNativeId, colIdx, 1, &omniVec); + + for (uint64_t colIdx = 0; colIdx < recordBatch.size(); colIdx++) { + auto vector = recordBatch[colIdx]; + // If vector is not initialized, meaning that all data had been read. 
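+            // In that case 0 is returned so the Java caller stops iterating; otherwise each
+            // OmniVec address is published to the Java side through vecNativeId and the row
+            // count of the batch is returned after the loop completes.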
+ if (vector == NULL) { + return 0; } + jlong omniVec = (jlong)(vector); + env->SetLongArrayRegion(vecNativeId, colIdx, 1, &omniVec); } + return (jlong)batchRowSize; JNI_FUNC_END(runtimeExceptionClass) } diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.h b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.h index 9f47c6fb7a4731a53191e0301ed64dc7da1a282b..7872ff8bb2814f9469e8bcf39453ede2cd293f0e 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.h +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/ParquetColumnarBatchJniReader.h @@ -28,12 +28,8 @@ #include #include #include -#include #include #include -#include -#include -#include #include "common/debug.h" #ifdef __cplusplus @@ -54,7 +50,7 @@ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_ParquetColumnarBatchJ * Signature: (J[I[J)J */ JNIEXPORT jlong JNICALL Java_com_huawei_boostkit_spark_jni_ParquetColumnarBatchJniReader_recordReaderNext - (JNIEnv *, jobject, jlong, jintArray, jlongArray); + (JNIEnv *, jobject, jlong, jlongArray); /* * Class: com_huawei_boostkit_spark_jni_ParquetColumnarBatchJniReader diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetColumnReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetColumnReader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a0b53d13638de84be56648fffeee5213b17ada7b --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetColumnReader.cpp @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2023-2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "ParquetColumnReader.h" + +using namespace omniruntime::vec; + +namespace spark::reader { + +Status ParquetColumnReader::NextBatch(int64_t batch_size, BaseVector** out) { + RETURN_NOT_OK(LoadBatch(batch_size, out)); + return Status::OK(); +} + +Status ParquetColumnReader::LoadBatch(int64_t records_to_read, BaseVector** out) { + BEGIN_PARQUET_CATCH_EXCEPTIONS + record_reader_->Reset(); + record_reader_->Reserve(records_to_read); + while (records_to_read > 0) { + if (!record_reader_->HasMoreData()) { + break; + } + int64_t records_read = record_reader_->ReadRecords(records_to_read); + records_to_read -= records_read; + if (records_read == 0) { + NextRowGroup(); + } + } + + *out = record_reader_->GetBaseVec(); + if (*out == nullptr) { + return Status::Invalid("Parquet Read OmniVector is nullptr!"); + } + return Status::OK(); + END_PARQUET_CATCH_EXCEPTIONS +} + +void ParquetColumnReader::NextRowGroup() { + std::unique_ptr page_reader = input_->NextChunk(); + record_reader_->SetPageReader(std::move(page_reader)); +} + +} \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetColumnReader.h b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetColumnReader.h new file mode 100644 index 0000000000000000000000000000000000000000..8bf471fd57072a93c88b6621ec8f0ff0c2d8817c --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetColumnReader.h @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2023-2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef SPARK_PARQUET_COLUMN_READER_H +#define SPARK_PARQUET_COLUMN_READER_H + +#include "ParquetTypedRecordReader.h" +#include +#include + +namespace spark::reader { + class ParquetColumnReader { + public: + ParquetColumnReader(std::shared_ptr<::parquet::arrow::ReaderContext> ctx, std::shared_ptr<::arrow::Field> field, + std::unique_ptr<::parquet::arrow::FileColumnIterator> input, ::parquet::internal::LevelInfo leaf_info) + : ctx_(std::move(ctx)), + field_(std::move(field)), + input_(std::move(input)), + descr_(input_->descr()) { + record_reader_ = MakeRecordReader(descr_, leaf_info, ctx_->pool, + field_->type()->id() == ::arrow::Type::DICTIONARY, field_->type()); + NextRowGroup(); + } + + ::arrow::Status NextBatch(int64_t batch_size, omniruntime::vec::BaseVector** out); + + ::arrow::Status LoadBatch(int64_t records_to_read, omniruntime::vec::BaseVector** out); + + const std::shared_ptr<::arrow::Field> field() { + return field_; + } + + private: + void NextRowGroup(); + + std::shared_ptr<::parquet::arrow::ReaderContext> ctx_; + std::shared_ptr<::arrow::Field> field_; + std::unique_ptr<::parquet::arrow::FileColumnIterator> input_; + const ::parquet::ColumnDescriptor* descr_; + std::shared_ptr record_reader_; + }; +} +#endif // SPARK_PARQUET_COLUMN_READER_H \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetDecoder.cpp b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetDecoder.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ebbd37d18999b2bf9ec30cefa49aff3030a79dc9 --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetDecoder.cpp @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2020-2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "ParquetDecoder.h" + +using namespace parquet::arrow; +using namespace parquet; +using namespace omniruntime::vec; + +namespace spark::reader { + + ParquetPlainBooleanDecoder::ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr) + : ParquetDecoderImpl(descr, ::parquet::Encoding::PLAIN) {} + + void ParquetPlainBooleanDecoder::SetData(int num_values, const uint8_t* data, int len) { + num_values_ = num_values; + bit_reader_ = std::make_unique<::arrow::bit_util::BitReader>(data, len); + } + + int ParquetPlainBooleanDecoder::Decode(uint8_t* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + bool val; + ::arrow::internal::BitmapWriter bit_writer(buffer, 0, max_values); + for (int i = 0; i < max_values; ++i) { + if (!bit_reader_->GetValue(1, &val)) { + ParquetException::EofException(); + } + if (val) { + bit_writer.Set(); + } + bit_writer.Next(); + } + bit_writer.Finish(); + num_values_ -= max_values; + return max_values; + } + + int ParquetPlainBooleanDecoder::Decode(bool* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + if (bit_reader_->GetBatch(1, buffer, max_values) != max_values) { + ::parquet::ParquetException::EofException(); + } + num_values_ -= max_values; + return max_values; + } + + template <> + void ParquetDictDecoderImpl<::parquet::BooleanType>::SetDict(ParquetTypedDecoder<::parquet::BooleanType>* dictionary) { + ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); + } + + template <> + void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + DecodeDict(dictionary); + + auto dict_values = reinterpret_cast(dictionary_->mutable_data()); + + int total_size = 0; + for (int i = 0; i < dictionary_length_; ++i) { + total_size += dict_values[i].len; + } + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, + /*shrink_to_fit=*/false)); + PARQUET_THROW_NOT_OK( + byte_array_offsets_->Resize((dictionary_length_ + 1) * sizeof(int32_t), + /*shrink_to_fit=*/false)); + + int32_t offset = 0; + uint8_t* bytes_data = byte_array_data_->mutable_data(); + int32_t* bytes_offsets = + reinterpret_cast(byte_array_offsets_->mutable_data()); + for (int i = 0; i < dictionary_length_; ++i) { + memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len); + bytes_offsets[i] = offset; + dict_values[i].ptr = bytes_data + offset; + offset += dict_values[i].len; + } + bytes_offsets[dictionary_length_] = offset; + } + + template <> + inline void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + DecodeDict(dictionary); + + auto dict_values = reinterpret_cast(dictionary_->mutable_data()); + + int fixed_len = descr_->type_length(); + int total_size = dictionary_length_ * fixed_len; + + PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, + /*shrink_to_fit=*/false)); + uint8_t* bytes_data = byte_array_data_->mutable_data(); + for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) { + memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len); + dict_values[i].ptr = bytes_data + offset; + } + } +} \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetDecoder.h b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetDecoder.h new file mode 100644 index 0000000000000000000000000000000000000000..67aff1a5b191acabe8237768af4b01fddbbe20d8 --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetDecoder.h @@ -0,0 +1,637 @@ +/** + * Copyright (C) 
2020-2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SPARK_PARQUET_ENCODING_H +#define SPARK_PARQUET_ENCODING_H + +#include +#include +#include +#include +#include +#include + +using namespace omniruntime::vec; +using namespace arrow; + +namespace spark::reader { + + class ParquetDecoderImpl : virtual public ::parquet::Decoder { + public: + void SetData(int num_values, const uint8_t* data, int len) override { + num_values_ = num_values; + data_ = data; + len_ = len; + } + + int values_left() const override { return num_values_; } + ::parquet::Encoding::type encoding() const override { return encoding_; } + + protected: + explicit ParquetDecoderImpl(const ::parquet::ColumnDescriptor* descr, ::parquet::Encoding::type encoding) + : descr_(descr), encoding_(encoding), num_values_(0), data_(NULLPTR), len_(0) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const ::parquet::ColumnDescriptor* descr_; + + const ::parquet::Encoding::type encoding_; + int num_values_; + const uint8_t* data_; + int len_; + int type_length_; + }; + + // TODO: optimize batch move + template + inline int SpacedExpand(T* buffer, int num_values, int null_count, + bool* nulls) { + int idx_decode = num_values - null_count; + std::memset(static_cast(buffer + idx_decode), 0, null_count * sizeof(T)); + if (idx_decode == 0) { + // All nulls, nothing more to do + return num_values; + } + for (int i = num_values - 1; i >= 0; --i) { + if (!nulls[i]) { + idx_decode--; + std::memmove(buffer + i, buffer + idx_decode, sizeof(T)); + } + } + assert(idx_decode == 0); + return num_values; + } + + template + class ParquetTypedDecoder : virtual public ::parquet::TypedDecoder { + public: + using T = typename DType::c_type; + + int DecodeSpaced(T* buffer, int num_values, int null_count, + bool* nulls) { + if (null_count > 0) { + int values_to_read = num_values - null_count; + int values_read = Decode(buffer, values_to_read); + if (values_read != values_to_read) { + throw ::parquet::ParquetException("Number of values / definition_levels read did not match"); + } + + return SpacedExpand(buffer, num_values, null_count, nulls); + } else { + return Decode(buffer, num_values); + } + } + + int Decode(T* buffer, int num_values) override { + ::parquet::ParquetException::NYI("ParquetTypedDecoder for Decode"); + } + + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { + ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrowNonNull"); + } + + virtual int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { + ::parquet::ParquetException::NYI("ParquetTypedDecoder for 
DecodeArrow"); + } + }; + + template + class ParquetDictDecoder : virtual public ParquetTypedDecoder { + public: + using T = typename DType::c_type; + + virtual void SetDict(ParquetTypedDecoder* dictionary) = 0; + + virtual void InsertDictionary(::arrow::ArrayBuilder* builder) = 0; + + virtual int DecodeIndicesSpaced(int num_values, int null_count, + const uint8_t* valid_bits, int64_t valid_bits_offset, + ::arrow::ArrayBuilder* builder) = 0; + + virtual int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) = 0; + + virtual int DecodeIndices(int num_values, int32_t* indices) = 0; + + virtual void GetDictionary(const T** dictionary, int32_t* dictionary_length) = 0; + }; + + template + class ParquetDictDecoderImpl : public ParquetDecoderImpl, virtual public ParquetDictDecoder { + public: + typedef typename Type::c_type T; + + explicit ParquetDictDecoderImpl(const ::parquet::ColumnDescriptor* descr, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : ParquetDecoderImpl(descr, ::parquet::Encoding::RLE_DICTIONARY), + dictionary_(::parquet::AllocateBuffer(pool, 0)), + dictionary_length_(0), + byte_array_data_(::parquet::AllocateBuffer(pool, 0)), + byte_array_offsets_(::parquet::AllocateBuffer(pool, 0)) {} + + void SetDict(ParquetTypedDecoder* dictionary) override; + + void SetData(int num_values, const uint8_t* data, int len) override { + num_values_ = num_values; + if (len == 0) { + idx_decoder_ = ::arrow::util::RleDecoder(data, len, 1); + return; + } + uint8_t bit_width = *data; + if (ARROW_PREDICT_FALSE(bit_width > 32)) { + throw ::parquet::ParquetException("Invalid or corrupted bit_width " + + std::to_string(bit_width) + ". Maximum allowed is 32."); + } + idx_decoder_ = ::arrow::util::RleDecoder(++data, --len, bit_width); + } + + int Decode(T* buffer, int num_values) override { + num_values = std::min(num_values, num_values_); + int decoded_values = + idx_decoder_.GetBatchWithDict(reinterpret_cast(dictionary_->data()), + dictionary_length_, buffer, num_values); + if (decoded_values != num_values) { + ::parquet::ParquetException::EofException(); + } + num_values_ -= num_values; + return num_values; + } + + int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + num_values = std::min(num_values, num_values_); + if (num_values != idx_decoder_.GetBatchWithDictSpaced( + reinterpret_cast(dictionary_->data()), + dictionary_length_, buffer, num_values, null_count, valid_bits, + valid_bits_offset)) { + ::parquet::ParquetException::EofException(); + } + num_values_ -= num_values; + return num_values; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits::Accumulator* out) override { + ::parquet::ParquetException::NYI("DecodeArrow(Accumulator) for OmniDictDecoderImpl"); + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits::DictAccumulator* out) override { + ::parquet::ParquetException::NYI("DecodeArrow(DictAccumulator) for OmniDictDecoderImpl"); + } + + void InsertDictionary(::arrow::ArrayBuilder* builder) override { + ::parquet::ParquetException::NYI("InsertDictionary ArrayBuilder"); + } + + int DecodeIndicesSpaced(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + ::arrow::ArrayBuilder* builder) override { + ::parquet::ParquetException::NYI("DecodeIndicesSpaced ArrayBuilder"); + 
} + + int DecodeIndices(int num_values, ::arrow::ArrayBuilder* builder) override { + ::parquet::ParquetException::NYI("DecodeIndices ArrayBuilder"); + } + + int DecodeIndices(int num_values, int32_t* indices) override { + if (num_values != idx_decoder_.GetBatch(indices, num_values)) { + ::parquet::ParquetException::EofException(); + } + num_values_ -= num_values; + return num_values; + } + + void GetDictionary(const T** dictionary, int32_t* dictionary_length) override { + *dictionary_length = dictionary_length_; + *dictionary = reinterpret_cast(dictionary_->mutable_data()); + } + + virtual int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) { + ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrowNonNull"); + } + + virtual int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { + ::parquet::ParquetException::NYI("ParquetTypedDecoder for DecodeArrow"); + } + + protected: + Status IndexInBounds(int32_t index) { + if (ARROW_PREDICT_TRUE(0 <= index && index < dictionary_length_)) { + return Status::OK(); + } + return Status::Invalid("Index not in dictionary bounds"); + } + + inline void DecodeDict(::parquet::TypedDecoder* dictionary) { + dictionary_length_ = static_cast(dictionary->values_left()); + PARQUET_THROW_NOT_OK(dictionary_->Resize(dictionary_length_ * sizeof(T), + /*shrink_to_fit=*/false)); + dictionary->Decode(reinterpret_cast(dictionary_->mutable_data()), dictionary_length_); + } + + std::shared_ptr<::parquet::ResizableBuffer> dictionary_; + + int32_t dictionary_length_; + + std::shared_ptr<::parquet::ResizableBuffer> byte_array_data_; + + std::shared_ptr<::parquet::ResizableBuffer> byte_array_offsets_; + + ::arrow::util::RleDecoder idx_decoder_; + }; + + template + void ParquetDictDecoderImpl::SetDict(ParquetTypedDecoder* dictionary) { + DecodeDict(dictionary); + } + + class OmniDictByteArrayDecoderImpl : public ParquetDictDecoderImpl<::parquet::ByteArrayType> { + public: + using BASE = ParquetDictDecoderImpl<::parquet::ByteArrayType>; + using BASE::ParquetDictDecoderImpl; + + int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) override { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, &result, outBaseVec, offset)); + return result; + } + + int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** vec) override { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, nulls, + offset, &result, vec)); + return result; + } + + private: + Status DecodeArrowDense(int num_values, int null_count, bool* nulls, + int64_t offset, + int* out_num_values, omniruntime::vec::BaseVector** out) { + constexpr int32_t kBufferSize = 1024; + int32_t indices[kBufferSize]; + + auto vec = dynamic_cast>*>(*out); + + auto dict_values = reinterpret_cast(dictionary_->data()); + int values_decoded = 0; + int num_indices = 0; + int pos_indices = 0; + + for (int i = 0; i < num_values; i++) { + if (!nulls[offset + i]) { + if (num_indices == pos_indices) { + const auto batch_size = + std::min(kBufferSize, num_values - null_count - values_decoded); + num_indices = idx_decoder_.GetBatch(indices, batch_size); + if (ARROW_PREDICT_FALSE(num_indices < 1)) { + return Status::Invalid("Invalid number of indices: ", num_indices); + } + pos_indices = 0; + } + const auto index = indices[pos_indices++]; + RETURN_NOT_OK(IndexInBounds(index)); + const 
auto& val = dict_values[index]; + std::string_view value(reinterpret_cast(val.ptr), val.len); + vec->SetValue(offset + i, value); + ++values_decoded; + } else { + vec->SetNull(offset + i); + } + } + + *out_num_values = values_decoded; + return Status::OK(); + } + + Status DecodeArrowNonNull(int num_values, int* out_num_values, omniruntime::vec::BaseVector** out, int offset) { + constexpr int32_t kBufferSize = 2048; + int32_t indices[kBufferSize]; + + auto vec = dynamic_cast>*>(*out); + + auto dict_values = reinterpret_cast(dictionary_->data()); + + int values_decoded = 0; + while (values_decoded < num_values) { + int32_t batch_size = std::min(kBufferSize, num_values - values_decoded); + int num_indices = idx_decoder_.GetBatch(indices, batch_size); + if (num_indices == 0) ::parquet::ParquetException::EofException(); + for (int i = 0; i < num_indices; ++i) { + auto idx = indices[i]; + RETURN_NOT_OK(IndexInBounds(idx)); + const auto& val = dict_values[idx]; + std::string_view value(reinterpret_cast(val.ptr), val.len); + vec->SetValue(i + offset, value); + } + values_decoded += num_indices; + } + *out_num_values = values_decoded; + return Status::OK(); + } + }; + + template + class ParquetPlainDecoder : public ParquetDecoderImpl, virtual public ParquetTypedDecoder { + public: + using T = typename DType::c_type; + explicit ParquetPlainDecoder(const ::parquet::ColumnDescriptor* descr); + + int Decode(T* buffer, int max_values) override; + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits::Accumulator* builder) override { + ::parquet::ParquetException::NYI("DecodeArrow(Accumulator) for ParquetPlainDecoder"); + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits::DictAccumulator* builder) override { + ::parquet::ParquetException::NYI("DecodeArrow(DictAccumulator) for ParquetPlainDecoder"); + } + }; + + template + inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, + int type_length, T* out) { + int64_t bytes_to_decode = num_values * static_cast(sizeof(T)); + if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) { + ::parquet::ParquetException::EofException(); + } + if (bytes_to_decode > 0) { + memcpy(out, data, bytes_to_decode); + } + return static_cast(bytes_to_decode); + } + + static inline int64_t ReadByteArray(const uint8_t* data, int64_t data_size, + ::parquet::ByteArray* out) { + if (ARROW_PREDICT_FALSE(data_size < 4)) { + parquet::ParquetException::EofException(); + } + const int32_t len = ::arrow::util::SafeLoadAs(data); + if (len < 0) { + throw parquet::ParquetException("Invalid BYTE_ARRAY value"); + } + const int64_t consumed_length = static_cast(len) + 4; + if (ARROW_PREDICT_FALSE(data_size < consumed_length)) { + parquet::ParquetException::EofException(); + } + *out = parquet::ByteArray{static_cast(len), data + 4}; + return consumed_length; + } + + template <> + inline int DecodePlain<::parquet::ByteArray>(const uint8_t* data, int64_t data_size, int num_values, + int type_length, ::parquet::ByteArray* out) { + int bytes_decoded = 0; + for (int i = 0; i < num_values; ++i) { + const auto increment = ReadByteArray(data, data_size, out + i); + if (ARROW_PREDICT_FALSE(increment > INT_MAX - bytes_decoded)) { + throw ::parquet::ParquetException("BYTE_ARRAY chunk too large"); + } + data += increment; + data_size -= increment; + bytes_decoded += static_cast(increment); + } + 
return bytes_decoded; + } + + template <> + inline int DecodePlain<::parquet::FixedLenByteArray>(const uint8_t* data, int64_t data_size, + int num_values, int type_length, + ::parquet::FixedLenByteArray* out) { + int64_t bytes_to_decode = static_cast(type_length) * num_values; + if (bytes_to_decode > data_size || bytes_to_decode > INT_MAX) { + ::parquet::ParquetException::EofException(); + } + for (int i = 0; i < num_values; ++i) { + out[i].ptr = data; + data += type_length; + data_size -= type_length; + } + return static_cast(bytes_to_decode); + } + + template + ParquetPlainDecoder::ParquetPlainDecoder(const ::parquet::ColumnDescriptor* descr) + : ParquetDecoderImpl(descr, ::parquet::Encoding::PLAIN) { + if (descr_ && descr_->physical_type() == ::parquet::Type::FIXED_LEN_BYTE_ARRAY) { + type_length_ = descr_->type_length(); + } else { + type_length_ = -1; + } + } + + template + int ParquetPlainDecoder::Decode(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int bytes_consumed = DecodePlain(data_, len_, max_values, type_length_, buffer); + data_ += bytes_consumed; + len_ -= bytes_consumed; + num_values_ -= max_values; + return max_values; + } + + class ParquetPlainByteArrayDecoder : public ParquetPlainDecoder<::parquet::ByteArrayType>{ + public: + using Base = ParquetPlainDecoder<::parquet::ByteArrayType>; + using Base::ParquetPlainDecoder; + + int DecodeArrowNonNull(int num_values, omniruntime::vec::BaseVector** outBaseVec, int64_t offset) override { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowDenseNonNull(num_values, &result, outBaseVec, offset)); + return result; + } + + int DecodeArrow(int num_values, int null_count, bool* nulls, + int64_t offset, omniruntime::vec::BaseVector** outBaseVec) { + int result = 0; + PARQUET_THROW_NOT_OK(DecodeArrowDense(num_values, null_count, nulls, + offset, &result, outBaseVec)); + return result; + } + + private: + Status DecodeArrowDense(int num_values, int null_count, bool* nulls, + int64_t offset, + int* out_values_decoded, omniruntime::vec::BaseVector** out) { + int values_decoded = 0; + auto vec = dynamic_cast>*>(*out); + + for (int i = 0; i < num_values; i++) { + if (!nulls[offset + i]) { + if (ARROW_PREDICT_FALSE(len_ < 4)) { + ::parquet::ParquetException::EofException(); + } + auto value_len = ::arrow::util::SafeLoadAs(data_); + if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { + return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); + } + auto increment = value_len + 4; + if (ARROW_PREDICT_FALSE(len_ < increment)) { + ::parquet::ParquetException::EofException(); + } + std::string_view value(reinterpret_cast(data_ + 4), value_len); + vec->SetValue(offset + i, value); + data_ += increment; + len_ -= increment; + ++values_decoded; + } else { + vec->SetNull(offset + i); + } + } + + num_values_ -= values_decoded; + *out_values_decoded = values_decoded; + return Status::OK(); + } + + Status DecodeArrowDenseNonNull(int num_values, + int* out_values_decoded, omniruntime::vec::BaseVector** out, int64_t offset) { + int values_decoded = 0; + auto vec = dynamic_cast>*>(*out); + + for (int i = 0; i < num_values; i++) { + if (ARROW_PREDICT_FALSE(len_ < 4)) { + ::parquet::ParquetException::EofException(); + } + auto value_len = ::arrow::util::SafeLoadAs(data_); + if (ARROW_PREDICT_FALSE(value_len < 0 || value_len > INT32_MAX - 4)) { + return Status::Invalid("Invalid or corrupted value_len '", value_len, "'"); + } + auto increment = value_len + 4; + if (ARROW_PREDICT_FALSE(len_ < 
increment)) { + ::parquet::ParquetException::EofException(); + } + std::string_view value(reinterpret_cast(data_ + 4), value_len); + (vec)->SetValue(offset + i, value); + data_ += increment; + len_ -= increment; + ++values_decoded; + } + num_values_ -= values_decoded; + *out_values_decoded = values_decoded; + return Status::OK(); + } + }; + + class ParquetBooleanDecoder : virtual public ParquetTypedDecoder<::parquet::BooleanType> { + public: + using ParquetTypedDecoder<::parquet::BooleanType>::Decode; + virtual int Decode(uint8_t* buffer, int max_values) = 0; + }; + + class ParquetPlainBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetBooleanDecoder { + public: + explicit ParquetPlainBooleanDecoder(const ::parquet::ColumnDescriptor* descr); + void SetData(int num_values, const uint8_t* data, int len) override; + + int Decode(uint8_t* buffer, int max_values) override; + int Decode(bool* buffer, int max_values) override; + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits<::parquet::BooleanType>::Accumulator* out) override { + ::parquet::ParquetException::NYI("DecodeArrow for ParquetPlainBooleanDecoder"); + } + + int DecodeArrow( + int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits<::parquet::BooleanType>::DictAccumulator* builder) override { + ::parquet::ParquetException::NYI("DecodeArrow for ParquetPlainBooleanDecoder"); + } + + private: + std::unique_ptr<::arrow::bit_util::BitReader> bit_reader_; + }; + + class ParquetRleBooleanDecoder : public ParquetDecoderImpl, virtual public ParquetBooleanDecoder { + public: + explicit ParquetRleBooleanDecoder(const ::parquet::ColumnDescriptor* descr) + : ParquetDecoderImpl(descr, ::parquet::Encoding::RLE) {} + + void SetData(int num_values, const uint8_t* data, int len) override { + num_values_ = num_values; + uint32_t num_bytes = 0; + + if (len < 4) { + throw ::parquet::ParquetException("Received invalid length : " + std::to_string(len) + + " (corrupt data page?)"); + } + + num_bytes = + ::arrow::bit_util::ToLittleEndian(::arrow::util::SafeLoadAs(data)); + if (num_bytes < 0 || num_bytes > static_cast(len - 4)) { + throw ::parquet::ParquetException("Received invalid number of bytes : " + + std::to_string(num_bytes) + " (corrupt data page?)"); + } + + auto decoder_data = data + 4; + decoder_ = std::make_shared<::arrow::util::RleDecoder>(decoder_data, num_bytes, + /*bit_width=*/1); + } + + int Decode(bool* buffer, int max_values) override { + max_values = std::min(max_values, num_values_); + + if (decoder_->GetBatch(buffer, max_values) != max_values) { + ::parquet::ParquetException::EofException(); + } + num_values_ -= max_values; + return max_values; + } + + int Decode(uint8_t* buffer, int max_values) override { + ::parquet::ParquetException::NYI("Decode(uint8_t*, int) for RleBooleanDecoder"); + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits<::parquet::BooleanType>::Accumulator* out) override { + ::parquet::ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); + } + + int DecodeArrow( + int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename ::parquet::EncodingTraits<::parquet::BooleanType>::DictAccumulator* builder) override { + ::parquet::ParquetException::NYI("DecodeArrow for RleBooleanDecoder"); + } + + private: + 
std::shared_ptr<::arrow::util::RleDecoder> decoder_; + }; + + class ParquetPlainFLBADecoder : public ParquetPlainDecoder<::parquet::FLBAType>, virtual public ::parquet::FLBADecoder { + public: + using Base = ParquetPlainDecoder<::parquet::FLBAType>; + using Base::ParquetPlainDecoder; + }; +} +#endif // SPARK_PARQUET_ENCODING_H diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp index a6049df84c2c1b5d13488d8844fbc315e276f2eb..ffbc1acfc5463a76510862f6f7650eb4c9137ac5 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.cpp @@ -17,26 +17,15 @@ * limitations under the License. */ -#include -#include -#include -#include #include "jni/jni_common.h" #include "ParquetReader.h" -using namespace omniruntime::vec; -using namespace omniruntime::type; using namespace arrow; using namespace parquet::arrow; -using namespace arrow::compute; using namespace spark::reader; static std::mutex mutex_; static std::map restore_filesysptr; -static constexpr int32_t PARQUET_MAX_DECIMAL64_DIGITS = 18; -static constexpr int32_t INT128_BYTES = 16; -static constexpr int32_t INT64_BYTES = 8; -static constexpr int32_t BYTE_BITS = 8; static constexpr int32_t LOCAL_FILE_PREFIX = 5; static constexpr int32_t LOCAL_FILE_PREFIX_EXT = 7; static const std::string LOCAL_FILE = "file:"; @@ -87,8 +76,6 @@ Status ParquetReader::InitRecordReader(std::string& filePath, int64_t capacity, const std::vector& row_group_indices, const std::vector& column_indices, std::string& ugi, ObsConfig& obsInfo) { - arrow::MemoryPool* pool = default_memory_pool(); - // Configure reader settings auto reader_properties = parquet::ReaderProperties(pool); @@ -113,182 +100,127 @@ Status ParquetReader::InitRecordReader(std::string& filePath, int64_t capacity, reader_builder.properties(arrow_reader_properties); ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build()); - ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(row_group_indices, column_indices, &rb_reader)); + ARROW_RETURN_NOT_OK(GetRecordBatchReader(row_group_indices, column_indices)); return arrow::Status::OK(); } -Status ParquetReader::ReadNextBatch(std::shared_ptr *batch) +Status ParquetReader::ReadNextBatch(std::vector &batch, long *batchRowSize) { - ARROW_RETURN_NOT_OK(rb_reader->ReadNext(batch)); + ARROW_RETURN_NOT_OK(rb_reader->ReadNext(batch, batchRowSize)); return arrow::Status::OK(); } -/** - * For BooleanType, copy values one by one. 
- */ -uint64_t CopyBooleanType(std::shared_ptr array) +Status ParquetReader::GetRecordBatchReader(const std::vector &row_group_indices, const std::vector &column_indices) { - arrow::BooleanArray *lvb = dynamic_cast(array.get()); - auto numElements = lvb->length(); - auto originalVector = new Vector(numElements); - for (int64_t i = 0; i < numElements; i++) { - if (lvb->IsNull(i)) { - originalVector->SetNull(i); - } else { - if (lvb->Value(i)) { - originalVector->SetValue(i, true); - } else { - originalVector->SetValue(i, false); - } - } + std::shared_ptr<::arrow::Schema> batch_schema; + RETURN_NOT_OK(GetFieldReaders(row_group_indices, column_indices, &columnReaders, &batch_schema)); + + int64_t num_rows = 0; + for(int row_group : row_group_indices) { + num_rows += arrow_reader->parquet_reader()->metadata()->RowGroup(row_group)->num_rows(); } - return (uint64_t)originalVector; -} + // Use lambda function to generate BaseVectors + auto batches = [num_rows, this](std::vector &batch, long *batchRowSize) mutable -> Status { + int64_t read_size = std::min(arrow_reader->properties().batch_size(), num_rows); + num_rows -= read_size; + *batchRowSize = read_size; + + if (columnReaders.empty() || read_size <= 0) { + return Status::OK(); + } -/** - * For int16/int32/int64/double type, copy values in batches and skip setNull if there is no nulls. - */ -template uint64_t CopyFixedWidth(std::shared_ptr array) -{ - using T = typename NativeType::type; - PARQUET_TYPE *lvb = dynamic_cast(array.get()); - auto numElements = lvb->length(); - auto values = lvb->raw_values(); - auto originalVector = new Vector(numElements); - // Check ColumnVectorBatch has null or not firstly - if (lvb->null_count() != 0) { - for (int64_t i = 0; i < numElements; i++) { - if (lvb->IsNull(i)) { - originalVector->SetNull(i); + for (uint64_t i = 0; i < columnReaders.size(); ++i) { + RETURN_NOT_OK(columnReaders[i]->NextBatch(read_size, &batch[i])); + } + + // Check BaseVector + for (const auto& column : batch) { + if (column == nullptr) { + return Status::Invalid("BaseVector should not be nullptr after reading"); } } - } - originalVector->SetValues(0, values, numElements); - return (uint64_t)originalVector; + + return Status::OK(); + }; + + rb_reader = std::make_unique(std::move(batches)); + return Status::OK(); } -uint64_t CopyVarWidth(std::shared_ptr array) -{ - auto lvb = dynamic_cast(array.get()); - auto numElements = lvb->length(); - auto originalVector = new Vector>(numElements); - for (int64_t i = 0; i < numElements; i++) { - if (lvb->IsValid(i)) { - auto data = lvb->GetView(i); - originalVector->SetValue(i, data); - } else { - originalVector->SetNull(i); - } - } - return (uint64_t)originalVector; +std::shared_ptr> VectorToSharedSet(const std::vector &values) { + std::shared_ptr> result(new std::unordered_set()); + result->insert(values.begin(), values.end()); + return result; } -uint64_t CopyToOmniDecimal128Vec(std::shared_ptr array) +Status ParquetReader::GetFieldReaders(const std::vector &row_group_indices, const std::vector &column_indices, + std::vector>* out, std::shared_ptr<::arrow::Schema>* out_schema) { - auto lvb = dynamic_cast(array.get()); - auto numElements = lvb->length(); - auto originalVector = new Vector(numElements); - for (int64_t i = 0; i < numElements; i++) { - if (lvb->IsValid(i)) { - auto data = lvb->GetValue(i); - __int128_t val; - memcpy_s(&val, sizeof(val), data, INT128_BYTES); - omniruntime::type::Decimal128 d128(val); - originalVector->SetValue(i, d128); - } else { - originalVector->SetNull(i); - } + 
// We only read schema fields which have columns indicated in the indices vector + ARROW_ASSIGN_OR_RAISE(std::vector field_indices, arrow_reader->manifest().GetFieldIndices(column_indices)); + auto included_leaves = VectorToSharedSet(column_indices); + out->resize(field_indices.size()); + ::arrow::FieldVector out_fields(field_indices.size()); + + for (size_t i = 0; i < out->size(); i++) { + std::unique_ptr reader; + RETURN_NOT_OK(GetFieldReader(field_indices[i], included_leaves, row_group_indices, &reader)); + out_fields[i] = reader->field(); + out->at(i) = std::move(reader); } - return (uint64_t)originalVector; + + *out_schema = ::arrow::schema(std::move(out_fields), arrow_reader->manifest().schema_metadata); + return Status::OK(); } -uint64_t CopyToOmniDecimal64Vec(std::shared_ptr array) +FileColumnIteratorFactory SomeRowGroupsFactory(std::vector row_group_indices) { + return [row_group_indices] (int i, parquet::ParquetFileReader* reader) { + return new FileColumnIterator(i, reader, row_group_indices); + }; +} + +Status ParquetReader::GetFieldReader(int i, const std::shared_ptr>& included_leaves, + const std::vector &row_group_indices, std::unique_ptr* out) { - auto lvb = dynamic_cast(array.get()); - auto numElements = lvb->length(); - auto originalVector = new Vector(numElements); - for (int64_t i = 0; i < numElements; i++) { - if (lvb->IsValid(i)) { - auto data = lvb->GetValue(i); - int64_t val; - memcpy_s(&val, sizeof(val), data, INT64_BYTES); - originalVector->SetValue(i, val); - } else { - originalVector->SetNull(i); - } + if (ARROW_PREDICT_FALSE(i < 0 || static_cast(i) >= arrow_reader->manifest().schema_fields.size())) { + return Status::Invalid("Column index out of bounds (got ", i, + ", should be between 0 and ", arrow_reader->manifest().schema_fields.size(), ")"); } - return (uint64_t)originalVector; + auto ctx = std::make_shared(); + ctx->reader = arrow_reader->parquet_reader(); + ctx->pool = pool; + ctx->iterator_factory = SomeRowGroupsFactory(row_group_indices); + ctx->filter_leaves = true; + ctx->included_leaves = included_leaves; + auto field = arrow_reader->manifest().schema_fields[i]; + return GetReader(field, field.field, ctx, out); } -int spark::reader::CopyToOmniVec(std::shared_ptr vcType, int &omniTypeId, uint64_t &omniVecId, - std::shared_ptr array) +Status ParquetReader::GetReader(const SchemaField &field, const std::shared_ptr &arrow_field, + const std::shared_ptr &ctx, std::unique_ptr *out) { - switch (vcType->id()) { - case arrow::Type::BOOL: - omniTypeId = static_cast(OMNI_BOOLEAN); - omniVecId = CopyBooleanType(array); - break; - case arrow::Type::INT16: - omniTypeId = static_cast(OMNI_SHORT); - omniVecId = CopyFixedWidth(array); - break; - case arrow::Type::INT32: - omniTypeId = static_cast(OMNI_INT); - omniVecId = CopyFixedWidth(array); - break; - case arrow::Type::DATE32: - omniTypeId = static_cast(OMNI_DATE32); - omniVecId = CopyFixedWidth(array); - break; - case arrow::Type::INT64: - omniTypeId = static_cast(OMNI_LONG); - omniVecId = CopyFixedWidth(array); - break; - case arrow::Type::DATE64: - omniTypeId = static_cast(OMNI_DATE64); - omniVecId = CopyFixedWidth(array); - break; - case arrow::Type::DOUBLE: - omniTypeId = static_cast(OMNI_DOUBLE); - omniVecId = CopyFixedWidth(array); - break; - case arrow::Type::STRING: - omniTypeId = static_cast(OMNI_VARCHAR); - omniVecId = CopyVarWidth(array); - break; - case arrow::Type::DECIMAL128: { - auto decimalType = static_cast(vcType.get()); - if (decimalType->precision() > PARQUET_MAX_DECIMAL64_DIGITS) { - 
omniTypeId = static_cast(OMNI_DECIMAL128); - omniVecId = CopyToOmniDecimal128Vec(array); - } else { - omniTypeId = static_cast(OMNI_DECIMAL64); - omniVecId = CopyToOmniDecimal64Vec(array); - } - break; + BEGIN_PARQUET_CATCH_EXCEPTIONS + + auto type_id = arrow_field->type()->id(); + + if (type_id == ::arrow::Type::EXTENSION) { + return Status::Invalid("Unsupported type: ", arrow_field->ToString()); + } + + if (field.children.size() == 0) { + if (!field.is_leaf()) { + return Status::Invalid("Parquet non-leaf node has no children"); } - default: { - throw std::runtime_error("Native ColumnarFileScan Not support For This Type: " + vcType->id()); + if (!ctx->IncludesLeaf(field.column_index)) { + *out = nullptr; + return Status::OK(); } + std::unique_ptr input(ctx->iterator_factory(field.column_index, ctx->reader)); + *out = std::make_unique(ctx, arrow_field, std::move(input), field.level_info); + } else { + return Status::Invalid("Unsupported type: ", arrow_field->ToString()); } - return 1; -} + return Status::OK(); -std::pair spark::reader::TransferToOmniVecs(std::shared_ptr batch) -{ - int64_t num_columns = batch->num_columns(); - std::vector> fields = batch->schema()->fields(); - auto vecTypes = new int64_t[num_columns]; - auto vecs = new int64_t[num_columns]; - for (int64_t colIdx = 0; colIdx < num_columns; colIdx++) { - std::shared_ptr array = batch->column(colIdx); - // One array in current batch - std::shared_ptr data = array->data(); - int omniTypeId = 0; - uint64_t omniVecId = 0; - spark::reader::CopyToOmniVec(data->type, omniTypeId, omniVecId, array); - vecTypes[colIdx] = omniTypeId; - vecs[colIdx] = omniVecId; - } - return std::make_pair(vecTypes, vecs); + END_PARQUET_CATCH_EXCEPTIONS } \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h index 9a55d785ca9a3e926cea28ebea392e7e73680da7..3be724d54f84d5011d6360e9c3216248377d5bf0 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetReader.h @@ -20,24 +20,29 @@ #ifndef SPARK_THESTRAL_PLUGIN_PARQUETREADER_H #define SPARK_THESTRAL_PLUGIN_PARQUETREADER_H -#include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include "ParquetColumnReader.h" #include namespace spark::reader { + + class OmniRecordBatchReader { + public: + OmniRecordBatchReader(std::function &batch, long *batchRowSize)> batches) + : batches_(std::move(batches)) {} + + ~OmniRecordBatchReader() {} + + Status ReadNext(std::vector &out, long *batchRowSize) { + return batches_(out, batchRowSize); + } + + private: + std::function &batch, long *batchRowSize)> batches_; + }; + + class ParquetReader { public: ParquetReader() {} @@ -46,11 +51,28 @@ namespace spark::reader { const std::vector& row_group_indices, const std::vector& column_indices, std::string& ugi, ObsConfig& obsInfo); - arrow::Status ReadNextBatch(std::shared_ptr *batch); + arrow::Status ReadNextBatch(std::vector &batch, long *batchRowSize); std::unique_ptr arrow_reader; - std::shared_ptr rb_reader; + std::unique_ptr rb_reader; + + std::vector> columnReaders; + + arrow::MemoryPool* pool = arrow::default_memory_pool(); + + private: + arrow::Status GetRecordBatchReader(const std::vector &row_group_indices, const std::vector &column_indices); + + arrow::Status GetFieldReaders(const std::vector 
&row_group_indices, const std::vector &column_indices, + std::vector>* out, std::shared_ptr<::arrow::Schema>* out_schema); + + arrow::Status GetFieldReader(int i, const std::shared_ptr>& included_leaves, + const std::vector &row_group_indices, std::unique_ptr* out); + + arrow::Status GetReader(const parquet::arrow::SchemaField &field, const std::shared_ptr &arrow_field, + const std::shared_ptr &ctx, std::unique_ptr* out); + }; class Filesystem { @@ -66,10 +88,5 @@ namespace spark::reader { std::string GetFileSystemKey(std::string& path, std::string& ugi); Filesystem* GetFileSystemPtr(std::string& path, std::string& ugi); - - int CopyToOmniVec(std::shared_ptr vcType, int &omniTypeId, uint64_t &omniVecId, - std::shared_ptr array); - - std::pair TransferToOmniVecs(std::shared_ptr batch); } #endif // SPARK_THESTRAL_PLUGIN_PARQUETREADER_H \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetTypedRecordReader.cpp b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetTypedRecordReader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..744a38c785b3b07fdb35b0b58d54e8845a8018c2 --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetTypedRecordReader.cpp @@ -0,0 +1,498 @@ +/** + * Copyright (C) 2023-2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include "ParquetTypedRecordReader.h" +#include "ParquetDecoder.h" + +using namespace parquet::internal; +using namespace arrow; +using namespace parquet; + +namespace spark::reader { + +std::unique_ptr MakeOmniParquetDecoder(::parquet::Type::type type_num, ::parquet::Encoding::type encoding, + const ColumnDescriptor* descr) { + if (encoding == ::parquet::Encoding::PLAIN) { + switch (type_num) { + case ::parquet::Type::BOOLEAN: + return std::make_unique(descr); + case ::parquet::Type::INT32: + return std::make_unique>(descr); + case ::parquet::Type::INT64: + return std::make_unique>(descr); + case ::parquet::Type::DOUBLE: + return std::make_unique>(descr); + case ::parquet::Type::BYTE_ARRAY: + return std::make_unique(descr); + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique(descr); + default: + ::parquet::ParquetException::NYI("Not supported decoder type: " + type_num); + } + } else if (encoding == ::parquet::Encoding::RLE) { + if (type_num == ::parquet::Type::BOOLEAN) { + return std::make_unique(descr); + } + ::parquet::ParquetException::NYI("RLE encoding only supports BOOLEAN"); + } else { + ::parquet::ParquetException::NYI("Selected encoding is not supported"); + } + DCHECK(false) << "Should not be able to reach this code"; + return nullptr; +} + + +std::unique_ptr<::parquet::Decoder> MakeOmniDictDecoder(::parquet::Type::type type_num, + const ColumnDescriptor* descr, ::arrow::MemoryPool* pool) { + switch (type_num) { + case ::parquet::Type::BOOLEAN: + ::parquet::ParquetException::NYI("Dictionary BOOLEAN encoding not implemented for boolean type"); + case ::parquet::Type::INT32: + return std::make_unique>(descr, pool); + case ::parquet::Type::INT64: + return std::make_unique>(descr, pool); + case ::parquet::Type::DOUBLE: + return std::make_unique>(descr, pool); + case ::parquet::Type::BYTE_ARRAY: + return std::make_unique(descr, pool); + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: + return std::make_unique>(descr, pool); + default: + ::parquet::ParquetException::NYI("Not supported dictionary decoder type: " + type_num); + } + DCHECK(false) << "Should not be able to reach this code"; + return nullptr; +} + +template +std::unique_ptr> MakeParquetDictDecoder( + const ColumnDescriptor* descr = NULLPTR, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) { + using OutType = ParquetDictDecoder; + auto decoder = MakeOmniDictDecoder(DType::type_num, descr, pool); + return std::unique_ptr(dynamic_cast(decoder.release())); +} + +template +std::unique_ptr> MakeParquetTypedDecoder( + ::parquet::Encoding::type encoding, const ColumnDescriptor* descr = NULLPTR) { + using OutType = ParquetTypedDecoder; + std::unique_ptr base = MakeOmniParquetDecoder(DType::type_num, encoding, descr); + return std::unique_ptr(dynamic_cast(base.release())); +} + +// Advance to the next data page +template +bool ParquetColumnReaderBase::ReadNewPage() { + // Loop until we find the next data page. 
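+  // Dictionary pages configure the dictionary decoder and the loop continues; DATA_PAGE
+  // and DATA_PAGE_V2 initialize the level decoders plus the value decoder and return true.
+  // Any other page type is skipped, and false is returned once the page stream is exhausted.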
+ while (true) { + current_page_ = pager_->NextPage(); + if (!current_page_) { + // EOS + return false; + } + + if (current_page_->type() == PageType::DICTIONARY_PAGE) { + ConfigureDictionary(static_cast(current_page_.get())); + continue; + } else if (current_page_->type() == PageType::DATA_PAGE) { + const auto page = std::static_pointer_cast(current_page_); + const int64_t levels_byte_size = InitializeLevelDecoders( + *page, page->repetition_level_encoding(), page->definition_level_encoding()); + InitializeDataDecoder(*page, levels_byte_size); + return true; + } else if (current_page_->type() == PageType::DATA_PAGE_V2) { + const auto page = std::static_pointer_cast(current_page_); + int64_t levels_byte_size = InitializeLevelDecodersV2(*page); + InitializeDataDecoder(*page, levels_byte_size); + return true; + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. + continue; + } + } + return true; +} + +template +void ParquetColumnReaderBase::ConfigureDictionary(const DictionaryPage* page) { + int encoding = static_cast(page->encoding()); + if (page->encoding() == ::parquet::Encoding::PLAIN_DICTIONARY || + page->encoding() == ::parquet::Encoding::PLAIN) { + encoding = static_cast(::parquet::Encoding::RLE_DICTIONARY); + } + + auto it = decoders_.find(encoding); + if (it != decoders_.end()) { + throw ParquetException("Column cannot have more than one dictionary."); + } + + if (page->encoding() == ::parquet::Encoding::PLAIN_DICTIONARY || + page->encoding() == ::parquet::Encoding::PLAIN) { + auto dictionary = MakeParquetTypedDecoder(::parquet::Encoding::PLAIN, descr_); + dictionary->SetData(page->num_values(), page->data(), page->size()); + + // The dictionary is fully decoded during DictionaryDecoder::Init, so the + // DictionaryPage buffer is no longer required after this step + // + // TODO(wesm): investigate whether this all-or-nothing decoding of the + // dictionary makes sense and whether performance can be improved + + std::unique_ptr> decoder = MakeParquetDictDecoder(descr_, pool_); + decoder->SetDict(dynamic_cast(dictionary.get())); + decoders_[encoding] = + std::unique_ptr(dynamic_cast(decoder.release())); + } else { + ParquetException::NYI("only plain dictionary encoding has been implemented"); + } + + new_dictionary_ = true; + current_decoder_ = decoders_[encoding].get(); + DCHECK(current_decoder_); +} + +// Initialize repetition and definition level decoders on the next data page. + +// If the data page includes repetition and definition levels, we +// initialize the level decoders and return the number of encoded level bytes. +// The return value helps determine the number of bytes in the encoded data. +template +int64_t ParquetColumnReaderBase::InitializeLevelDecoders(const DataPage& page, + ::parquet::Encoding::type repetition_level_encoding, + ::parquet::Encoding::type definition_level_encoding) { + // Read a data page. + num_buffered_values_ = page.num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + + const uint8_t* buffer = page.data(); + int32_t levels_byte_size = 0; + int32_t max_size = page.size(); + + // Data page Layout: Repetition Levels - Definition Levels - encoded values. + // Levels are encoded as rle or bit-packed. 
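+  // Repetition levels are decoded first (only when max_rep_level_ > 0), followed by the
+  // definition levels; the combined byte count is returned so the caller can locate the
+  // start of the encoded values within the page buffer.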
+ // Init repetition levels + if (max_rep_level_ > 0) { + int32_t rep_levels_bytes = repetition_level_decoder_.SetData( + repetition_level_encoding, max_rep_level_, + static_cast(num_buffered_values_), buffer, max_size); + buffer += rep_levels_bytes; + levels_byte_size += rep_levels_bytes; + max_size -= rep_levels_bytes; + } + // TODO figure a way to set max_def_level_ to 0 + // if the initial value is invalid + + // Init definition levels + if (max_def_level_ > 0) { + int32_t def_levels_bytes = definition_level_decoder_.SetData( + definition_level_encoding, max_def_level_, + static_cast(num_buffered_values_), buffer, max_size); + levels_byte_size += def_levels_bytes; + max_size -= def_levels_bytes; + } + + return levels_byte_size; +} + + +template +int64_t ParquetColumnReaderBase::InitializeLevelDecodersV2(const ::parquet::DataPageV2& page) { + // Read a data page. + num_buffered_values_ = page.num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + const uint8_t* buffer = page.data(); + + const int64_t total_levels_length = + static_cast(page.repetition_levels_byte_length()) + + page.definition_levels_byte_length(); + + if (total_levels_length > page.size()) { + throw ParquetException("Data page too small for levels (corrupt header?)"); + } + + if (max_rep_level_ > 0) { + repetition_level_decoder_.SetDataV2(page.repetition_levels_byte_length(), + max_rep_level_, static_cast(num_buffered_values_), buffer); + } + // ARROW-17453: Even if max_rep_level_ is 0, there may still be + // repetition level bytes written and/or reported in the header by + // some writers (e.g. Athena) + buffer += page.repetition_levels_byte_length(); + + if (max_def_level_ > 0) { + definition_level_decoder_.SetDataV2(page.definition_levels_byte_length(), + max_def_level_, static_cast(num_buffered_values_), buffer); + } + + return total_levels_length; +} + +static bool IsDictionaryIndexEncoding(const ::parquet::Encoding::type& e) { + return e == ::parquet::Encoding::RLE_DICTIONARY || e == ::parquet::Encoding::PLAIN_DICTIONARY; +} + +// Get a decoder object for this page or create a new decoder if this is the +// first page with this encoding. 
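+// Decoders are cached in decoders_ keyed by encoding; PLAIN_DICTIONARY and
+// RLE_DICTIONARY both map to the RLE_DICTIONARY slot, so the decoder built in
+// ConfigureDictionary is reused by the data pages that reference it.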
+template +void ParquetColumnReaderBase::InitializeDataDecoder(const DataPage& page, int64_t levels_byte_size) { + const uint8_t* buffer = page.data() + levels_byte_size; + const int64_t data_size = page.size() - levels_byte_size; + + if (data_size < 0) { + throw ParquetException("Page smaller than size of encoded levels"); + } + + ::parquet::Encoding::type encoding = page.encoding(); + + if (IsDictionaryIndexEncoding(encoding)) { + encoding = ::parquet::Encoding::RLE_DICTIONARY; + } + + auto it = decoders_.find(static_cast(encoding)); + if (it != decoders_.end()) { + DCHECK(it->second.get() != nullptr); + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case ::parquet::Encoding::PLAIN: { + auto decoder = MakeParquetTypedDecoder(::parquet::Encoding::PLAIN, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); + break; + } + case ::parquet::Encoding::RLE: { + auto decoder = MakeParquetTypedDecoder(::parquet::Encoding::PLAIN, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); + break; + } + case ::parquet::Encoding::RLE_DICTIONARY: + case ::parquet::Encoding::BYTE_STREAM_SPLIT: + case ::parquet::Encoding::DELTA_BINARY_PACKED: + case ::parquet::Encoding::DELTA_BYTE_ARRAY: + case ::parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_encoding_ = encoding; + current_decoding_type = DType::type_num; + current_decoder_->SetData(static_cast(num_buffered_values_), buffer,static_cast(data_size)); +} + +std::shared_ptr MakeByteArrayRecordReader(const ColumnDescriptor* descr, + LevelInfo leaf_info, + ::arrow::MemoryPool* pool, + bool read_dictionary) { + if (read_dictionary) { + std::stringstream ss; + ss << "Invalid ParquetByteArrayDictionary is not implement yet " << static_cast(descr->physical_type()); + throw ParquetException(ss.str()); + } else { + return std::make_shared(descr, leaf_info, pool); + } +} + +std::shared_ptr MakeRecordReader(const ColumnDescriptor* descr, + LevelInfo leaf_info, ::arrow::MemoryPool* pool, + bool read_dictionary, + const std::shared_ptr<::arrow::DataType>& type) { + switch (type->id()) { + case ::arrow::Type::BOOL: + return std::make_shared>(descr, leaf_info, pool); + case ::arrow::Type::INT16: + return std::make_shared(descr, leaf_info, pool); + case ::arrow::Type::INT32: + return std::make_shared>(descr, leaf_info, pool); + case ::arrow::Type::DATE32: + return std::make_shared>(descr, leaf_info, pool); + case ::arrow::Type::INT64: + return std::make_shared>(descr, leaf_info, pool); + case ::arrow::Type::DATE64: + return std::make_shared>(descr, leaf_info, pool); + case ::arrow::Type::DOUBLE: + return std::make_shared>(descr, leaf_info, pool); + case ::arrow::Type::STRING: { + return MakeByteArrayRecordReader(descr, leaf_info, pool, read_dictionary); + } + case ::arrow::Type::DECIMAL: { + switch (descr->physical_type()) { + case ::parquet::Type::INT32: + return std::make_shared(descr, leaf_info, pool); + case ::parquet::Type::INT64: + return std::make_shared>(descr, leaf_info, pool); + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { + int32_t precision = ::arrow::internal::checked_cast(*type).precision(); + if (precision > PARQUET_MAX_DECIMAL64_DIGITS) { + return std::make_shared(descr, leaf_info, pool); + } else { + return std::make_shared(descr, leaf_info, pool); + } + } + default: + std::stringstream ss; + ss << "RecordReader not support decimal type " << 
static_cast(descr->physical_type()); + throw ParquetException(ss.str()); + } + } + default: { + // PARQUET-1481: This can occur if the file is corrupt + std::stringstream ss; + ss << "Invalid physical column type: " << static_cast(descr->physical_type()); + throw ParquetException(ss.str()); + } + } + // Unreachable code, but suppress compiler warning + return nullptr; +} + +// Helper function used by Decimal128::FromBigEndian +static inline uint64_t UInt64FromBigEndian(const uint8_t* bytes, int32_t length) { + // We don't bounds check the length here because this is called by + // FromBigEndian that has a Decimal128 as its out parameters and + // that function is already checking the length of the bytes and only + // passes lengths between zero and eight. + uint64_t result = 0; + // Using memcpy instead of special casing for length + // and doing the conversion in 16, 32 parts, which could + // possibly create unaligned memory access on certain platforms + memcpy(reinterpret_cast(&result) + 8 - length, bytes, length); + return ::arrow::bit_util::FromBigEndian(result); +} + +static inline Result FromBigEndianToOmniDecimal128(const uint8_t* bytes, int32_t length) { + static constexpr int32_t kMinDecimalBytes = 1; + static constexpr int32_t kMaxDecimalBytes = 16; + + int64_t high, low; + + if (ARROW_PREDICT_FALSE(length < kMinDecimalBytes || length > kMaxDecimalBytes)) { + return Status::Invalid("Length of byte array passed to Decimal128::FromBigEndian ", + "was ", length, ", but must be between ", kMinDecimalBytes, + " and ", kMaxDecimalBytes); + } + + // Bytes are coming in big-endian, so the first byte is the MSB and therefore holds the + // sign bit. + const bool is_negative = static_cast(bytes[0]) < 0; + + // 1. Extract the high bytes + // Stop byte of the high bytes + const int32_t high_bits_offset = std::max(0, length - 8); + const auto high_bits = UInt64FromBigEndian(bytes, high_bits_offset); + + if (high_bits_offset == 8) { + // Avoid undefined shift by 64 below + high = high_bits; + } else { + high = -1 * (is_negative && length < kMaxDecimalBytes); + // Shift left enough bits to make room for the incoming int64_t + high = SafeLeftShift(high, high_bits_offset * CHAR_BIT); + // Preserve the upper bits by inplace OR-ing the int64_t + high |= high_bits; + } + + // 2. 
Extract the low bytes + // Stop byte of the low bytes + const int32_t low_bits_offset = std::min(length, 8); + const auto low_bits = + UInt64FromBigEndian(bytes + high_bits_offset, length - high_bits_offset); + + if (low_bits_offset == 8) { + // Avoid undefined shift by 64 below + low = low_bits; + } else { + // Sign extend the low bits if necessary + low = -1 * (is_negative && length < 8); + // Shift left enough bits to make room for the incoming int64_t + low = SafeLeftShift(low, low_bits_offset * CHAR_BIT); + // Preserve the upper bits by inplace OR-ing the int64_t + low |= low_bits; + } + + __int128_t temp_high = high; + temp_high = temp_high << (8 * CHAR_BIT); + __int128_t val = temp_high | static_cast(low); + + return omniruntime::type::Decimal128(val); +} + +Status RawBytesToDecimal128Bytes(const uint8_t* bytes, int32_t length, + omniruntime::vec::BaseVector** out_buf, int64_t index) { + auto out = static_cast*>(*out_buf); + ARROW_ASSIGN_OR_RAISE(auto t, FromBigEndianToOmniDecimal128(bytes, length)); + out->SetValue(index, t); + return Status::OK(); +} + +Status RawBytesToDecimal64Bytes(const uint8_t* bytes, int32_t length, + omniruntime::vec::BaseVector** out_buf, int64_t index) { + auto out = static_cast*>(*out_buf); + + // Directly Extract the low bytes + // Stop byte of the low bytes + int64_t low = 0; + const bool is_negative = static_cast(bytes[0]) < 0; + const int32_t low_bits_offset = std::min(length, 8); + auto low_bits = UInt64FromBigEndian(bytes, low_bits_offset); + + if (low_bits_offset == 8) { + // Avoid undefined shift by 64 below + low = low_bits; + } else { + // Sign extend the low bits if necessary + low = -1 * (is_negative && length < 8); + // Shift left enough bits to make room for the incoming int64_t + low = SafeLeftShift(low, low_bits_offset * CHAR_BIT); + // Preserve the upper bits by inplace OR-ing the int64_t + low |= low_bits; + } + + out->SetValue(index, low); + return Status::OK(); +} + +void DefLevelsToNullsSIMD(const int16_t* def_levels, int64_t num_def_levels, const int16_t max_def_level, + int64_t* values_read, int64_t* null_count, bool* nulls) { + for (int i = 0; i < num_def_levels; ++i) { + if (def_levels[i] < max_def_level) { + nulls[i] = true; + (*null_count)++; + } + } + *values_read = num_def_levels; +} + +void DefLevelsToNulls(const int16_t* def_levels, int64_t num_def_levels, LevelInfo level_info, + int64_t* values_read, int64_t* null_count, bool* nulls) { + if (level_info.rep_level == 0) { + DefLevelsToNullsSIMD(def_levels, num_def_levels, level_info.def_level, values_read, null_count, nulls); + } else { + ::ParquetException::NYI("rep_level > 0 NYI"); + } +} + +} \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetTypedRecordReader.h b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetTypedRecordReader.h new file mode 100644 index 0000000000000000000000000000000000000000..8f3be73dd3891b1ae416a030673bf9555fd3ebdf --- /dev/null +++ b/omnioperator/omniop-spark-extension/cpp/src/tablescan/ParquetTypedRecordReader.h @@ -0,0 +1,847 @@ +/** + * Copyright (C) 2023-2023. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef SPARK_PARQUET_COLUMN_TYPE_READER_H +#define SPARK_PARQUET_COLUMN_TYPE_READER_H + +#include "ParquetDecoder.h" +#include +#include +#include + +using ResizableBuffer = ::arrow::ResizableBuffer; +using namespace omniruntime::vec; + +namespace spark::reader { + constexpr int64_t kMinLevelBatchSize = 1024; + static constexpr int32_t PARQUET_MAX_DECIMAL64_DIGITS = 18; + + inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) { + if (ARROW_PREDICT_FALSE(number_decoded != expected)) { + ::parquet::ParquetException::EofException("Decoded values " + std::to_string(number_decoded) + + " does not match expected" + std::to_string(expected)); + } + } + + template + SignedInt SafeLeftShift(SignedInt u, Shift shift) { + using UnsignedInt = typename std::make_unsigned::type; + return static_cast(static_cast(u) << shift); + } + + ::arrow::Status RawBytesToDecimal128Bytes(const uint8_t* bytes, int32_t length, BaseVector** out_buf, int64_t index); + + ::arrow::Status RawBytesToDecimal64Bytes(const uint8_t* bytes, int32_t length, BaseVector** out_buf, int64_t index); + + void DefLevelsToNulls(const int16_t* def_levels, int64_t num_def_levels, ::parquet::internal::LevelInfo level_info, + int64_t* values_read, int64_t* null_count, bool* nulls); + + template + class ParquetColumnReaderBase { + public: + using T = typename DType::c_type; + + ParquetColumnReaderBase(const ::parquet::ColumnDescriptor* descr, ::arrow::MemoryPool* pool) + : descr_(descr), + max_def_level_(descr->max_definition_level()), + max_rep_level_(descr->max_repetition_level()), + num_buffered_values_(0), + num_decoded_values_(0), + pool_(pool), + current_decoder_(nullptr), + current_encoding_(::parquet::Encoding::UNKNOWN) {} + + virtual ~ParquetColumnReaderBase() = default; + + protected: + int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) { + if (max_def_level_ == 0) { + return 0; + } + return definition_level_decoder_.Decode(static_cast(batch_size), levels); + } + + bool HasNextInternal() { + if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) { + if (!ReadNewPage() || num_buffered_values_ == 0) { + return false; + } + } + return true; + } + + int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) { + if (max_rep_level_ == 0) { + return 0; + } + return repetition_level_decoder_.Decode(static_cast(batch_size), levels); + } + + bool ReadNewPage(); + + void ConfigureDictionary(const ::parquet::DictionaryPage* page); + + int64_t InitializeLevelDecoders(const ::parquet::DataPage& page, + ::parquet::Encoding::type repetition_level_encoding, + ::parquet::Encoding::type definition_level_encoding); + + int64_t InitializeLevelDecodersV2(const ::parquet::DataPageV2& page); + + void InitializeDataDecoder(const ::parquet::DataPage& page, int64_t levels_byte_size); + + int64_t available_values_current_page() const { + return num_buffered_values_ - num_decoded_values_; + } + + const 
::parquet::ColumnDescriptor* descr_; + const int16_t max_def_level_; + const int16_t max_rep_level_; + + std::unique_ptr<::parquet::PageReader> pager_; + std::shared_ptr<::parquet::Page> current_page_; + + ::parquet::LevelDecoder definition_level_decoder_; + ::parquet::LevelDecoder repetition_level_decoder_; + + int64_t num_buffered_values_; + int64_t num_decoded_values_; + + ::arrow::MemoryPool* pool_; + + using DecoderType = ParquetTypedDecoder; + DecoderType* current_decoder_; + ::parquet::Encoding::type current_encoding_; + ::parquet::Type::type current_decoding_type; + + bool new_dictionary_ = false; + + std::unordered_map> decoders_; + + void ConsumeBufferedValues(int64_t num_values) { + num_decoded_values_ += num_values; + } + }; + + class OmniRecordReader { + public: + virtual ~OmniRecordReader() = default; + + /// \brief Attempt to read indicated number of records from column chunk + /// Note that for repeated fields, a record may have more than one value + /// and all of them are read. + virtual int64_t ReadRecords(int64_t num_records) = 0; + + /// \brief Attempt to skip indicated number of records from column chunk. + /// Note that for repeated fields, a record may have more than one value + /// and all of them are skipped. + /// \return number of records skipped + virtual int64_t SkipRecords(int64_t num_records) = 0; + + /// \brief Pre-allocate space for data. Results in better flat read performance + virtual void Reserve(int64_t num_values) = 0; + + /// \brief Clear consumed values and repetition/definition levels as the + /// result of calling ReadRecords + virtual void Reset() = 0; + + /// \brief Return true if the record reader has more internal data yet to + /// process + virtual bool HasMoreData() const = 0; + + /// \brief Advance record reader to the next row group. Must be set before + /// any records could be read/skipped. + /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader + virtual void SetPageReader(std::unique_ptr reader) = 0; + + virtual BaseVector* GetBaseVec() = 0; + + /// \brief Decoded definition levels + int16_t* def_levels() const { + return reinterpret_cast(def_levels_->mutable_data()); + } + + /// \brief Decoded repetition levels + int16_t* rep_levels() const { + return reinterpret_cast(rep_levels_->mutable_data()); + } + + /// \brief Decoded values, including nulls, if any + /// FLBA and ByteArray types do not use this array and read into their own + /// builders. + uint8_t* values() const { return values_->mutable_data(); } + + /// \brief Number of values written, including space left for nulls if any. + /// If this Reader was constructed with read_dense_for_nullable(), there is no space for + /// nulls and null_count() will be 0. There is no read-ahead/buffering for values. For + /// FLBA and ByteArray types this value reflects the values written with the last + /// ReadRecords call since those readers will reset the values after each call. + int64_t values_written() const { return values_written_; } + + /// \brief Number of definition / repetition levels (from those that have + /// been decoded) that have been consumed inside the reader. + int64_t levels_position() const { return levels_position_; } + + /// \brief Number of definition / repetition levels that have been written + /// internally in the reader. This may be larger than values_written() because + /// for repeated fields we need to look at the levels in advance to figure out + /// the record boundaries. 
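+    /// Levels between levels_position() and levels_written() have been read
+    /// ahead but not yet consumed; they are carried over to the next ReadRecords call.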
+ int64_t levels_written() const { return levels_written_; } + + /// \brief Number of nulls in the leaf that we have read so far into the + /// values vector. This is only valid when !read_dense_for_nullable(). When + /// read_dense_for_nullable() it will always be 0. + int64_t null_count() const { return null_count_; } + + /// \brief True if the leaf values are nullable + bool nullable_values() const { return nullable_values_; } + + /// \brief True if reading directly as Arrow dictionary-encoded + bool read_dictionary() const { return read_dictionary_; } + + + /// \brief Indicates if we can have nullable values. Note that repeated fields + /// may or may not be nullable. + bool nullable_values_; + + bool at_record_start_; + int64_t records_read_; + + /// \brief Stores values. These values are populated based on each ReadRecords + /// call. No extra values are buffered for the next call. SkipRecords will not + /// add any value to this buffer. + std::shared_ptr values_; + /// \brief False for BYTE_ARRAY, in which case we don't allocate the values + /// buffer and we directly read into builder classes. + bool uses_values_; + + /// \brief Values that we have read into 'values_' + 'null_count_'. + int64_t values_written_; + int64_t values_capacity_; + int64_t null_count_; + + /// \brief Buffer for definition levels. May contain more levels than + /// is actually read. This is because we read levels ahead to + /// figure out record boundaries for repeated fields. + /// For flat required fields, 'def_levels_' and 'rep_levels_' are not + /// populated. For non-repeated fields 'rep_levels_' is not populated. + /// 'def_levels_' and 'rep_levels_' must be of the same size if present. + std::shared_ptr def_levels_; + /// \brief Buffer for repetition levels. Only populated for repeated + /// fields. + std::shared_ptr rep_levels_; + + /// \brief Number of definition / repetition levels that have been written + /// internally in the reader. This may be larger than values_written() since + /// for repeated fields we need to look at the levels in advance to figure out + /// the record boundaries. + int64_t levels_written_; + /// \brief Position of the next level that should be consumed. + int64_t levels_position_; + int64_t levels_capacity_; + + bool read_dictionary_ = false; + }; + + /** + * ParquetTypedRecordReader is used to generate omnivector directly from the def_level/rep_level/values. + * And we directly use omnivector's nulls to store each null value flag instead of bitmap to reduce extra cost. + * When setting omnivector's values, it can choose whether transferring values according to the TYPE_ID and DType. + * @tparam TYPE_ID omni type + * @tparam DType parquet store type + */ + template + class ParquetTypedRecordReader : public ParquetColumnReaderBase, virtual public OmniRecordReader { + public: + using T = typename DType::c_type; + using V = typename NativeType::type; + using BASE = ParquetColumnReaderBase; + + explicit ParquetTypedRecordReader(const ::parquet::ColumnDescriptor* descr, + ::parquet::internal::LevelInfo leaf_info, ::arrow::MemoryPool* pool) + // Pager must be set using SetPageReader. 
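+        // Only the value/level buffers are allocated here; BYTE_ARRAY columns skip
+        // the values buffer and decode straight into the omni vector instead.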
+ : BASE(descr, pool) { + leaf_info_ = leaf_info; + nullable_values_ = leaf_info.HasNullableValues(); + at_record_start_ = true; + values_written_ = 0; + null_count_ = 0; + values_capacity_ = 0; + levels_written_ = 0; + levels_position_ = 0; + levels_capacity_ = 0; + uses_values_ = !(descr->physical_type() == ::parquet::Type::BYTE_ARRAY); + byte_width_ = descr->type_length(); + + if (uses_values_) { + values_ = ::parquet::AllocateBuffer(pool); + } + def_levels_ = ::parquet::AllocateBuffer(pool); + rep_levels_ = ::parquet::AllocateBuffer(pool); + Reset(); + } + + ~ParquetTypedRecordReader() { + if (parquet_vec_ != nullptr) { + delete[] parquet_vec_; + } + } + + // Compute the values capacity in bytes for the given number of elements + int64_t bytes_for_values(int64_t nitems) const { + int64_t type_size = GetTypeByteSize(this->descr_->physical_type()); + int64_t bytes_for_values = -1; + if (::arrow::internal::MultiplyWithOverflow(nitems, type_size, &bytes_for_values)) { + throw ::parquet::ParquetException("Total size of items too large"); + } + return bytes_for_values; + } + + int64_t ReadRecords(int64_t num_records) override { + if (num_records == 0) return 0; + // Delimit records, then read values at the end + int64_t records_read = 0; + + if (has_values_to_process()) { + records_read += ReadRecordData(num_records); + } + + int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records); + + // If we are in the middle of a record, we continue until reaching the + // desired number of records or the end of the current record if we've found + // enough records + while (!at_record_start_ || records_read < num_records) { + // Is there more data to read in this row group? + if (!this->HasNextInternal()) { + if (!at_record_start_) { + // We ended the row group while inside a record that we haven't seen + // the end of yet. So increment the record count for the last record in + // the row group + ++records_read; + at_record_start_ = true; + } + break; + } + + /// We perform multiple batch reads until we either exhaust the row group + /// or observe the desired number of records + int64_t batch_size = + std::min(level_batch_size, this->available_values_current_page()); + + // No more data in column + if (batch_size == 0) { + break; + } + + if (this->max_def_level_ > 0) { + ReserveLevels(batch_size); + + int16_t* def_levels = this->def_levels() + levels_written_; + int16_t* rep_levels = this->rep_levels() + levels_written_; + + // Not present for non-repeated fields + int64_t levels_read = 0; + if (this->max_rep_level_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, def_levels); + if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { + throw ::parquet::ParquetException("Number of decoded rep / def levels did not match"); + } + } else if (this->max_def_level_ > 0) { + levels_read = this->ReadDefinitionLevels(batch_size, def_levels); + } + + // Exhausted column chunk + if (levels_read == 0) { + break; + } + + levels_written_ += levels_read; + records_read += ReadRecordData(num_records - records_read); + } else { + // No repetition or definition levels + batch_size = std::min(num_records - records_read, batch_size); + records_read += ReadRecordData(batch_size); + } + } + + return records_read; + } + + // Throw away levels from start_levels_position to levels_position_. + // Will update levels_position_, levels_written_, and levels_capacity_ + // accordingly and move the levels to left to fill in the gap. 
+ // It will resize the buffer without releasing the memory allocation. + void ThrowAwayLevels(int64_t start_levels_position) { + ARROW_DCHECK_LE(levels_position_, levels_written_); + ARROW_DCHECK_LE(start_levels_position, levels_position_); + ARROW_DCHECK_GT(this->max_def_level_, 0); + ARROW_DCHECK_NE(def_levels_, nullptr); + + int64_t gap = levels_position_ - start_levels_position; + if (gap == 0) return; + + int64_t levels_remaining = levels_written_ - gap; + + auto left_shift = [&](ResizableBuffer* buffer) { + int16_t* data = reinterpret_cast(buffer->mutable_data()); + std::copy(data + levels_position_, data + levels_written_, + data + start_levels_position); + PARQUET_THROW_NOT_OK(buffer->Resize(levels_remaining * sizeof(int16_t), + /*shrink_to_fit=*/false)); + }; + + left_shift(def_levels_.get()); + + if (this->max_rep_level_ > 0) { + ARROW_DCHECK_NE(rep_levels_, nullptr); + left_shift(rep_levels_.get()); + } + + levels_written_ -= gap; + levels_position_ -= gap; + levels_capacity_ -= gap; + } + + + int64_t SkipRecords(int64_t num_records) override { + throw ::parquet::ParquetException("SkipRecords not implemented yet"); + } + + // We may outwardly have the appearance of having exhausted a column chunk + // when in fact we are in the middle of processing the last batch + bool has_values_to_process() const { return levels_position_ < levels_written_; } + + // Process written repetition/definition levels to reach the end of + // records. Only used for repeated fields. + // Process no more levels than necessary to delimit the indicated + // number of logical records. Updates internal state of RecordReader + // + // \return Number of records delimited + int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) { + int64_t values_to_read = 0; + int64_t records_read = 0; + + const int16_t* def_levels = this->def_levels() + levels_position_; + const int16_t* rep_levels = this->rep_levels() + levels_position_; + + DCHECK_GT(this->max_rep_level_, 0); + + // Count logical records and number of values to read + while (levels_position_ < levels_written_) { + const int16_t rep_level = *rep_levels++; + if (rep_level == 0) { + // If at_record_start_ is true, we are seeing the start of a record + // for the second time, such as after repeated calls to + // DelimitRecords. In this case we must continue until we find + // another record start or exhausting the ColumnChunk + if (!at_record_start_) { + // We've reached the end of a record; increment the record count. + ++records_read; + if (records_read == num_records) { + // We've found the number of records we were looking for. 
Set + // at_record_start_ to true and break + at_record_start_ = true; + break; + } + } + } + // We have decided to consume the level at this position; therefore we + // must advance until we find another record boundary + at_record_start_ = false; + + const int16_t def_level = *def_levels++; + if (def_level == this->max_def_level_) { + ++values_to_read; + } + ++levels_position_; + } + *values_seen = values_to_read; + return records_read; + } + + void Reserve(int64_t capacity) override { + ReserveLevels(capacity); + ReserveValues(capacity); + InitVec(capacity); + } + + virtual void InitVec(int64_t capacity) { + vec_ = new Vector(capacity); + if (parquet_vec_ != nullptr) { + auto capacity_bytes = capacity * byte_width_; + memset(parquet_vec_, 0, capacity_bytes); + } else { + auto capacity_bytes = capacity * byte_width_; + parquet_vec_ = new uint8_t[capacity_bytes]; + } + // Init nulls + if (nullable_values_) { + nulls_ = unsafe::UnsafeBaseVector::GetNulls(vec_); + } + } + + + int64_t UpdateCapacity(int64_t capacity, int64_t size, int64_t extra_size) { + if (extra_size < 0) { + throw ::parquet::ParquetException("Negative size (corrupt file?)"); + } + int64_t target_size = -1; + if (::arrow::internal::AddWithOverflow(size, extra_size, &target_size)) { + throw ::parquet::ParquetException("Allocation size too large (corrupt file?)"); + } + if (target_size >= (1LL << 62)) { + throw ::parquet::ParquetException("Allocation size too large (corrupt file?)"); + } + if (capacity >= target_size) { + return capacity; + } + return ::arrow::bit_util::NextPower2(target_size); + } + + void ReserveLevels(int64_t extra_levels) { + if (this->max_def_level_ > 0) { + const int64_t new_levels_capacity = + UpdateCapacity(levels_capacity_, levels_written_, extra_levels); + if (new_levels_capacity > levels_capacity_) { + constexpr auto kItemSize = static_cast(sizeof(int16_t)); + int64_t capacity_in_bytes = -1; + if (::arrow::internal::MultiplyWithOverflow(new_levels_capacity, kItemSize, &capacity_in_bytes)) { + throw ::parquet::ParquetException("Allocation size too large (corrupt file?)"); + } + PARQUET_THROW_NOT_OK( + def_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); + if (this->max_rep_level_ > 0) { + PARQUET_THROW_NOT_OK( + rep_levels_->Resize(capacity_in_bytes, /*shrink_to_fit=*/false)); + } + levels_capacity_ = new_levels_capacity; + } + } + } + + void ReserveValues(int64_t extra_values) { + const int64_t new_values_capacity = + UpdateCapacity(values_capacity_, values_written_, extra_values); + if (new_values_capacity > values_capacity_) { + // XXX(wesm): A hack to avoid memory allocation when reading directly + // into builder classes + if (uses_values_) { + PARQUET_THROW_NOT_OK(values_->Resize(bytes_for_values(new_values_capacity), + /*shrink_to_fit=*/false)); + } + values_capacity_ = new_values_capacity; + } + } + + void Reset() override { + ResetValues(); + if (levels_written_ > 0) { + // Throw away levels from 0 to levels_position_. 
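+        // Levels in [levels_position_, levels_written_) were read ahead but not yet
+        // consumed; ThrowAwayLevels(0) shifts them to the front and keeps them.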
+ ThrowAwayLevels(0); + } + + vec_ = nullptr; + } + + void SetPageReader(std::unique_ptr<::parquet::PageReader> reader) override { + at_record_start_ = true; + this->pager_ = std::move(reader); + ResetDecoders(); + } + + bool HasMoreData() const override { return this->pager_ != nullptr; } + + const ::parquet::ColumnDescriptor* descr() const { return this->descr_; } + + // Dictionary decoders must be reset when advancing row groups + void ResetDecoders() { this->decoders_.clear(); } + + virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) { + int64_t num_decoded = this->current_decoder_->DecodeSpaced( + ValuesHead(), static_cast(values_with_nulls), + static_cast(null_count), nulls_ + values_written_); + CheckNumberDecoded(num_decoded, values_with_nulls); + } + + virtual void ReadValuesDense(int64_t values_to_read) { + int64_t num_decoded = + this->current_decoder_->Decode(ValuesHead(), static_cast(values_to_read)); + CheckNumberDecoded(num_decoded, values_to_read); + } + + // Return number of logical records read. + int64_t ReadRecordData(int64_t num_records) { + // Conservative upper bound + const int64_t possible_num_values = + std::max(num_records, levels_written_ - levels_position_); + ReserveValues(possible_num_values); + + const int64_t start_levels_position = levels_position_; + + int64_t records_read = 0; + int64_t values_to_read = 0; + if (this->max_rep_level_ > 0) { + records_read = DelimitRecords(num_records, &values_to_read); + } else if (this->max_def_level_ > 0) { + records_read = std::min(levels_written_ - levels_position_, num_records); + levels_position_ += records_read; + } else { + records_read = values_to_read = num_records; + } + + int64_t null_count = 0; + if (leaf_info_.HasNullableValues()) { + int64_t values_read = 0; + DefLevelsToNulls(def_levels() + start_levels_position, levels_position_ - start_levels_position, leaf_info_, + &values_read, &null_count, nulls_ + start_levels_position); + values_to_read = values_read - null_count; + DCHECK_GE(values_to_read, 0); + ReadValuesSpaced(values_read, null_count); + } else { + DCHECK_GE(values_to_read, 0); + ReadValuesDense(values_to_read); + } + + if (this->leaf_info_.def_level > 0) { + // Optional, repeated, or some mix thereof + this->ConsumeBufferedValues(levels_position_ - start_levels_position); + } else { + // Flat, non-repeated + this->ConsumeBufferedValues(values_to_read); + } + // Total values, including null spaces, if any + values_written_ += values_to_read + null_count; + null_count_ += null_count; + + return records_read; + } + + void ResetValues() { + if (values_written_ <= 0) { + return; + } + // Resize to 0, but do not shrink to fit + if (uses_values_) { + PARQUET_THROW_NOT_OK(values_->Resize(0, /*shrink_to_fit=*/false)); + } + values_written_ = 0; + values_capacity_ = 0; + null_count_ = 0; + } + + virtual BaseVector* GetBaseVec() { + if (vec_ == nullptr) { + throw ::parquet::ParquetException("BaseVector is nullptr!"); + } + auto res = dynamic_cast*>(vec_); + res->SetValues(0, Values(), values_written_); + return vec_; + } + + protected: + template + T* ValuesHead() { + return reinterpret_cast(values_->mutable_data()) + values_written_; + } + + template + T* Values() const { + return reinterpret_cast(values_->mutable_data()); + } + ::parquet::internal::LevelInfo leaf_info_; + omniruntime::vec::BaseVector* vec_ = nullptr; + uint8_t* parquet_vec_ = nullptr; + bool* nulls_ = nullptr; + int32_t byte_width_; + }; + + class ParquetShortRecordReader : public ParquetTypedRecordReader { + 
public: + using BASE = ParquetTypedRecordReader; + ParquetShortRecordReader(const ::parquet::ColumnDescriptor* descr, ::parquet::internal::LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : BASE(descr, leaf_info, pool) {} + + BaseVector* GetBaseVec() override { + if (vec_ == nullptr) { + throw ::parquet::ParquetException("GetBaseVec() is nullptr!"); + } + auto res = dynamic_cast *>(vec_); + auto values = Values(); + for (int i = 0; i < values_written_; i++) { + res->SetValue(i, static_cast(values[i])); + } + return vec_; + } + }; + + class ParquetIntDecimal64RecordReader : public ParquetTypedRecordReader { + public: + using BASE = ParquetTypedRecordReader; + ParquetIntDecimal64RecordReader(const ::parquet::ColumnDescriptor* descr, ::parquet::internal::LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : BASE(descr, leaf_info, pool) {} + + BaseVector* GetBaseVec() override { + if (vec_ == nullptr) { + throw ::parquet::ParquetException("GetBaseVec() is nullptr!"); + } + auto res = dynamic_cast *>(vec_); + auto values = Values(); + for (int i = 0; i < values_written_; i++) { + res->SetValue(i, static_cast(values[i])); + } + return vec_; + } + }; + + class ParquetFLBADecimal64RecordReader : public ParquetTypedRecordReader { + public: + ParquetFLBADecimal64RecordReader(const ::parquet::ColumnDescriptor* descr, ::parquet::internal::LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : ParquetTypedRecordReader(descr, leaf_info, pool) {} + + void ReadValuesDense(int64_t values_to_read) override { + auto values = ValuesHead<::parquet::FLBA>(); + int64_t num_decoded = this->current_decoder_->Decode(values, static_cast(values_to_read)); + for (int i = 0; i < num_decoded; i++) { + memcpy_s(GetParquetVecOffsetPtr(i), byte_width_, values[i].ptr, byte_width_); + } + DCHECK_EQ(num_decoded, values_to_read); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + auto values = ValuesHead<::parquet::FLBA>(); + int64_t num_decoded = this->current_decoder_->DecodeSpaced(values, static_cast(values_to_read), + static_cast(null_count), nulls_ + values_written_); + for (int i = 0; i < num_decoded; i++) { + memcpy_s(GetParquetVecOffsetPtr(i), byte_width_, values[i].ptr, byte_width_); + } + DCHECK_EQ(num_decoded, values_to_read); + } + + uint8_t* GetParquetVecOffsetPtr(int index) { + return parquet_vec_ + (index + values_written_) * byte_width_; + } + + uint8_t* GetParquetVecHeadPtr(int index) { + return parquet_vec_ + index * byte_width_; + } + + BaseVector* GetBaseVec() override { + if (vec_ == nullptr) { + throw ::parquet::ParquetException("GetBaseVector() is nullptr"); + } + for (int64_t i = 0; i < values_written_; i++) { + if (nulls_ == nullptr || !nulls_[i]) { + PARQUET_THROW_NOT_OK(RawBytesToDecimal64Bytes(GetParquetVecHeadPtr(i), byte_width_, &vec_, i)); + } + } + return vec_; + } + }; + + class ParquetFLBADecimal128RecordReader : public ParquetTypedRecordReader { + public: + ParquetFLBADecimal128RecordReader(const ::parquet::ColumnDescriptor* descr, ::parquet::internal::LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : ParquetTypedRecordReader(descr, leaf_info, pool) {} + + void ReadValuesDense(int64_t values_to_read) override { + auto values = ValuesHead<::parquet::FLBA>(); + int64_t num_decoded = this->current_decoder_->Decode(values, static_cast(values_to_read)); + for (int i = 0; i < num_decoded; i++) { + memcpy_s(GetParquetVecOffsetPtr(i), byte_width_, values[i].ptr, byte_width_); + } + DCHECK_EQ(num_decoded, values_to_read); + } + + void ReadValuesSpaced(int64_t 
values_to_read, int64_t null_count) override { + auto values = ValuesHead<::parquet::FLBA>(); + int64_t num_decoded = this->current_decoder_->DecodeSpaced(values, static_cast(values_to_read), + static_cast(null_count), nulls_ + values_written_); + for (int i = 0; i < num_decoded; i++) { + memcpy_s(GetParquetVecOffsetPtr(i), byte_width_, values[i].ptr, byte_width_); + } + DCHECK_EQ(num_decoded, values_to_read); + } + + uint8_t* GetParquetVecOffsetPtr(int index) { + return parquet_vec_ + (index + values_written_) * byte_width_; + } + + uint8_t* GetParquetVecHeadPtr(int index) { + return parquet_vec_ + index * byte_width_; + } + + BaseVector* GetBaseVec() override { + if (vec_ == nullptr) { + throw ::parquet::ParquetException("GetBaseVector() is nullptr"); + } + for (int64_t i = 0; i < values_written_; i++) { + if (nulls_ == nullptr || !nulls_[i]) { + PARQUET_THROW_NOT_OK(RawBytesToDecimal128Bytes(GetParquetVecHeadPtr(i), byte_width_, &vec_, i)); + } + } + return vec_; + } + }; + + class ParquetByteArrayChunkedRecordReader : public ParquetTypedRecordReader { + public: + ParquetByteArrayChunkedRecordReader(const ::parquet::ColumnDescriptor* descr, ::parquet::internal::LevelInfo leaf_info, + ::arrow::MemoryPool* pool) + : ParquetTypedRecordReader(descr, leaf_info, pool) { + DCHECK_EQ(descr_->physical_type(), ::parquet::Type::BYTE_ARRAY); + } + + void InitVec(int64_t capacity) override { + vec_ = new Vector>(capacity); + if (nullable_values_) { + nulls_ = unsafe::UnsafeBaseVector::GetNulls(vec_); + } + } + + void ReadValuesDense(int64_t values_to_read) override { + int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(static_cast(values_to_read), + &vec_, values_written_); + CheckNumberDecoded(num_decoded, values_to_read); + } + + void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override { + int64_t num_decoded = this->current_decoder_->DecodeArrow( + static_cast(values_to_read), static_cast(null_count), + nulls_, values_written_, &vec_); + CheckNumberDecoded(num_decoded, values_to_read - null_count); + } + + BaseVector* GetBaseVec() { + if (vec_ == nullptr) { + throw ::parquet::ParquetException("GetBaseVec() is nullptr"); + } + return vec_; + } + }; + + std::shared_ptr MakeRecordReader(const ::parquet::ColumnDescriptor* descr, + ::parquet::internal::LevelInfo leaf_info, ::arrow::MemoryPool* pool, + const bool read_dictionary, const std::shared_ptr<::arrow::DataType>& type); +} +#endif //SPARK_PARQUET_COLUMN_TYPE_READER_H \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp index 39c30151e3d4d81ffc8fa6fff7dc5e7766b153a0..63ef31e52f101fd442a67da6cdac9f79a005973a 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/tablescan/parquet_scan_test.cpp @@ -19,7 +19,6 @@ #include #include -#include #include "scan_test.h" #include "tablescan/ParquetReader.h" @@ -43,51 +42,50 @@ TEST(read, test_parquet_reader) const std::vector column_indices = {0, 1, 3, 6, 7, 8, 9, 10, 12}; ParquetReader *reader = new ParquetReader(); - std::string ugi = "root@sample"; + std::string ugi = "user@sample"; ObsConfig obsInfo; auto state1 = reader->InitRecordReader(filename, 1024, row_group_indices, column_indices, ugi, obsInfo); ASSERT_EQ(state1, Status::OK()); - std::shared_ptr batch; - auto state2 = reader->ReadNextBatch(&batch); + std::vector 
recordBatch(column_indices.size()); + long batchRowSize = 0; + auto state2 = reader->ReadNextBatch(recordBatch, &batchRowSize); ASSERT_EQ(state2, Status::OK()); - std::cout << "num_rows: " << batch->num_rows() << std::endl; - std::cout << "num_columns: " << batch->num_columns() << std::endl; - std::cout << "Print: " << batch->ToString() << std::endl; - auto pair = TransferToOmniVecs(batch); + std::cout << "num_rows: " << batchRowSize << std::endl; + std::cout << "num_columns: " << recordBatch.size() << std::endl; - BaseVector *intVector = reinterpret_cast(pair.second[0]); + BaseVector *intVector = reinterpret_cast(recordBatch[0]); auto int_result = static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(intVector)); ASSERT_EQ(*int_result, 10); - auto varCharVector = reinterpret_cast> *>(pair.second[1]); + auto varCharVector = reinterpret_cast> *>(recordBatch[1]); std::string str_expected = "varchar_1"; ASSERT_TRUE(str_expected == varCharVector->GetValue(0)); - BaseVector *longVector = reinterpret_cast(pair.second[2]); + BaseVector *longVector = reinterpret_cast(recordBatch[2]); auto long_result = static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(longVector)); ASSERT_EQ(*long_result, 10000); - BaseVector *doubleVector = reinterpret_cast(pair.second[3]); + BaseVector *doubleVector = reinterpret_cast(recordBatch[3]); auto double_result = static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(doubleVector)); ASSERT_EQ(*double_result, 1111.1111); - BaseVector *nullVector = reinterpret_cast(pair.second[4]); + BaseVector *nullVector = reinterpret_cast(recordBatch[4]); ASSERT_TRUE(nullVector->IsNull(0)); - BaseVector *decimal64Vector = reinterpret_cast(pair.second[5]); + BaseVector *decimal64Vector = reinterpret_cast(recordBatch[5]); auto decimal64_result = static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(decimal64Vector)); ASSERT_EQ(*decimal64_result, 13111110); - BaseVector *booleanVector = reinterpret_cast(pair.second[6]); + BaseVector *booleanVector = reinterpret_cast(recordBatch[6]); auto boolean_result = static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(booleanVector)); ASSERT_EQ(*boolean_result, true); - BaseVector *smallintVector = reinterpret_cast(pair.second[7]); + BaseVector *smallintVector = reinterpret_cast(recordBatch[7]); auto smallint_result = static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(smallintVector)); ASSERT_EQ(*smallint_result, 11); - BaseVector *dateVector = reinterpret_cast(pair.second[8]); + BaseVector *dateVector = reinterpret_cast(recordBatch[8]); auto date_result = static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(dateVector)); omniruntime::type::Date32 date32(*date_result); char chars[11]; @@ -107,23 +105,32 @@ TEST(read, test_parquet_reader) delete dateVector; } -TEST(read, test_decimal128_copy) +TEST(read, test_varchar) { - auto decimal_type = arrow::decimal(20, 1); - arrow::Decimal128Builder builder(decimal_type); - arrow::Decimal128 value(20230420); - auto s1 = builder.Append(value); - std::shared_ptr array; - auto s2 = builder.Finish(&array); - - int omniTypeId = 0; - uint64_t omniVecId = 0; - spark::reader::CopyToOmniVec(decimal_type, omniTypeId, omniVecId, array); - - BaseVector *decimal128Vector = reinterpret_cast(omniVecId); - auto decimal128_result = - static_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(decimal128Vector)); - ASSERT_TRUE((*decimal128_result).ToString() == "20230420"); - - delete decimal128Vector; + std::string filename = 
"/../../../java/src/test/java/com/huawei/boostkit/spark/jni/parquetsrc/date_dim.parquet"; + filename = PROJECT_PATH + filename; + const std::vector row_group_indices = {0}; + const std::vector column_indices = {23, 24, 25, 26, 27}; + ParquetReader *reader = new ParquetReader(); + std::string ugi = "user@sample"; + ObsConfig obsInfo; + auto state1 = reader->InitRecordReader(filename, 4096, row_group_indices, column_indices, ugi, obsInfo); + ASSERT_EQ(state1, Status::OK()); + int total_nums = 0; + int iter = 0; + while (true) { + std::vector recordBatch(column_indices.size()); + long batchRowSize = 0; + auto state2 = reader->ReadNextBatch(recordBatch, &batchRowSize); + if (batchRowSize == 0) { + break; + } + total_nums += batchRowSize; + std::cout << iter++ << " num rows: " << batchRowSize << std::endl; + for (auto vec : recordBatch) { + delete vec; + } + recordBatch.clear(); + } + std::cout << "total nums: " << total_nums << std::endl; } \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java index c45f33bb50a565d0937304817d9a8f6daf86e3e3..a59d63a524327ffe513a10b12a787503011fab4c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/com/huawei/boostkit/spark/jni/ParquetColumnarBatchJniReader.java @@ -19,11 +19,18 @@ package com.huawei.boostkit.spark.jni; import com.huawei.boostkit.spark.ObsConf; -import nova.hetu.omniruntime.type.DataType; import nova.hetu.omniruntime.vector.*; -import org.apache.spark.sql.catalyst.util.RebaseDateTime; - +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,56 +60,40 @@ public class ParquetColumnarBatchJniReader { return parquetReader; } - public int next(Vec[] vecList) { + public int next(Vec[] vecList, List types) { int vectorCnt = vecList.length; - int[] typeIds = new int[vectorCnt]; long[] vecNativeIds = new long[vectorCnt]; - long rtn = recordReaderNext(parquetReader, typeIds, vecNativeIds); + long rtn = recordReaderNext(parquetReader, vecNativeIds); if (rtn == 0) { return 0; } - int nativeGetId = 0; for (int i = 0; i < vectorCnt; i++) { - switch (DataType.DataTypeId.values()[typeIds[nativeGetId]]) { - case OMNI_BOOLEAN: { - vecList[i] = new BooleanVec(vecNativeIds[nativeGetId]); - break; - } - case OMNI_SHORT: { - vecList[i] = new ShortVec(vecNativeIds[nativeGetId]); - break; - } - case OMNI_DATE32: { - vecList[i] = new IntVec(vecNativeIds[nativeGetId]); - break; - } - case OMNI_INT: { - vecList[i] = new IntVec(vecNativeIds[nativeGetId]); - break; - } - case OMNI_LONG: - case OMNI_DECIMAL64: { - vecList[i] = new LongVec(vecNativeIds[nativeGetId]); - break; - } - case OMNI_DOUBLE: { - vecList[i] = new DoubleVec(vecNativeIds[nativeGetId]); - break; - } - case OMNI_VARCHAR: { - vecList[i] = new 
VarcharVec(vecNativeIds[nativeGetId]); - break; - } - case OMNI_DECIMAL128: { - vecList[i] = new Decimal128Vec(vecNativeIds[nativeGetId]); - break; - } - default: { - throw new RuntimeException("UnSupport type for ColumnarFileScan:" + - DataType.DataTypeId.values()[typeIds[i]]); + DataType type = types.get(i); + if (type instanceof LongType) { + vecList[i] = new LongVec(vecNativeIds[i]); + } else if (type instanceof BooleanType) { + vecList[i] = new BooleanVec(vecNativeIds[i]); + } else if (type instanceof ShortType) { + vecList[i] = new ShortVec(vecNativeIds[i]); + } else if (type instanceof IntegerType) { + vecList[i] = new IntVec(vecNativeIds[i]); + } else if (type instanceof DecimalType) { + if (DecimalType.is64BitDecimalType(type)) { + vecList[i] = new LongVec(vecNativeIds[i]); + } else { + vecList[i] = new Decimal128Vec(vecNativeIds[i]); } + } else if (type instanceof DoubleType) { + vecList[i] = new DoubleVec(vecNativeIds[i]); + } else if (type instanceof StringType) { + vecList[i] = new VarcharVec(vecNativeIds[i]); + } else if (type instanceof DateType) { + vecList[i] = new IntVec(vecNativeIds[i]); + } else if (type instanceof ByteType) { + vecList[i] = new VarcharVec(vecNativeIds[i]); + } else { + throw new RuntimeException("Unsupport type for ColumnarFileScan: " + type.typeName()); } - nativeGetId++; } return (int)rtn; } @@ -113,7 +104,7 @@ public class ParquetColumnarBatchJniReader { public native long initializeReader(JSONObject job); - public native long recordReaderNext(long parquetReader, int[] typeId, long[] vecNativeId); + public native long recordReaderNext(long parquetReader, long[] vecNativeId); public native void recordReaderClose(long parquetReader); diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OmniParquetColumnarBatchReader.java b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OmniParquetColumnarBatchReader.java index 3aa70dfeedbe93910b784d565e64cb82ec13c5ff..6a89750ad40b5105c7a0e68cc29015621ae78fac 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OmniParquetColumnarBatchReader.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OmniParquetColumnarBatchReader.java @@ -54,6 +54,7 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; import org.apache.spark.sql.execution.vectorized.OmniColumnVector; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.types.StructType$; @@ -86,6 +87,8 @@ public class OmniParquetColumnarBatchReader extends RecordReader types = new ArrayList<>(); private boolean isFilterPredicate = false; public OmniParquetColumnarBatchReader(int capacity, ParquetMetadata fileFooter) { @@ -242,6 +245,7 @@ public class OmniParquetColumnarBatchReader extends RecordReader types; + @Before public void setUp() throws Exception { parquetColumnarBatchJniReader = new ParquetColumnarBatchJniReader(); @@ -45,6 +51,9 @@ public class ParquetColumnarBatchJniReaderTest extends TestCase { rowGroupIndices.add(0); List columnIndices = new ArrayList<>(); Collections.addAll(columnIndices, 0, 1, 3, 6, 7, 8, 9, 10, 12); + types = new ArrayList<>(); + 
Collections.addAll(types, IntegerType, StringType, LongType, DoubleType, createDecimalType(9, 8), + createDecimalType(18, 5), BooleanType, ShortType, DateType); File file = new File("../cpp/test/tablescan/resources/parquet_data_all_type"); String path = file.getAbsolutePath(); parquetColumnarBatchJniReader.initializeReaderJava(path, 100000, rowGroupIndices, columnIndices, "root@sample"); @@ -61,7 +70,7 @@ public class ParquetColumnarBatchJniReaderTest extends TestCase { @Test public void testRead() { - long num = parquetColumnarBatchJniReader.next(vecs); + long num = parquetColumnarBatchJniReader.next(vecs, types); assertTrue(num == 1); } }