From 980683fc80ff25fbebf0dc65ed71d5da38ea122f Mon Sep 17 00:00:00 2001 From: zhousipei Date: Thu, 6 Feb 2025 20:09:07 +0800 Subject: [PATCH 1/6] remove redundant rle code --- .../cpp/src/orcfile/OmniRLEv2.cc | 123 ++++-------------- .../cpp/src/orcfile/OmniRLEv2.hh | 59 ++------- 2 files changed, 36 insertions(+), 146 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc index a5c515d3c..f91de1bd2 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc @@ -140,7 +140,7 @@ namespace omniruntime::reader { runLength += 1; runRead = 0; - readLongsByType(OmniVec, 0, runLength, offset , numValues, bitSize, dataTypeId, notNull); + readLongs(literals.data(), 0, runLength, bitSize); if (isSigned) { for (uint64_t i = 0; i < runLength; ++i) { @@ -372,7 +372,7 @@ namespace omniruntime::reader { base = -base; } - orc::RleDecoderV2::readLongs(literals.data(), 0, runLength, bitSize); + readLongs(literals.data(), 0, runLength, bitSize); // any remaining bits are thrown out resetReadLongs(); @@ -385,7 +385,7 @@ namespace omniruntime::reader { "(patchBitSize + pgw > 64)!"); } uint32_t cfb = orc::getClosestFixedBits(patchBitSize + pgw); - orc::RleDecoderV2::readLongs(unpackedPatch.data(), 0, pl, cfb); + readLongs(unpackedPatch.data(), 0, pl, cfb); // any remaining bits are thrown out resetReadLongs(); @@ -515,7 +515,7 @@ namespace omniruntime::reader { // value to result buffer. if the delta base value is negative then it // is a decreasing sequence else an increasing sequence. // read deltas using the literals buffer. - orc::RleDecoderV2::readLongs(literals.data(), 2, runLength - 2, bitSize); + readLongs(literals.data(), 2, runLength - 2, bitSize); if (deltaBase < 0) { for (uint64_t i = 2; i < runLength; ++i) { @@ -532,79 +532,33 @@ namespace omniruntime::reader { return copyDataFromBufferByType(OmniVec, offset, numValues, notNull, dataTypeId); } - void OmniRleDecoderV2::readLongsByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - omniruntime::type::DataTypeId dataTypeId, const char* const notNull) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_SHORT: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_INT: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_LONG: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_DATE32: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_DATE64: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_DOUBLE: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("copyDataFromBuffer_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("copyDataFromBuffer_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL128 should not in here!!!"); - default: - printf("copyDataFromBuffer switch no process!!!"); - } - - return; - } - - template - void OmniRleDecoderV2::readLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t *data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - const char* const notNull) { + void OmniRleDecoderV2::readLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs) { switch (fbs) { case 4: - return unrolledUnpack4(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack4(data, offset, len); case 8: - return unrolledUnpack8(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack8(data, offset, len); case 16: - return unrolledUnpack16(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack16(data, offset, len); case 24: - return unrolledUnpack24(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack24(data, offset, len); case 32: - return unrolledUnpack32(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack32(data, offset, len); case 40: - return unrolledUnpack40(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack40(data, offset, len); case 48: - return unrolledUnpack48(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack48(data, offset, len); case 56: - return unrolledUnpack56(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack56(data, offset, len); case 64: - return unrolledUnpack64(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack64(data, offset, len); default: // Fallback to the default implementation for deprecated bit size. - return plainUnpackLongs(OmniVec, data, offset, len, omniOffset, numValues, notNull, fbs); + return plainUnpackLongs(data, offset, len, fbs); } } - template - void OmniRleDecoderV2::plainUnpackLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull, uint64_t fbs) { + void OmniRleDecoderV2::plainUnpackLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs) { for (uint64_t i = offset; i < (offset + len); i++) { uint64_t result = 0; uint64_t bitsLeftToRead = fbs; @@ -629,10 +583,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack64(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack64(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -671,10 +622,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack56(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack56(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -711,10 +659,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack48(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack48(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -749,10 +694,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack40(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack40(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -785,10 +727,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack32(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack32(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -820,10 +759,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack24(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack24(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -853,10 +789,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack16(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack16(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -884,10 +817,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack8(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack8(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -907,10 +837,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack4(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack4(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Make sure bitsLeft is 0 before the loop. bitsLeft can only be 0, 4, or 8. diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh index d7377958e..133a18517 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh @@ -96,64 +96,27 @@ namespace omniruntime::reader { uint64_t copyDataFromBuffer(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, uint64_t numValues, const char* notNull); - void readLongsByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - omniruntime::type::DataTypeId dataTypeId, const char* const notNull); + void readLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs); - template - void readLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t *data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - const char* const notNull); - - template - void unrolledUnpack4(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack4(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack8(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack8(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack16(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack16(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack24(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack24(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack32(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack32(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack40(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack40(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack48(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack48(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack56(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack56(int64_t *data, uint64_t offset, uint64_t len); - template - void unrolledUnpack64(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); + void unrolledUnpack64(int64_t *data, uint64_t offset, uint64_t len); - template - void plainUnpackLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull, uint64_t fbs); + void plainUnpackLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs); }; } -- Gitee From 77e2d44cf55c1b8b76c92f1cd38abb33665e2d26 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Fri, 7 Feb 2025 10:16:28 +0800 Subject: [PATCH 2/6] clean code --- .../cpp/src/orcfile/OmniRLEv2.cc | 65 +++++++++++++------ .../cpp/src/orcfile/OmniRLEv2.hh | 8 ++- 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc index f91de1bd2..b4e1684f7 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc @@ -22,23 +22,23 @@ using omniruntime::vec::VectorBatch; namespace omniruntime::reader { - std::unique_ptr makeFixLenthVector(uint64_t numValues, + std::unique_ptr makeFixedLengthVector(uint64_t numValues, omniruntime::type::DataTypeId dataTypeId) { switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_SHORT: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_INT: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_LONG: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_DATE32: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_DATE64: - return std::make_unique>(numValues); - default: - throw std::runtime_error("Not support for this type: " + dataTypeId); + case omniruntime::type::OMNI_BOOLEAN: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_SHORT: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_INT: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_LONG: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_DATE32: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_DATE64: + return std::make_unique>(numValues); + default: + throw std::runtime_error("MakeFixedLengthVector Not support for this type: " + dataTypeId); } } @@ -48,7 +48,31 @@ namespace omniruntime::reader { case omniruntime::type::OMNI_DOUBLE: return std::make_unique>(numValues); default: - throw std::runtime_error("Not support double vector for this type: " + dataTypeId); + throw std::runtime_error("MakeDoubleVector Not support double vector for this type: " + dataTypeId); + } + } + + std::unique_ptr makeDecimalVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId) { + switch (dataTypeId) { + case omniruntime::type::OMNI_DECIMAL64: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_DECIMAL128: + return std::make_unique>(numValues); + default: + throw std::runtime_error("makeDecimalVector Not support vector for this type: " + dataTypeId); + } + } + + std::unique_ptr makeVarcharVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId) { + switch (dataTypeId) { + case omniruntime::type::OMNI_CHAR: + case omniruntime::type::OMNI_VARCHAR: + return std::make_unique>>(numValues); + default: + throw std::runtime_error("MakeVarcharVector Not support vector for this type: " + dataTypeId); } } @@ -62,18 +86,17 @@ namespace omniruntime::reader { case orc::TypeKind::TIMESTAMP: case orc::TypeKind::TIMESTAMP_INSTANT: case orc::TypeKind::LONG: - return makeFixLenthVector(numValues, dataTypeId); + return makeFixedLengthVector(numValues, dataTypeId); case orc::TypeKind::DOUBLE: return makeDoubleVector(numValues, dataTypeId); case orc::TypeKind::CHAR: - throw std::runtime_error("CHAR not finished!!!"); case orc::TypeKind::STRING: case orc::TypeKind::VARCHAR: - throw std::runtime_error("VARCHAR not finished!!!"); + return makeVarcharVector(numValues, dataTypeId); case orc::TypeKind::DECIMAL: - throw std::runtime_error("DECIMAL should not in here!!!"); + return makeDecimalVector(numValues, dataTypeId); default: { - throw std::runtime_error("Not support For This Type: " + baseTp->getKind()); + throw std::runtime_error("Not support For This ORC Type: " + baseTp->getKind()); } } } diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh index 133a18517..ea0112cff 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh @@ -25,12 +25,18 @@ namespace omniruntime::reader { - std::unique_ptr makeFixLenthVector(uint64_t numValues, + std::unique_ptr makeFixedLengthVector(uint64_t numValues, omniruntime::type::DataTypeId dataTypeId); std::unique_ptr makeDoubleVector(uint64_t numValues, omniruntime::type::DataTypeId dataTypeId); + std::unique_ptr makeVarcharVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId); + + std::unique_ptr makeDecimalVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId); + std::unique_ptr makeNewVector(uint64_t numValues, const orc::Type* baseTp, omniruntime::type::DataTypeId dataTypeId); -- Gitee From d6dfcfa7eb1e2fffb9d10722c0d8df5555cd5cd3 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Fri, 7 Feb 2025 18:05:48 +0800 Subject: [PATCH 3/6] refactor nulls --- .../cpp/src/orcfile/OmniColReader.cc | 698 ++++++++---------- .../cpp/src/orcfile/OmniColReader.hh | 131 ++-- 2 files changed, 384 insertions(+), 445 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc index 133511838..009a56f23 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc @@ -25,6 +25,8 @@ #include "util/omni_exception.h" using omniruntime::vec::VectorBatch; +using omniruntime::vec::BaseVector; +using omniruntime::exception::OmniException; using orc::ColumnReader; using orc::ByteRleDecoder; using orc::Type; @@ -43,13 +45,13 @@ namespace omniruntime::reader { switch (static_cast(kind)) { case orc::proto::ColumnEncoding_Kind_DIRECT: case orc::proto::ColumnEncoding_Kind_DICTIONARY: - return orc::RleVersion_1; + return orc::RleVersion_1; case orc::proto::ColumnEncoding_Kind_DIRECT_V2: case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: - return orc::RleVersion_2; - default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "Unknown encoding in omniConvertRleVersion"); + return orc::RleVersion_2; + default: + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "Unknown encoding in omniConvertRleVersion"); } } @@ -57,26 +59,64 @@ namespace omniruntime::reader { RleVersion version, MemoryPool& pool) { switch (static_cast(version)) { case orc::RleVersion_1: - // should not use - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "RleVersion_1 should not use!!!"); + // should not use + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "RleVersion_1 Not supported yet"); case orc::RleVersion_2: - return std::unique_ptr(new OmniRleDecoderV2(std::move(input), - isSigned, pool)); - default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "Not implemented yet"); + return std::unique_ptr(new OmniRleDecoderV2(std::move(input), isSigned, pool)); + default: + throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "Not implemented yet"); } } - std::unique_ptr createOmniBooleanRleDecoder - (std::unique_ptr input) { + std::unique_ptr createOmniBooleanRleDecoder(std::unique_ptr input) { OmniBooleanRleDecoder* decoder = new OmniBooleanRleDecoder(std::move(input)); return std::unique_ptr(reinterpret_cast(decoder)); } - std::unique_ptr createOmniByteRleDecoder - (std::unique_ptr input) { + std::unique_ptr createOmniByteRleDecoder(std::unique_ptr input) { return std::unique_ptr(new OmniByteRleDecoder(std::move(input))); } + + OmniColumnReader::OmniColumnReader(const Type &type, StripeStreams &stripe) + : ColumnReader(type, stripe) { + std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_PRESENT, true); + if (stream.get()) { + notNullDecoder = createOmniBooleanRleDecoder(std::move(stream)); + } + } + + uint64_t OmniColumnReader::skip(uint64_t numValues) { + OmniByteRleDecoder *decoder = notNullDecoder.get(); + if (decoder) { + // pass through the values that we want to skip and count how many are non-null + const uint64_t MAX_BUFFER_SIZE = 32768; + uint64_t bufferSize = std::min(MAX_BUFFER_SIZE, numValues); + // buffer, 0: null; 1: non-null + char buffer[MAX_BUFFER_SIZE]; + uint64_t remaining = numValues; + while (remaining > 0) { + uint64_t chunkSize = std::min(remaining, bufferSize); + decoder->next(buffer, chunkSize, nullptr); + remaining -= chunkSize; + // update non-null count + for (uint64_t i = 0; i < chunkSize; i++) { + if (!buffer[i]) { + // minus null + numValues -= 1; + } + } + } + } + return numValues; + } + + void OmniColumnReader::seekToRowGroup(std::unordered_map &positions) { + if (notNullDecoder.get()) { + notNullDecoder->seek(positions.at(columnId)); + } + } + /** * Create a reader for the given stripe. @@ -88,91 +128,71 @@ namespace omniruntime::reader { case orc::INT: case orc::LONG: case orc::SHORT: - return std::unique_ptr( - new OmniIntegerColumnReader(type, stripe)); + return std::make_unique(type, stripe); case orc::BINARY: case orc::CHAR: case orc::STRING: case orc::VARCHAR: - switch (static_cast(stripe.getEncoding(type.getColumnId()).kind())){ + switch (static_cast(stripe.getEncoding(type.getColumnId()).kind())) { case orc::proto::ColumnEncoding_Kind_DICTIONARY: case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: - return std::unique_ptr( - new OmniStringDictionaryColumnReader(type, stripe)); + return std::make_unique(type, stripe); case orc::proto::ColumnEncoding_Kind_DIRECT: case orc::proto::ColumnEncoding_Kind_DIRECT_V2: - return std::unique_ptr( - new OmniStringDirectColumnReader(type, stripe)); + return std::make_unique(type, stripe); default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "omniBuildReader unhandled string encoding"); - } + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "omniBuildReader unhandled string encoding"); + } - case orc::BOOLEAN: - return std::unique_ptr(new OmniBooleanColumnReader(type, stripe)); + case orc::BOOLEAN: + return std::make_unique(type, stripe); - case orc::BYTE: - return std::unique_ptr(new OmniByteColumnReader(type, stripe)); + case orc::BYTE: + return std::make_unique(type, stripe); case orc::STRUCT: - return std::unique_ptr( - new OmniStructColumnReader(type, stripe, julianPtr)); + return std::make_unique(type, stripe, julianPtr); case orc::TIMESTAMP: return std::unique_ptr (new OmniTimestampColumnReader(type, stripe, false, julianPtr)); case orc::TIMESTAMP_INSTANT: - return std::unique_ptr - (new OmniTimestampColumnReader(type, stripe, true, julianPtr)); + return std::make_unique(type, stripe, true, julianPtr); case orc::DECIMAL: - // Is this a Hive 0.11 or 0.12 file? - if (type.getPrecision() == 0) { - return std::unique_ptr - (new OmniDecimalHive11ColumnReader(type, stripe)); - } else if (type.getPrecision() <= - OmniDecimal64ColumnReader::MAX_PRECISION_64) { - return std::unique_ptr - (new OmniDecimal64ColumnReader(type, stripe)); - } else { - return std::unique_ptr - (new OmniDecimal128ColumnReader(type, stripe)); - } + // Is this a Hive 0.11 or 0.12 file? + if (type.getPrecision() == 0) { + return std::make_unique(type, stripe); + } else if (type.getPrecision() <= OmniDecimal64ColumnReader::MAX_PRECISION_64) { + return std::make_unique(type, stripe); + } else { + return std::make_unique(type, stripe); + } - case orc::FLOAT: - case orc::DOUBLE: - return std::unique_ptr( - new OmniDoubleColumnReader(type, stripe)); + case orc::FLOAT: + case orc::DOUBLE: + return std::make_unique(type, stripe); - default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "omniBuildReader unhandled type"); + default: + throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "omniBuildReader unhandled type"); } } - inline void readNulls(ColumnReader* colReader, uint64_t numValues, char* incomingMask, char* nulls, bool& hasNull) { - ByteRleDecoder* decoder = colReader->notNullDecoder.get(); - // TO do 需要将char*转换为bool*数组, 可以优化 + inline void readNulls(OmniColumnReader *colReader, uint64_t numValues, bool *incomingNulls, bool *nulls) { + OmniByteRleDecoder* decoder = reinterpret_cast(colReader->notNullDecoder.get()); + if (decoder) { - decoder->next(nulls, numValues, incomingMask); + decoder->nextNulls(nulls, numValues, incomingNulls); // check to see if there are nulls in this batch - for(uint64_t i=0; i < numValues; ++i) { - auto ptr = nulls; - if (!(ptr[i])) { - // To do hasNull is protected - hasNull = true; - return; - } } - } else if (incomingMask) { - // if we don't have a notNull stream, copy the incomingMask + } else if (incomingNulls) { + // if we don't have a notNull stream, copy the incomingNulls // To do finished - hasNull = true; - memcpy_s(nulls, numValues, incomingMask, numValues); + memcpy_s(nulls, numValues, incomingNulls, numValues); return; } - // To do hasNull is protected - hasNull = false; } void scaleInt128(orc::Int128& value, uint32_t scale, uint32_t currentScale) { @@ -233,28 +253,29 @@ namespace omniruntime::reader { * OmniStructColumnReader funcs */ OmniStructColumnReader::OmniStructColumnReader(const Type& type, StripeStreams& stripe, - common::JulianGregorianRebase *julianPtr): ColumnReader(type, stripe) { + common::JulianGregorianRebase *julianPtr): OmniColumnReader(type, stripe) { // count the number of selected sub-columns const std::vector selectedColumns = stripe.getSelectedColumns(); switch (static_cast(stripe.getEncoding(columnId).kind())) { - case orc::proto::ColumnEncoding_Kind_DIRECT: - for(unsigned int i=0; i < type.getSubtypeCount(); ++i) { + case orc::proto::ColumnEncoding_Kind_DIRECT: + for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) { const Type& child = *type.getSubtype(i); if (selectedColumns[static_cast(child.getColumnId())]) { children.push_back(omniBuildReader(child, stripe, julianPtr)); } } break; - case orc::proto::ColumnEncoding_Kind_DIRECT_V2: - case orc::proto::ColumnEncoding_Kind_DICTIONARY: - case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: + case orc::proto::ColumnEncoding_Kind_DIRECT_V2: + case orc::proto::ColumnEncoding_Kind_DICTIONARY: + case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "Unknown encoding for OmniStructColumnReader"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "Unknown encoding for OmniStructColumnReader"); } } uint64_t OmniStructColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); for(auto& ptr : children) { ptr->skip(numValues); } @@ -262,17 +283,13 @@ namespace omniruntime::reader { } void OmniStructColumnReader::next(void *&batch, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { + const orc::Type& baseTp, int* omniTypeId) { auto vecs = reinterpret_cast*>(batch); - nextInternal(*vecs, numValues, notNull, baseTp, omniTypeId); - } - - void OmniStructColumnReader::nextEncoded(orc::ColumnVectorBatch& rowBatch, uint64_t numValues, char *notNull) { - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "OmniStructColumnReader::nextEncoded not finished!!!"); + nextInternal(*vecs, numValues, nullptr, baseTp, omniTypeId); } void OmniStructColumnReader::seekToRowGroup(std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); for(auto& ptr : children) { ptr->seekToRowGroup(positions); @@ -281,34 +298,34 @@ namespace omniruntime::reader { template void OmniStructColumnReader::nextInternal(std::vector &vecs, uint64_t numValues, - char *notNull, const orc::Type& baseTp, int* omniTypeId) { + char *incomingNulls, const orc::Type& baseTp, int* omniTypeId) { + + if (encoded) { + std::string message("OmniStructColumnReader::nextInternal encoded is not finished!"); + throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", message); + } bool hasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, hasNull); + bool nulls[numValues]; + readNulls(this, numValues, incomingNulls, nulls); + uint64_t i = 0; - uint64_t i=0; - notNull = hasNull ? nulls : nullptr; for(auto iter = children.begin(); iter != children.end(); ++iter, ++i) { - if (encoded) { - std::string message("OmniStructColumnReader::nextInternal encoded is not finished!!!"); - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", message); + const Type* orcType = baseTp.getSubtype(i); + omniruntime::type::DataTypeId dataTypeId; + if (omniTypeId == nullptr) { + dataTypeId = getDefaultOmniType(orcType); } else { - omniruntime::vec::BaseVector* tempVec = nullptr; - const Type* type = baseTp.getSubtype(i); - if (omniTypeId == nullptr) { - int tempOmniTypeId = getOmniTypeByOrcType(type); - (*iter)->next(reinterpret_cast(tempVec), numValues, notNull, *type, &tempOmniTypeId); - vecs.push_back(tempVec); - } else { - (*iter)->next(reinterpret_cast(tempVec), numValues, notNull, *type, &omniTypeId[i]); - vecs.push_back(tempVec); - } + dataTypeId = static_cast(omniTypeId[i]); } + auto omnivector = omniruntime::reader::makeNewVector(numValues, orcType, dataTypeId); + reinterpret_cast(&(*iter->get()))->next(omnivector.get(), numValues, + hasNull ? nulls : nullptr, dataTypeId); + vecs.push_back(omnivector.release()); } } - omniruntime::type::DataTypeId OmniStructColumnReader::getOmniTypeByOrcType(const Type* type) { - constexpr int32_t OMNI_MAX_DECIMAL64_DIGITS = 18; + omniruntime::type::DataTypeId OmniStructColumnReader::getDefaultOmniType(const Type* type) { + constexpr int32_t OMNI_MAX_DECIMAL64_DIGITS = 18; switch (type->getKind()) { case orc::TypeKind::BOOLEAN: return omniruntime::type::OMNI_BOOLEAN; @@ -337,83 +354,77 @@ namespace omniruntime::reader { return omniruntime::type::OMNI_DECIMAL64; } default: - printf("no getOmniTypeByOrcType type to process!!!"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "Not Supported Type: " + type->getKind()); } - - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "OmniStructColumnReader::getOmniTypeByOrcType no type!!!"); - return omniruntime::type::OMNI_INVALID; } /** * all next funcs */ - void OmniIntegerColumnReader::next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - rle->next(reinterpret_cast(vec), numValues, HasNull ? nulls : nullptr, &baseTp, - *omniTypeId); - } - - void OmniBooleanColumnReader::next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - rle->next(reinterpret_cast(vec), numValues, HasNull ? nulls : nullptr, &baseTp, - *omniTypeId); - } - - void OmniByteColumnReader::next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - rle->next(reinterpret_cast(vec), numValues, HasNull ? nulls : nullptr, &baseTp, - *omniTypeId); - } - - void OmniTimestampColumnReader::next(void*& omnivec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto dataTypeId = static_cast(*omniTypeId); + void OmniIntegerColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + rle->next(vec, numValues, hasNull ? nulls : nullptr, omniTypeId); + } + + void OmniBooleanColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + if (omniTypeId != omniruntime::type::OMNI_BOOLEAN) { + throw OmniException("EXPRESSION_NOT_SUPPORT", "Not Supported Type: " + omniTypeId); + } + OmniBooleanRleDecoder *boolDecoder = reinterpret_cast(rle.get()); + boolDecoder->next(vec, numValues, hasNull ? nulls : nullptr, omniTypeId); + } + + void OmniByteColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + throw OmniException("EXPRESSION_NOT_SUPPORT", "Not Supported yet."); + } + + void OmniTimestampColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + auto dataTypeId = static_cast(omniTypeId); switch (dataTypeId) { - case omniruntime::type::OMNI_DATE32: - return nextByType(omnivec, numValues, notNull, baseTp, omniTypeId); - case omniruntime::type::OMNI_DATE64: - return nextByType(omnivec, numValues, notNull, baseTp, omniTypeId); - case omniruntime::type::OMNI_TIMESTAMP: - return nextByType(omnivec, numValues, notNull, baseTp, omniTypeId); + case omniruntime::type::OMNI_DATE32: { + auto intValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + static_cast*>(vec)); + return nextByType(intValues, numValues, hasNull ? nulls : nullptr); + } + case omniruntime::type::OMNI_DATE64: { + auto longValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + static_cast*>(vec)); + return nextByType(longValues, numValues, hasNull ? nulls : nullptr); + } + case omniruntime::type::OMNI_TIMESTAMP: { + auto longValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + static_cast*>(vec)); + return nextByType(longValues, numValues, hasNull ? nulls : nullptr); + } default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "OmniTimestampColumnReader type not support!!!"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "OmniTimestampColumnReader type not support: " + dataTypeId); } } - template - void OmniTimestampColumnReader::nextByType(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = std::make_unique>(static_cast(numValues)); - - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + template + void OmniTimestampColumnReader::nextByType(T *data, uint64_t numValues, bool *nulls) { int64_t secsBuffer[numValues]; - secondsRle->next(secsBuffer, numValues, notNull); + secondsRle->next(secsBuffer, numValues, nulls); int64_t nanoBuffer[numValues]; - nanoRle->next(nanoBuffer, numValues, notNull); + nanoRle->next(nanoBuffer, numValues, nulls); // Construct the values - for(uint64_t i=0; i < numValues; i++) { - if (notNull == nullptr || notNull[i]) { + for(uint64_t i = 0; i < numValues; i++) { + if (nulls == nullptr || nulls[i]) { uint64_t zeros = nanoBuffer[i] & 0x7; nanoBuffer[i] >>= 3; if (zeros != 0) { - for(uint64_t j = 0; j <= zeros; ++j) { + for (uint64_t j = 0; j <= zeros; ++j) { nanoBuffer[i] *= 10; } } @@ -437,143 +448,79 @@ namespace omniruntime::reader { } if (julianPtr != nullptr) { - vec->SetValue(static_cast(i), static_cast( - julianPtr->RebaseJulianToGregorianMicros(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L))); + data[i] = static_cast( + julianPtr->RebaseJulianToGregorianMicros(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L)); } else { - vec->SetValue(static_cast(i), static_cast(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L)); + data[i] = static_cast(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L); } - } else { - vec->SetNull(static_cast(i)); } } - - omnivec = vec.release(); } - void OmniDoubleColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto dataTypeId = static_cast(*omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, &baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); + void OmniDoubleColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + auto dataTypeId = static_cast(omniTypeId); switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_SHORT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_INT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_LONG: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_TIMESTAMP: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE32: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE64: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DOUBLE: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("OmniDoubleColumnReader_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("OmniDoubleColumnReader_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("OmniDoubleColumnReader_type DECIMAL64 not finished!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("OmniDoubleColumnReader_type DECIMAL64 not finished!!!"); + case omniruntime::type::OMNI_DOUBLE: { + auto doubleValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + static_cast*>(vec)); + return nextByType(doubleValues, numValues, hasNull ? nulls : nullptr); + } default: - printf("OmniDoubleColumnReader_type swtich no process!!!"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "OmniDoubleColumnReader type not support: " + dataTypeId); } - - omnivec = tempOmnivec.release(); } - template - void OmniDoubleColumnReader::nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - // update the notNull from the parent class - notNull = HasNull ? nulls : nullptr; - - using namespace omniruntime::type; - using T = typename NativeType::type; - - auto vec = reinterpret_cast*>(omnivec); - + template + void OmniDoubleColumnReader::nextByType(T *data, uint64_t numValues, bool *nulls) { if (columnKind == orc::FLOAT) { - if(notNull) { - for(size_t i=0; i < numValues; ++i) { - if(notNull[i]) { - vec->SetValue(static_cast(i), static_cast(readFloat())); - } else { - vec->SetNull(i); + if(nulls) { + for(size_t i = 0; i < numValues; ++i) { + if(!nulls[i]) { + data[i] = static_cast(readFloat()); } } } else { - for(size_t i=0; i < numValues; ++i) { - vec->SetValue(static_cast(i), static_cast(readFloat())); + for(size_t i = 0; i < numValues; ++i) { + data[i] = static_cast(readFloat()); } } } else { - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { - vec->SetValue(static_cast(i), static_cast(readDouble())); - } else { - vec->SetNull(i); + if (nulls) { + for(size_t i = 0; i < numValues; ++i) { + if (!nulls[i]) { + data[i] = static_cast(readDouble()); } } } else { - for(size_t i=0; i < numValues; ++i) { - vec->SetValue(static_cast(i), static_cast(readDouble())); + for(size_t i = 0; i < numValues; ++i) { + data[i] = static_cast(readDouble()); } } } } - void OmniStringDictionaryColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - auto newVector = std::make_unique>>(numValues); - - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - // update the notNull from the parent class - notNull = HasNull ? nulls : nullptr; - - - bool is_char = false; - if (baseTp.getKind() == orc::TypeKind::CHAR) { - is_char = true; - } + void OmniStringDictionaryColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, + int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); char *blob = dictionary->dictionaryBlob.data(); int64_t *dictionaryOffsets = dictionary->dictionaryOffset.data(); int64_t outputLengths[numValues]; - rle->next(outputLengths, numValues, notNull); + rle->next(outputLengths, numValues, nulls); uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1; - if (notNull) { + + auto varcharVector = reinterpret_cast>*>(vec); + if (hasNull) { for(uint64_t i=0; i < numValues; ++i) { - if (notNull[i]) { + if (!null[i]) { int64_t entry = outputLengths[i]; if (entry < 0 || static_cast(entry) >= dictionaryCount ) { throw orc::ParseError("Entry index out of range in StringDictionaryColumn"); @@ -583,13 +530,13 @@ namespace omniruntime::reader { auto len = dictionaryOffsets[entry+1] - dictionaryOffsets[entry]; char* ptr = blob + dictionaryOffsets[entry]; - if (is_char) { + if (isChar) { FindLastNotEmpty(ptr, len); } auto data = std::string_view(ptr, len); - newVector->SetValue(i, data); + varcharVector->SetValue(i, data); } else { - newVector->SetNull(i); + varcharVector->SetNull(i); } } } else { @@ -603,36 +550,24 @@ namespace omniruntime::reader { auto len = dictionaryOffsets[entry+1] - dictionaryOffsets[entry]; char* ptr = blob + dictionaryOffsets[entry]; - if (is_char) { + if (isChar) { FindLastNotEmpty(ptr, len); } auto data = std::string_view(ptr, len); - newVector->SetValue(i, data); + varcharVector->SetValue(i, data); } } - - omnivec = newVector.release(); } - void OmniStringDirectColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - auto newVector = std::make_unique>>(numValues); - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - // update the notNull from the parent class - notNull = HasNull ? nulls : nullptr; + void OmniStringDirectColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); int64_t lengthPtr[numValues]; - bool is_char = false; - if (baseTp.getKind() == orc::TypeKind::CHAR) { - is_char = true; - } - // read the length vector - lengthRle->next(lengthPtr, numValues, notNull); + lengthRle->next(lengthPtr, numValues, nulls); // figure out the total length of data we need from the blob stream const size_t totalLength = computeSize(lengthPtr, notNull, numValues); @@ -660,22 +595,24 @@ namespace omniruntime::reader { lastBufferLength -= moreBytes; } + auto varcharVector = reinterpret_cast>*>(vec); size_t filledSlots = 0; char* tempPtr = ptr; - if (notNull) { + if (hasNull) { while (filledSlots < numValues) { - if (notNull[filledSlots]) { + if (!nulls[filledSlots]) { //求出长度,如果为char,则需要去除最后的空格 auto len = lengthPtr[filledSlots]; - if (is_char) { + if (isChar) { FindLastNotEmpty(tempPtr, len); } auto data = std::string_view(tempPtr, len); - newVector->SetValue(filledSlots, data); + varcharVector->SetValue(filledSlots, data); tempPtr += lengthPtr[filledSlots]; } else { - newVector->SetNull(filledSlots); + varcharVector->SetNull(filledSlots); } filledSlots += 1; } @@ -683,11 +620,11 @@ namespace omniruntime::reader { while (filledSlots < numValues) { //求出长度,如果为char,则需要去除最后的空格 auto len = lengthPtr[filledSlots]; - if (is_char) { + if (isChar) { FindLastNotEmpty(tempPtr, len); } auto data = std::string_view(tempPtr, len); - newVector->SetValue(filledSlots, data); + varcharVector->SetValue(filledSlots, data); tempPtr += lengthPtr[filledSlots]; filledSlots += 1; @@ -697,65 +634,57 @@ namespace omniruntime::reader { omnivec = newVector.release(); } - void OmniDecimal64ColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto newVector = std::make_unique>(numValues); - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + void OmniDecimal64ColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + // read the next group of scales int64_t scaleBuffer[numValues]; - scaleDecoder->next(scaleBuffer, numValues, notNull); + scaleDecoder->next(scaleBuffer, numValues, nulls); - - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { + auto vector = reinterpret_cast*>(vec); + if (hasNull) { + for(size_t i = 0; i < numValues; ++i) { + if (!null[i]) { int64_t value = 0; readInt64(value, static_cast(scaleBuffer[i])); - newVector->SetValue(static_cast(i), static_cast(value)); - } else { - newVector->SetNull(static_cast(i)); + vector->SetValue(static_cast(i), static_cast(value)); } } } else { - for(size_t i=0; i < numValues; ++i) { + for(size_t i = 0; i < numValues; ++i) { int64_t value = 0; readInt64(value, static_cast(scaleBuffer[i])); - newVector->SetValue(static_cast(i), static_cast(value)); + vector->SetValue(static_cast(i), static_cast(value)); } } - - omnivec = newVector.release(); } - void OmniDecimal128ColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto newVector = std::make_unique>(numValues); - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + void OmniDecimal128ColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + // read the next group of scales int64_t scaleBuffer[numValues]; - scaleDecoder->next(scaleBuffer, numValues, notNull); - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { + scaleDecoder->next(scaleBuffer, numValues, nulls); + + auto vector = reinterpret_cast*>(vec); + if (hasNull) { + for(size_t i = 0; i < numValues; ++i) { + if (!nulls[i]) { orc::Int128 value = 0; readInt128(value, static_cast(scaleBuffer[i])); __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); - } else { - newVector->SetNull(i); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } } } else { - for(size_t i=0; i < numValues; ++i) { + for(size_t i = 0; i < numValues; ++i) { orc::Int128 value = 0; readInt128(value, static_cast(scaleBuffer[i])); __int128_t dst = value.getHighBits(); @@ -764,25 +693,21 @@ namespace omniruntime::reader { newVector->SetValue(i, omniruntime::type::Decimal128(dst)); } } - - omnivec = newVector.release(); } - void OmniDecimalHive11ColumnReader::next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto newVector = std::make_unique>(numValues); - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + void OmniDecimalHive11ColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); // read the next group of scales int64_t scaleBuffer[numValues]; - scaleDecoder->next(scaleBuffer, numValues, notNull); + scaleDecoder->next(scaleBuffer, numValues, nulls); - if (notNull) { + auto vector = reinterpret_cast*>(vec); + if (hasNull) { for (size_t i = 0; i < numValues; ++i) { - if (notNull[i]) { + if (!nulls[i]) { orc::Int128 value = 0; if (!readInt128(value, static_cast(scaleBuffer[i]))) { if (throwOnOverflow) { @@ -791,17 +716,15 @@ namespace omniruntime::reader { *errorStream << "Warning: " << "Hive 0.11 decimal with more than 38 digits " << "replaced by NULL. \n"; - newVector->SetNull(i); + vector->SetNull(i); } } else { __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } - } else { - newVector->SetNull(i); - } + } } } else { for (size_t i = 0; i < numValues; ++i) { @@ -813,25 +736,24 @@ namespace omniruntime::reader { *errorStream << "Warning: " << "Hive 0.11 decimal with more than 38 digits " << "replaced by NULL. \n"; - newVector->SetNull(i); + vector->SetNull(i); } } else { __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } } } } - OmniIntegerColumnReader::OmniIntegerColumnReader(const Type& type, - StripeStreams& stripe): ColumnReader(type, stripe) { + OmniIntegerColumnReader::OmniIntegerColumnReader(const Type& type, StripeStreams& stripe) + : OmniColumnReader(type, stripe) { RleVersion vers = omniConvertRleVersion(stripe.getEncoding(columnId).kind()); - std::unique_ptr stream = - stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); + std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); if (stream == nullptr) - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "DATA stream not found in Integer column"); + throw OmniException("EXPRESSION_NOT_SUPPORT", "DATA stream not found in Integer column"); rle = createOmniRleDecoder(std::move(stream), true, vers, memoryPool); } @@ -840,14 +762,14 @@ namespace omniruntime::reader { } uint64_t OmniIntegerColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } void OmniIntegerColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } @@ -876,7 +798,7 @@ namespace omniruntime::reader { OmniDecimal64ColumnReader::OmniDecimal64ColumnReader(const Type& type, StripeStreams& stripe - ): ColumnReader(type, stripe) { + ): OmniColumnReader(type, stripe) { scale = static_cast(type.getScale()); precision = static_cast(type.getPrecision()); valueStream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); @@ -897,7 +819,7 @@ namespace omniruntime::reader { } uint64_t OmniDecimal64ColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); uint64_t skipped = 0; while (skipped < numValues) { readBuffer(); @@ -911,7 +833,7 @@ namespace omniruntime::reader { void OmniDecimal64ColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); valueStream->seek(positions.at(columnId)); scaleDecoder->seek(positions.at(columnId)); // clear buffer state after seek @@ -998,7 +920,7 @@ namespace omniruntime::reader { StripeStreams& stripe, bool isInstantType, common::JulianGregorianRebase *julianPtr - ): ColumnReader(type, stripe), + ): OmniColumnReader(type, stripe), writerTimezone(isInstantType ? orc::getTimezoneByName("GMT") : stripe.getWriterTimezone()), @@ -1025,7 +947,7 @@ namespace omniruntime::reader { } uint64_t OmniTimestampColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); secondsRle->skip(numValues); nanoRle->skip(numValues); return numValues; @@ -1033,13 +955,13 @@ namespace omniruntime::reader { void OmniTimestampColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); secondsRle->seek(positions.at(columnId)); nanoRle->seek(positions.at(columnId)); } OmniStringDirectColumnReader::OmniStringDirectColumnReader(const Type& type, StripeStreams& stripe) - : ColumnReader(type, stripe) { + : OmniColumnReader(type, stripe) { RleVersion rleVersion = omniConvertRleVersion(stripe.getEncoding(columnId).kind()); std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_LENGTH, true); @@ -1052,6 +974,9 @@ namespace omniruntime::reader { throw orc::ParseError("DATA stream not found in StringDirectColumn"); lastBuffer = nullptr; lastBufferLength = 0; + if (type.getKind() == orc::TypeKind::CHAR) { + isChar = true; + } } OmniStringDirectColumnReader::~OmniStringDirectColumnReader() { @@ -1060,7 +985,7 @@ namespace omniruntime::reader { uint64_t OmniStringDirectColumnReader::skip(uint64_t numValues) { const size_t BUFFER_SIZE = 1024; - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); int64_t buffer[BUFFER_SIZE]; uint64_t done = 0; size_t totalBytes = 0; @@ -1091,18 +1016,16 @@ namespace omniruntime::reader { return numValues; } - size_t OmniStringDirectColumnReader::computeSize(const int64_t* lengths, - const char* notNull, - uint64_t numValues) { + size_t OmniStringDirectColumnReader::computeSize(const int64_t* lengths, bool *nulls, uint64_t numValues) { size_t totalLength = 0; - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { + if (nulls) { + for(size_t i = 0; i < numValues; ++i) { + if (!nulls[i]) { totalLength += static_cast(lengths[i]); } } } else { - for(size_t i=0; i < numValues; ++i) { + for(size_t i = 0; i < numValues; ++i) { totalLength += static_cast(lengths[i]); } } @@ -1111,7 +1034,7 @@ namespace omniruntime::reader { void OmniStringDirectColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); blobStream->seek(positions.at(columnId)); lengthRle->seek(positions.at(columnId)); // clear buffer state after seek @@ -1120,8 +1043,7 @@ namespace omniruntime::reader { } OmniStringDictionaryColumnReader::OmniStringDictionaryColumnReader(const Type& type, StripeStreams& stripe) - : ColumnReader(type, stripe), - dictionary(new orc::StringDictionary(stripe.getMemoryPool())) { + : OmniColumnReader(type, stripe), dictionary(new orc::StringDictionary(stripe.getMemoryPool())) { RleVersion rleVersion = omniConvertRleVersion(stripe.getEncoding(columnId) .kind()); uint32_t dictSize = stripe.getEncoding(columnId).dictionarysize(); @@ -1156,6 +1078,9 @@ namespace omniruntime::reader { "DICTIONARY_DATA stream not found in StringDictionaryColumn"); } omniReadFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get()); + if (type.getKind() == orc::TypeKind::CHAR) { + isChar = true; + } } OmniStringDictionaryColumnReader::~OmniStringDictionaryColumnReader() { @@ -1163,19 +1088,19 @@ namespace omniruntime::reader { } uint64_t OmniStringDictionaryColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } void OmniStringDictionaryColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } - OmniBooleanColumnReader::OmniBooleanColumnReader(const orc::Type& type, - orc::StripeStreams& stripe): ColumnReader(type, stripe){ + OmniBooleanColumnReader::OmniBooleanColumnReader(const orc::Type& type, orc::StripeStreams& stripe) + : OmniColumnReader(type, stripe){ std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); if (stream == nullptr) @@ -1188,7 +1113,7 @@ namespace omniruntime::reader { } uint64_t OmniBooleanColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } @@ -1196,14 +1121,13 @@ namespace omniruntime::reader { void OmniBooleanColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } - OmniByteColumnReader::OmniByteColumnReader(const Type& type, - StripeStreams& stripe - ): ColumnReader(type, stripe){ + OmniByteColumnReader::OmniByteColumnReader(const Type& type, StripeStreams& stripe) + : OmniColumnReader(type, stripe){ std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); if (stream == nullptr) @@ -1216,21 +1140,21 @@ namespace omniruntime::reader { } uint64_t OmniByteColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } void OmniByteColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } OmniDoubleColumnReader::OmniDoubleColumnReader(const Type& type, StripeStreams& stripe - ): ColumnReader(type, stripe), + ): OmniColumnReader(type, stripe), columnKind(type.getKind()), bytesPerValue((type.getKind() == orc::FLOAT) ? 4 : 8), @@ -1246,7 +1170,7 @@ namespace omniruntime::reader { } uint64_t OmniDoubleColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); if (static_cast(bufferEnd - bufferPointer) >= bytesPerValue * numValues) { @@ -1269,7 +1193,7 @@ namespace omniruntime::reader { void OmniDoubleColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); inputStream->seek(positions.at(columnId)); // clear buffer state after seek bufferEnd = nullptr; diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh index f3dbdeb68..8ac694b5b 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh @@ -29,50 +29,78 @@ #include "common/JulianGregorianRebase.h" namespace omniruntime::reader { - class OmniStructColumnReader: public orc::ColumnReader { + + class OmniColumnReader: public orc::ColumnReader { + public: + OmniColumnReader(const orc::Type& type, orc::StripeStream& stripe); + + virtual ~OmniColumnReader() {} + + /** + * Skip number of specified rows. + */ + virtual uint64_t skip(uint64_t numValues); + + /** + * Read OmniVector in OmniTypeId, which contains specified rows. + */ + virtual void next( + omniruntime::vec::BaseVector *omniVector, + uint64_t numValues, + bool *incomingNulls, + int omniTypeId) { + throw std::runtime_error("next() in base class should not be called"); + } + + /** + * Seek to beginning of a row group in the current stripe + * @param positions a list of PositionProviders storing the positions + */ + virtual void seekToRowGroup(std::unordered_map &positions); + }; + + class OmniStructColumnReader: public OmniColumnReader { private: - std::vector> children; + std::vector> children; public: - OmniStructColumnReader(const orc::Type& type, orc::StripeStreams& stipe, common::JulianGregorianRebase *julianPtr); + OmniStructColumnReader(const orc::Type& type, orc::StripeStreams& stipe, + common::JulianGregorianRebase *julianPtr); uint64_t skip(uint64_t numValues) override; /** * direct read VectorBatch in next * @param omniVecBatch the VectorBatch to push - * @param numValues the VectorBatch to push - * @param notNull the VectorBatch to push - * @param baseTp the vectorBatch to push + * @param numValues the numValues of VectorBatch + * @param notNull the notNull array indicates value not null + * @param baseTp the orc type * @param omniTypeId the omniTypeId to push */ void next(void *&omniVecBatch, uint64_t numValues, char *notNull, const orc::Type& baseTp, int* omniTypeId) override; - void nextEncoded(orc::ColumnVectorBatch& rowBatch, - uint64_t numValues, - char *notNull) override; - void seekToRowGroup( std::unordered_map& positions) override; private: /** - * direct read VectorBatch in next + * direct read VectorBatch in next for omni * @param omniVecBatch the VectorBatch to push - * @param numValues the VectorBatch to push - * @param notNull the VectorBatch to push - * @param baseTp the vectorBatch to push + * @param numValues the numValues of VectorBatch + * @param notNull the notNull array indicates value not null + * @param baseTp the orc type * @param omniTypeId the omniTypeId to push */ template - void nextInternal(std::vector &vecs, uint64_t numValues, char *notNull, + void nextInternal(std::vector &vecs, uint64_t numValues, bool *incomingNulls, const orc::Type& baseTp, int* omniTypeId); - omniruntime::type::DataTypeId getOmniTypeByOrcType(const orc::Type* type); + // Get default omni type from orc type. + omniruntime::type::DataTypeId getDefaultOmniType(const orc::Type *type); }; - class OmniBooleanColumnReader: public orc::ColumnReader { + class OmniBooleanColumnReader: public OmniColumnReader { protected: std::unique_ptr rle; @@ -82,13 +110,12 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniByteColumnReader: public orc::ColumnReader { + class OmniByteColumnReader: public OmniColumnReader { protected: std::unique_ptr rle; @@ -98,14 +125,12 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniIntegerColumnReader: public orc::ColumnReader { + class OmniIntegerColumnReader: public OmniColumnReader { protected: std::unique_ptr rle; @@ -115,20 +140,17 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniTimestampColumnReader: public orc::ColumnReader { + class OmniTimestampColumnReader: public OmniColumnReader { public: - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - template - void nextByType(void*& omnivec, uint64_t numValues, char* notNull, const orc::Type& baseTp, int* omniTypeId); + template + void nextByType(T *data, uint64_t numValues, bool *nulls; private: std::unique_ptr secondsRle; @@ -151,19 +173,17 @@ namespace omniruntime::reader { std::unordered_map& positions) override; }; - class OmniDoubleColumnReader: public orc::ColumnReader { + class OmniDoubleColumnReader: public OmniColumnReader { public: OmniDoubleColumnReader(const orc::Type& type, orc::StripeStreams& stripe); ~OmniDoubleColumnReader() override; uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - template - void nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp); + template + void nextByType(T *data, uint64_t numValues, bool *nulls; void seekToRowGroup( std::unordered_map& positions) override; @@ -207,14 +227,14 @@ namespace omniruntime::reader { }; - class OmniStringDictionaryColumnReader: public orc::ColumnReader { + class OmniStringDictionaryColumnReader: public OmniColumnReader { public: - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; private: std::shared_ptr dictionary; std::unique_ptr rle; + bool isChar = false; public: OmniStringDictionaryColumnReader(const orc::Type& type, orc::StripeStreams& stipe); @@ -222,30 +242,28 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniStringDirectColumnReader: public orc::ColumnReader { + class OmniStringDirectColumnReader: public OmniColumnReader { public: - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; private: std::unique_ptr lengthRle; std::unique_ptr blobStream; const char *lastBuffer; size_t lastBufferLength; + bool isChar = false; /** * Compute the total length of the values. * @param lengths the array of lengths - * @param notNull the array of notNull flags + * @param nulls the array of nulls flags * @param numValues the lengths of the arrays * @return the total number of bytes for the non-null values */ - size_t computeSize(const int64_t *lengths, const char *notNull, - uint64_t numValues); + size_t computeSize(const int64_t *lengths, bool *nulls, uint64_t numValues); public: OmniStringDirectColumnReader(const orc::Type& type, orc::StripeStreams& stipe); @@ -257,10 +275,9 @@ namespace omniruntime::reader { std::unordered_map& positions) override; }; - class OmniDecimal64ColumnReader: public orc::ColumnReader { + class OmniDecimal64ColumnReader: public OmniColumnReader { public: - void next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; public: static const uint32_t MAX_PRECISION_64 = 18; @@ -327,8 +344,7 @@ namespace omniruntime::reader { class OmniDecimal128ColumnReader : public OmniDecimal64ColumnReader { public: - void next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; public: OmniDecimal128ColumnReader(const orc::Type& type, orc::StripeStreams& stipe); @@ -349,8 +365,7 @@ namespace omniruntime::reader { OmniDecimalHive11ColumnReader(const orc::Type& type, orc::StripeStreams& stipe); ~OmniDecimalHive11ColumnReader() override; - void next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; }; std::unique_ptr omniBuildReader(const orc::Type& type, -- Gitee From 015a39c572b2123f778640b942a3aef1320a9fe7 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Sat, 8 Feb 2025 09:41:38 +0800 Subject: [PATCH 4/6] refactor nulls from char to bool --- .../cpp/src/orcfile/OmniByteRLE.cc | 292 ++++++------------ .../cpp/src/orcfile/OmniByteRLE.hh | 23 +- 2 files changed, 104 insertions(+), 211 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc index 1f3dfbbc9..7b9897ee8 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc @@ -24,6 +24,7 @@ namespace omniruntime::reader { const int MINIMUM_REPEAT = 3; const int MAXIMUM_REPEAT = 127 + MINIMUM_REPEAT; + const uint32_t BITS_OF_BYTE = 8; void OmniBooleanRleDecoder::seek(orc::PositionProvider& location) { OmniByteRleDecoder::seek(location); @@ -54,86 +55,111 @@ namespace omniruntime::reader { } } - void OmniBooleanRleDecoder::next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - auto dataTypeId = static_cast(omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); - switch (dataTypeId) { + void OmniBooleanRleDecoder::nextNulls(bool *data, uint64_t numValues, bool *nulls) { + // next spot to fill in + uint64_t position = 0; + // use up any remaining bits + if (nulls) { + while (remainingBits > 0 && position < numValues) { + if (!nulls[position]) { + remainingBits -= 1; + data[position] = !((static_cast(lastByte) >> remainingBits) & 0x1); + } else { + data[position] = true; + } + position += 1; + } + } else { + while (remainingBits > 0 && position < numValues) { + remainingBits -= 1; + data[position] = !((static_cast(lastByte) >> remainingBits) & 0x1); + position += 1; + } + } + // count the number of nulls remaining + uint64_t nonNulls = numValues - position; + if (nulls) { + for (uint64_t i = position; i < numValues; ++i) { + if (nulls[i]) { + nonNulls -= 1; + } + } + } + // fill in the remaining values + if (nonNulls == 0) { + while (position < numValues) { + data[position++] = true; + } + } else if (position < numValues) { + // read the new bytes into array + uint64_t bytesRead = (nonNulls + 7) / 8; + OmniByteRleDecoder::next(reinterpret_cast(data + position), bytesRead, nullptr); + lastByte = data[position + bytesRead - 1]; + remainingBits = bytesRead * 8 - nonNulls; + // expand the array backwards so that we don't clobber the data + uint64_t bitLeft = bytesRead * 8 - remainingBits; + if (nulls) { + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i) { + if (!nulls[i]) { + uint64_t shiftPosn = (-bitLeft) % 8; + data[i] = !((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x01); + bitsLeft -= 1; + } else { + data[i] = true; + } + } + } else { + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i) { + uint64_t shiftPosn = (-bitsLeft) % 8; + data[i] = !((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x01); + bitsLeft -= 1; + } + } + } + } + + + void OmniBooleanRleDecoder::next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, + int omniTypeId) { + switch (omniTypeId) { case omniruntime::type::OMNI_BOOLEAN: - nextByType(pushOmniVec, numValues, notNull, baseTp, omniTypeId); - break; - case omniruntime::type::OMNI_SHORT: - throw std::runtime_error("OmniBooleanRleDecoder SHORT not finished!!!"); - break; - case omniruntime::type::OMNI_INT: - throw std::runtime_error("OmniBooleanRleDecoder INT not finished!!!"); - break; - case omniruntime::type::OMNI_LONG: - throw std::runtime_error("OmniBooleanRleDecoder LONG not finished!!!"); - break; - case omniruntime::type::OMNI_TIMESTAMP: - throw std::runtime_error("OmniBooleanRleDecoder TIMESTAMP not finished!!!"); + auto boolValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(boolValues, numValues, nulls); break; - case omniruntime::type::OMNI_DATE32: - throw std::runtime_error("OmniBooleanRleDecoder DATE32 not finished!!!"); - break; - case omniruntime::type::OMNI_DATE64: - throw std::runtime_error("OmniBooleanRleDecoder DATE64 not finished!!!"); - break; - case omniruntime::type::OMNI_DOUBLE: - throw std::runtime_error("OmniBooleanRleDecoder DOUBLE not finished!!!"); - break; - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("OmniBooleanRleDecoder CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("OmniBooleanRleDecoder VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("OmniBooleanRleDecoder DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("OmniBooleanRleDecoder DECIMAL64 should not in here!!!"); default: - printf("OmniBooleanRleDecoder switch no process!!!"); + throw std::runtime_error("OmniBooleanRleDecoder not support type: " + omniTypeId); } - - omnivec = tempOmnivec.release(); } - template - void OmniBooleanRleDecoder::nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(omnivec); - + template + void OmniBooleanRleDecoder::next(T *data, uint64_t numValues, bool *nulls) { // next spot to fill in uint64_t position = 0; // use up any remaining bits - if (notNull) { - while(remainingBits > 0 && position < numValues) { - if (notNull[position]) { + if (nulls) { + while (remainingBits > 0 && position < numValues) { + if (!nulls[position]) { remainingBits -= 1; - vec->SetValue(static_cast(position), static_cast((static_cast(lastByte) >> - remainingBits) & 0x1)); + data[position] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1)); } else { - vec->SetNull(static_cast(position)); + data[position] = 0; } position += 1; } } else { - while(remainingBits > 0 && position < numValues) { + while (remainingBits > 0 && position < numValues) { remainingBits -= 1; - vec->SetValue(static_cast(position++), static_cast((static_cast(lastByte) >> - remainingBits) & 0x1)); + data[position++] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1)); } } // count the number of nonNulls remaining uint64_t nonNulls = numValues - position; - if (notNull) { - for(uint64_t i = position; i < numValues; ++i) { - if (!notNull[i]) { + if (nulls) { + for (uint64_t i = position; i < numValues; ++i) { + if (!nulls[i]) { nonNulls -= 1; } } @@ -142,35 +168,31 @@ namespace omniruntime::reader { // fill in the remaining values if (nonNulls == 0) { while (position < numValues) { - vec->SetNull(static_cast(position++)); + data[position++] = 0; } } else if (position < numValues) { // read the new bytes into the array uint64_t bytesRead = (nonNulls + 7) / 8; - auto *values = reinterpret_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(omnivec)); - OmniByteRleDecoder::next(values + position, bytesRead, nullptr); - lastByte = static_cast(vec->GetValue(position + bytesRead - 1)); + OmniByteRleDecoder::next(reinterpret_cast(data + position), bytesRead, nullptr); + lastByte = data[position + bytesRead - 1]; remainingBits = bytesRead * 8 - nonNulls; // expand the array backwards so that we don't clobber the data - uint64_t bitsLeft = bytesRead * 8- remainingBits; - if (notNull) { - for (int64_t i =static_cast(numValues) - 1; - i >= static_cast(position); --i) { - if (notNull[i]) { + uint64_t bitsLeft = bytesRead * 8 - remainingBits; + if (nulls) { + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i) { + if (!nulls[i]) { uint64_t shiftPosn = (-bitsLeft) % 8; - auto value = static_cast(vec->GetValue(position + (bitsLeft - 1) / 8)) >> shiftPosn; - vec->SetValue(static_cast(i), static_cast(value & 0x1)); + data[i] = static_cast((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1); bitsLeft -= 1; } else { - vec->SetNull(static_cast(i)); + data[i] = 0; } } } else { - for(int64_t i = static_cast(numValues) - 1; + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i, --bitsLeft) { uint64_t shiftPosn = (-bitsLeft) % 8; - auto value = static_cast(vec->GetValue(position + (bitsLeft - 1) / 8)) >> shiftPosn; - vec->SetValue(static_cast(i), static_cast(value & 0x1)); + data[i] = static_cast((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1); } } } @@ -329,129 +351,5 @@ namespace omniruntime::reader { } } - void OmniByteRleDecoder::next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - auto dataTypeId = static_cast(omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_SHORT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_INT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_LONG: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_TIMESTAMP: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE32: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE64: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DOUBLE: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("OmniByteRleDecoder CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("OmniByteRleDecoder VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("OmniByteRleDecoder DECIMAL64 not finished!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("OmniByteRleDecoder DECIMAL64 not finished!!!"); - default: - printf("OmniByteRleDecoder swtich no process!!!"); - } - - omnivec = tempOmnivec.release(); - } - - template - void OmniByteRleDecoder::nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(omnivec); - - uint64_t position = 0; - // skip over null values - while (notNull && position < numValues && !notNull[position]) { - position += 1; - } - while (position < numValues) { - // if we are out of values, read more - if (remainingValues == 0) { - readHeader(); - } - // how many do we read out of this block? - size_t count = std::min(static_cast(numValues - position), - remainingValues); - uint64_t consumed = 0; - if (repeating) { - if (notNull) { - for(uint64_t i=0; i < count; ++i) { - if (notNull[position + i]) { - vec->SetValue(static_cast(position + i), static_cast(value)); - consumed += 1; - } else { - vec->SetNull(static_cast(position + i)); - } - } - } else { - for (uint64_t i = position; i < position + count; ++i) { - vec->SetValue(static_cast(i), static_cast(value)); - } - consumed = count; - } - } else { - if (notNull) { - for(uint64_t i = 0; i < count; ++i) { - if (notNull[position + i]) { - vec->SetValue(static_cast(position + i), static_cast(readByte())); - consumed += 1; - } else { - vec->SetNull(static_cast(position + i)); - } - } - } else { - uint64_t i = 0; - while (i < count) { - if (bufferStart == bufferEnd) { - nextBuffer(); - } - uint64_t copyBytes = - std::min(static_cast(count - i), - static_cast(bufferEnd - bufferStart)); - vec->SetValues(static_cast(position + i), bufferStart, static_cast(copyBytes)); - bufferStart += copyBytes; - i += copyBytes; - } - consumed = count; - } - } - remainingValues -= consumed; - position += count; - // skip over any null values - while (notNull && position < numValues && !notNull[position]) { - position += 1; - } - } - } //OmniByteRleDecoder end } diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh index 0ed215347..c5e5ddb43 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh @@ -43,13 +43,6 @@ namespace omniruntime::reader { */ virtual void next(char* data, uint64_t numValues, char* notNull); - virtual void next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); - - template - void nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp); - protected: inline void nextBuffer(); inline signed char readByte(); @@ -80,15 +73,17 @@ namespace omniruntime::reader { virtual void skip(uint64_t numValues); /** - * Read a number of values into the batch. - */ + * Read nulls flag to data. + */ + void nextNulls(bool *data, uint64_t numValues, bool *nulls); - virtual void next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); + /** + * Read a number of values into the batch by nulls. + */ + void next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, int omniTypeId); - template - void nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); + template + void next(T *data, uint64_t numValues, bool *nulls); protected: size_t remainingBits; -- Gitee From 359e3cdc58cbcfc5e93939019332bb0713a826d5 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Sat, 8 Feb 2025 10:37:23 +0800 Subject: [PATCH 5/6] refactor generate values in rle --- .../cpp/src/orcfile/OmniRLEv2.cc | 352 ++++-------------- .../cpp/src/orcfile/OmniRLEv2.hh | 57 +-- 2 files changed, 91 insertions(+), 318 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc index b4e1684f7..5419409d6 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc @@ -101,20 +101,59 @@ namespace omniruntime::reader { } } - void OmniRleDecoderV2::next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - uint64_t nRead = 0; + void OmniRleDecoderV2::next(int64_t *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + + void OmniRleDecoderV2::next(int32_t *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + + void OmniRleDecoderV2::next(int16_t *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + + void OmniRleDecoderV2::next(bool *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + + void OmniRleDecoderV2::next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, int omniTypeId) { + switch (omniTypeId) { + case omniruntime::type::OMNI_BOOLEAN: { + auto boolValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(boolValues, numValues, nulls); + } + case omniruntime::type::OMNI_SHORT: { + auto shortValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(shortValues, numValues, nulls); + } + case omniruntime::type::OMNI_INT: + case omniruntime::type::OMNI_DATE32: { + auto intValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(intValues, numValues, nulls); + } + case omniruntime::type::OMNI_LONG: + case omniruntime::type::OMNI_DATE64: { + auto longValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(longValues, numValues, nulls); + } + default: + throw std::runtime_error("OmniRleDecoderV2 Not support For This Type: " + omniTypeId); + } + } - auto dataTypeId = static_cast(omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); + template + void OmniRleDecoderV2::next(T *data, uint64_t numValues, bool *nulls) { + uint64_t nRead = 0; while (nRead < numValues) { // SKip any nulls before attempting to read first byte. - while (notNull && !notNull[nRead]) { - tempOmnivec->SetNull(nRead); + while (nulls && nulls[nRead]) { if (++nRead == numValues) { - omnivec = tempOmnivec.release(); return; //ended with null values } } @@ -127,30 +166,27 @@ namespace omniruntime::reader { uint64_t offset = nRead, length = numValues - nRead; orc::EncodingType enc = static_cast((firstByte >> 6) & 0x03); - switch (static_cast(enc)) { + switch (static_cast(enc)) { case orc::SHORT_REPEAT: - nRead += nextShortRepeatsByType(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextShortRepeats(data, offset, length, nulls); break; case orc::DIRECT: - nRead += nextDirect(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextDirect(data, offset, length, nulls); break; case orc::PATCHED_BASE: - nRead += nextPatchedByType(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextPatched(data, offset, length, nulls); break; case orc::DELTA: - nRead += nextDeltaByType(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextDelta(data, offset, length, nulls); break; default: throw orc::ParseError("unknown encoding"); } } - - omnivec = tempOmnivec.release(); } - uint64_t OmniRleDecoderV2::nextDirect(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { + template + uint64_t OmniRleDecoderV2::nextDirect(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bits unsigned char fbo = (firstByte >> 1) & 0x1f; @@ -172,56 +208,11 @@ namespace omniruntime::reader { } } - return copyDataFromBufferByType(OmniVec, offset, numValues, notNull, dataTypeId); + return copyDataFromBuffer(data, offset, numValues, nulls); } - uint64_t OmniRleDecoderV2::nextShortRepeatsByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_SHORT: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_INT: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_LONG: - return nextShortRepeatsLongType - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE32: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE64: - return nextShortRepeatsLongType - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_DOUBLE: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("nextShortRepeats_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("nextShortRepeats_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("nextShortRepeats_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("nextShortRepeats_type DECIMAL128 should not in here!!!"); - default: - printf("nextShortRepeats_type switch no process!!!"); - } - - return 0; - } - - template - uint64_t OmniRleDecoderV2::nextShortRepeats(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); - + template + uint64_t OmniRleDecoderV2::nextShortRepeats(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bytes uint64_t byteSize = (firstByte >> 3) & 0x07; @@ -242,18 +233,16 @@ namespace omniruntime::reader { uint64_t nRead = std::min(runLength - runRead, numValues); - if (notNull) { + if (nulls) { for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - if (notNull[pos]) { - vec->SetValue(static_cast(pos), static_cast(literals[0])); + if (!nulls[pos]) { + data[pos] = static_cast(literals[0]); ++runRead; - } else { - vec->SetNull(static_cast(pos)); } } } else { for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - vec->SetValue(static_cast(pos), static_cast(literals[0])); + data[pos] = static_cast(literals[0]); ++runRead; } } @@ -261,97 +250,8 @@ namespace omniruntime::reader { return nRead; } - template - uint64_t OmniRleDecoderV2::nextShortRepeatsLongType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); - - if (runRead == runLength) { - // extract the number of fixed bytes - uint64_t byteSize = (firstByte >> 3) & 0x07; - byteSize += 1; - - runLength = firstByte & 0x07; - // run lengths values are stored only after MIN_REPEAT value is met - runLength += MIN_REPEAT; - runRead = 0; - - // read the repeated value which is store using fixed bytes - literals[0] = readLongBE(byteSize); - - if (isSigned) { - literals[0] = orc::unZigZag(static_cast(literals[0])); - } - } - - uint64_t nRead = std::min(runLength - runRead, numValues); - - if (notNull) { - for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - if (notNull[pos]) { - vec->SetValue(pos, static_cast(literals[0])); - ++runRead; - } else { - vec->SetNull(pos); - } - } - } else { - int64_t values[nRead]; - std::fill(values, values + nRead, literals[0]); - vec->SetValues(offset, values, nRead); - runRead += nRead; - } - - return nRead; - } - - - uint64_t OmniRleDecoderV2::nextPatchedByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_SHORT: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_INT: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_LONG: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE32: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE64: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DOUBLE: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("nextPatched_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("nextPatched_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("nextPatched_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("nextPatched_type DECIMAL128 should not in here!!!"); - default: - printf("nextPatched_type switch no process!!!"); - } - - return 0; - } - - template - uint64_t OmniRleDecoderV2::nextPatched(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { + template + uint64_t OmniRleDecoderV2::nextPatched(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bits unsigned char fbo = (firstByte >> 1) & 0x1f; @@ -445,53 +345,11 @@ namespace omniruntime::reader { } } - return copyDataFromBufferByType(OmniVec, offset, numValues, notNull, dataTypeId); + return copyDataFromBuffer(data, offset, numValues, nulls); } - uint64_t OmniRleDecoderV2::nextDeltaByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_SHORT: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_INT: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_LONG: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE32: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE64: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DOUBLE: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("nextShortRepeats_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("nextShortRepeats_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("nextShortRepeats_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("nextShortRepeats_type DECIMAL128 should not in here!!!"); - default: - printf("nextShortRepeats_type switch no process!!!"); - } - - return 0; - } - - template - uint64_t OmniRleDecoderV2::nextDelta(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { + template + uint64_t OmniRleDecoderV2::nextDelta(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bits unsigned char fbo = (firstByte >> 1) &0x1f; @@ -552,7 +410,7 @@ namespace omniruntime::reader { } } - return copyDataFromBufferByType(OmniVec, offset, numValues, notNull, dataTypeId); + return copyDataFromBuffer(data, offset, numValues, nulls); } void OmniRleDecoderV2::readLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs) { @@ -893,81 +751,19 @@ namespace omniruntime::reader { return; } - uint64_t OmniRleDecoderV2::copyDataFromBufferByType(omniruntime::vec::BaseVector*& tempOmnivec, uint64_t offset, - uint64_t numValues, const char* notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_SHORT: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_INT: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_LONG: - return copyDataFromBufferTo64bit(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE32: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE64: - return copyDataFromBufferTo64bit(tempOmnivec, offset, numValues, - notNull); - case omniruntime::type::OMNI_DOUBLE: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("copyDataFromBuffer_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("copyDataFromBuffer_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL128 should not in here!!!"); - default: - printf("copyDataFromBuffer switch no process!!!"); - } - - return 0; - } - - template - uint64_t OmniRleDecoderV2::copyDataFromBuffer(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); + template + uint64_t OmniRleDecoderV2::copyDataFromBuffer(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { uint64_t nRead = std::min(runLength - runRead, numValues); - if (notNull) { + if (nulls) { for (uint64_t i = offset; i < (offset + nRead); ++i) { - if (notNull[i]) { - vec->SetValue(static_cast(i), static_cast(literals[runRead++])); - } else { - vec->SetNull(static_cast(i)); + if (!nulls[i]) { + data[i] = static_cast(literals[runRead++]); } } } else { for (uint64_t i = offset; i < (offset + nRead); ++i) { - vec->SetValue(static_cast(i), static_cast(literals[runRead++])); - } - } - return nRead; - } - - template - uint64_t OmniRleDecoderV2::copyDataFromBufferTo64bit(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); - uint64_t nRead = std::min(runLength - runRead, numValues); - if (notNull) { - for (uint64_t i = offset; i < (offset + nRead); ++i) { - if (notNull[i]) { - vec->SetValue(static_cast(i), static_cast(literals[runRead++])); - } else { - vec->SetNull(static_cast(i)); - } + data[i] = static_cast(literals[runRead++]); } - } else { - vec->SetValues(static_cast(offset), literals.data() + runRead, static_cast(nRead)); - runRead += nRead; } return nRead; } diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh index ea0112cff..8576f4682 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh @@ -49,58 +49,35 @@ namespace omniruntime::reader { * direct read VectorBatch in next * @param omnivec the BaseVector to push * @param numValues the numValues to push - * @param notNull the nullarrays to push + * @param nulls the nullarrays to push * @param baseTp the orcType to push * @param omniTypeId the int* of omniType to push */ - void next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); + void next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, int omniTypeId); - void next(int64_t* data, uint64_t numValues, const char* notNull) { - orc::RleDecoderV2::next(data, numValues, notNull); - } - - uint64_t nextDirect(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, uint64_t numValues, - const char* const notNull, omniruntime::type::DataTypeId dataTypeId); - - uint64_t nextShortRepeatsByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, uint64_t numValues, - const char* const notNull, omniruntime::type::DataTypeId dataTypeId); - - template - uint64_t nextShortRepeats(omniruntime::vec::BaseVector*& omnivec, uint64_t offset, uint64_t numValues, - const char* notNull); - - template - uint64_t nextShortRepeatsLongType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull); + void next(int64_t *data, uint64_t numValues, bool *nulls); + void next(int32_t *data, uint64_t numValues, bool *nulls); - uint64_t nextPatchedByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, uint64_t numValues, - const char* const notNull, omniruntime::type::DataTypeId dataTypeId); + void next(int16_t *data, uint64_t numValues, bool *nulls); - template - uint64_t nextPatched(omniruntime::vec::BaseVector*& omnivec, uint64_t offset, uint64_t numValues, - const char* notNull, omniruntime::type::DataTypeId dataTypeId); + template + void next(T *data, uint64_t numValues, bool *nulls); - template - uint64_t nextDelta(omniruntime::vec::BaseVector*& omnivec, uint64_t offset, uint64_t numValues, - const char* notNull, omniruntime::type::DataTypeId dataTypeId); + template + uint64_t nextShortRepeats(T *data, uint64_t offset, uint64_t numValues, bool *nulls); - uint64_t nextDeltaByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId); + template + uint64_t nextDirect(T *data, uint64_t offset, uint64_t numValues, bool *nulls); - uint64_t copyDataFromBufferByType(omniruntime::vec::BaseVector*& tempOmnivec, uint64_t offset, - uint64_t numValues, const char* notNull, - omniruntime::type::DataTypeId dataTypeId); + template + uint64_t nextPatched(T *data, uint64_t offset, uint64_t numValues, bool *nulls); - template - uint64_t copyDataFromBufferTo64bit(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull); + template + uint64_t nextDelta(T *data, uint64_t offset, uint64_t numValues, bool *nulls); - template - uint64_t copyDataFromBuffer(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull); + template + uint64_t copyDataFromBuffer(T *data, uint64_t offset, uint64_t numValues, bool *nulls); void readLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs); -- Gitee From cd4cdc3267aab6c3496f55be61fda49f7c37ad27 Mon Sep 17 00:00:00 2001 From: zhousipei Date: Sat, 8 Feb 2025 10:40:15 +0800 Subject: [PATCH 6/6] clean code --- .../cpp/src/orcfile/OmniByteRLE.cc | 16 +++++---- .../cpp/src/orcfile/OmniByteRLE.hh | 26 +++++++------- .../cpp/src/orcfile/OmniColReader.cc | 36 ++++++++----------- .../cpp/src/orcfile/OmniColReader.hh | 11 +++--- .../cpp/src/orcfile/OmniRLEv2.hh | 2 ++ 5 files changed, 47 insertions(+), 44 deletions(-) diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc index 7b9897ee8..93bb03f4a 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc @@ -97,11 +97,11 @@ namespace omniruntime::reader { lastByte = data[position + bytesRead - 1]; remainingBits = bytesRead * 8 - nonNulls; // expand the array backwards so that we don't clobber the data - uint64_t bitLeft = bytesRead * 8 - remainingBits; + uint64_t bitsLeft = bytesRead * 8 - remainingBits; if (nulls) { for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i) { if (!nulls[i]) { - uint64_t shiftPosn = (-bitLeft) % 8; + uint64_t shiftPosn = (-bitsLeft) % 8; data[i] = !((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x01); bitsLeft -= 1; } else { @@ -122,16 +122,20 @@ namespace omniruntime::reader { void OmniBooleanRleDecoder::next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, int omniTypeId) { switch (omniTypeId) { - case omniruntime::type::OMNI_BOOLEAN: + case omniruntime::type::OMNI_BOOLEAN: { auto boolValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( static_cast*>(omnivec)); return next(boolValues, numValues, nulls); - break; + } default: throw std::runtime_error("OmniBooleanRleDecoder not support type: " + omniTypeId); } } + void OmniBooleanRleDecoder::next(char *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + template void OmniBooleanRleDecoder::next(T *data, uint64_t numValues, bool *nulls) { // next spot to fill in @@ -142,7 +146,7 @@ namespace omniruntime::reader { while (remainingBits > 0 && position < numValues) { if (!nulls[position]) { remainingBits -= 1; - data[position] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1)); + data[position] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1); } else { data[position] = 0; } @@ -151,7 +155,7 @@ namespace omniruntime::reader { } else { while (remainingBits > 0 && position < numValues) { remainingBits -= 1; - data[position++] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1)); + data[position++] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1); } } diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh index c5e5ddb43..95bf9f347 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh @@ -44,16 +44,16 @@ namespace omniruntime::reader { virtual void next(char* data, uint64_t numValues, char* notNull); protected: - inline void nextBuffer(); - inline signed char readByte(); - inline void readHeader(); - - std::unique_ptr inputStream; - size_t remainingValues; - char value; - const char* bufferStart; - const char* bufferEnd; - bool repeating; + inline void nextBuffer(); + inline signed char readByte(); + inline void readHeader(); + + std::unique_ptr inputStream; + size_t remainingValues; + char value; + const char* bufferStart; + const char* bufferEnd; + bool repeating; }; class OmniBooleanRleDecoder: public OmniByteRleDecoder { @@ -85,9 +85,11 @@ namespace omniruntime::reader { template void next(T *data, uint64_t numValues, bool *nulls); + void next(char *data, uint64_t numValues, bool *nulls); + protected: - size_t remainingBits; - char lastByte; + size_t remainingBits; + char lastByte; }; } #endif diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc index 009a56f23..4734eb07f 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc @@ -82,13 +82,12 @@ namespace omniruntime::reader { : ColumnReader(type, stripe) { std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_PRESENT, true); if (stream.get()) { - notNullDecoder = createOmniBooleanRleDecoder(std::move(stream)); + notNullDecoder = std::make_unique(std::move(stream)); } } uint64_t OmniColumnReader::skip(uint64_t numValues) { - OmniByteRleDecoder *decoder = notNullDecoder.get(); - if (decoder) { + if (notNullDecoder) { // pass through the values that we want to skip and count how many are non-null const uint64_t MAX_BUFFER_SIZE = 32768; uint64_t bufferSize = std::min(MAX_BUFFER_SIZE, numValues); @@ -97,7 +96,7 @@ namespace omniruntime::reader { uint64_t remaining = numValues; while (remaining > 0) { uint64_t chunkSize = std::min(remaining, bufferSize); - decoder->next(buffer, chunkSize, nullptr); + notNullDecoder->next(buffer, chunkSize, nullptr); remaining -= chunkSize; // update non-null count for (uint64_t i = 0; i < chunkSize; i++) { @@ -112,7 +111,7 @@ namespace omniruntime::reader { } void OmniColumnReader::seekToRowGroup(std::unordered_map &positions) { - if (notNullDecoder.get()) { + if (notNullDecoder) { notNullDecoder->seek(positions.at(columnId)); } } @@ -181,12 +180,9 @@ namespace omniruntime::reader { } inline void readNulls(OmniColumnReader *colReader, uint64_t numValues, bool *incomingNulls, bool *nulls) { - OmniByteRleDecoder* decoder = reinterpret_cast(colReader->notNullDecoder.get()); - - if (decoder) { - decoder->nextNulls(nulls, numValues, incomingNulls); + if (colReader->notNullDecoder) { + colReader->notNullDecoder->nextNulls(nulls, numValues, incomingNulls); // check to see if there are nulls in this batch - } } else if (incomingNulls) { // if we don't have a notNull stream, copy the incomingNulls // To do finished @@ -298,7 +294,7 @@ namespace omniruntime::reader { template void OmniStructColumnReader::nextInternal(std::vector &vecs, uint64_t numValues, - char *incomingNulls, const orc::Type& baseTp, int* omniTypeId) { + bool *incomingNulls, const orc::Type& baseTp, int* omniTypeId) { if (encoded) { std::string message("OmniStructColumnReader::nextInternal encoded is not finished!"); @@ -391,17 +387,17 @@ namespace omniruntime::reader { auto dataTypeId = static_cast(omniTypeId); switch (dataTypeId) { case omniruntime::type::OMNI_DATE32: { - auto intValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + auto intValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( static_cast*>(vec)); return nextByType(intValues, numValues, hasNull ? nulls : nullptr); } case omniruntime::type::OMNI_DATE64: { - auto longValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + auto longValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( static_cast*>(vec)); return nextByType(longValues, numValues, hasNull ? nulls : nullptr); } case omniruntime::type::OMNI_TIMESTAMP: { - auto longValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + auto longValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( static_cast*>(vec)); return nextByType(longValues, numValues, hasNull ? nulls : nullptr); } @@ -464,7 +460,7 @@ namespace omniruntime::reader { auto dataTypeId = static_cast(omniTypeId); switch (dataTypeId) { case omniruntime::type::OMNI_DOUBLE: { - auto doubleValues = omniruntime::vec::unsafe::UnsafeBaseVector::GetRawValues( + auto doubleValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( static_cast*>(vec)); return nextByType(doubleValues, numValues, hasNull ? nulls : nullptr); } @@ -520,7 +516,7 @@ namespace omniruntime::reader { omniruntime::vec::LargeStringContainer>*>(vec); if (hasNull) { for(uint64_t i=0; i < numValues; ++i) { - if (!null[i]) { + if (!nulls[i]) { int64_t entry = outputLengths[i]; if (entry < 0 || static_cast(entry) >= dictionaryCount ) { throw orc::ParseError("Entry index out of range in StringDictionaryColumn"); @@ -570,7 +566,7 @@ namespace omniruntime::reader { lengthRle->next(lengthPtr, numValues, nulls); // figure out the total length of data we need from the blob stream - const size_t totalLength = computeSize(lengthPtr, notNull, numValues); + const size_t totalLength = computeSize(lengthPtr, nulls, numValues); // Load data from the blob stream into our buffer until we have enough // to get the rest directly out of the stream's buffer. @@ -630,8 +626,6 @@ namespace omniruntime::reader { filledSlots += 1; } } - - omnivec = newVector.release(); } void OmniDecimal64ColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { @@ -646,7 +640,7 @@ namespace omniruntime::reader { auto vector = reinterpret_cast*>(vec); if (hasNull) { for(size_t i = 0; i < numValues; ++i) { - if (!null[i]) { + if (!nulls[i]) { int64_t value = 0; readInt64(value, static_cast(scaleBuffer[i])); vector->SetValue(static_cast(i), static_cast(value)); @@ -690,7 +684,7 @@ namespace omniruntime::reader { __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } } } diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh index 8ac694b5b..42cc23b57 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh @@ -32,7 +32,7 @@ namespace omniruntime::reader { class OmniColumnReader: public orc::ColumnReader { public: - OmniColumnReader(const orc::Type& type, orc::StripeStream& stripe); + OmniColumnReader(const orc::Type& type, orc::StripeStreams& stripe); virtual ~OmniColumnReader() {} @@ -57,6 +57,8 @@ namespace omniruntime::reader { * @param positions a list of PositionProviders storing the positions */ virtual void seekToRowGroup(std::unordered_map &positions); + + std::unique_ptr notNullDecoder; }; class OmniStructColumnReader: public OmniColumnReader { @@ -150,7 +152,7 @@ namespace omniruntime::reader { void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; template - void nextByType(T *data, uint64_t numValues, bool *nulls; + void nextByType(T *data, uint64_t numValues, bool *nulls); private: std::unique_ptr secondsRle; @@ -169,8 +171,7 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; class OmniDoubleColumnReader: public OmniColumnReader { @@ -183,7 +184,7 @@ namespace omniruntime::reader { void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; template - void nextByType(T *data, uint64_t numValues, bool *nulls; + void nextByType(T *data, uint64_t numValues, bool *nulls); void seekToRowGroup( std::unordered_map& positions) override; diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh index 8576f4682..0d9a814a4 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh @@ -61,6 +61,8 @@ namespace omniruntime::reader { void next(int16_t *data, uint64_t numValues, bool *nulls); + void next(bool *data, uint64_t numValues, bool *nulls); + template void next(T *data, uint64_t numValues, bool *nulls); -- Gitee