diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc index 1f3dfbbc90fe9e8a5689c6c76461910723878b43..93bb03f4a0061a610b6135b005f6d020a05329ad 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc @@ -24,6 +24,7 @@ namespace omniruntime::reader { const int MINIMUM_REPEAT = 3; const int MAXIMUM_REPEAT = 127 + MINIMUM_REPEAT; + const uint32_t BITS_OF_BYTE = 8; void OmniBooleanRleDecoder::seek(orc::PositionProvider& location) { OmniByteRleDecoder::seek(location); @@ -54,86 +55,115 @@ namespace omniruntime::reader { } } - void OmniBooleanRleDecoder::next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - auto dataTypeId = static_cast(omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - nextByType(pushOmniVec, numValues, notNull, baseTp, omniTypeId); - break; - case omniruntime::type::OMNI_SHORT: - throw std::runtime_error("OmniBooleanRleDecoder SHORT not finished!!!"); - break; - case omniruntime::type::OMNI_INT: - throw std::runtime_error("OmniBooleanRleDecoder INT not finished!!!"); - break; - case omniruntime::type::OMNI_LONG: - throw std::runtime_error("OmniBooleanRleDecoder LONG not finished!!!"); - break; - case omniruntime::type::OMNI_TIMESTAMP: - throw std::runtime_error("OmniBooleanRleDecoder TIMESTAMP not finished!!!"); - break; - case omniruntime::type::OMNI_DATE32: - throw std::runtime_error("OmniBooleanRleDecoder DATE32 not finished!!!"); - break; - case omniruntime::type::OMNI_DATE64: - throw std::runtime_error("OmniBooleanRleDecoder DATE64 not finished!!!"); - break; - case omniruntime::type::OMNI_DOUBLE: - throw std::runtime_error("OmniBooleanRleDecoder DOUBLE not finished!!!"); - break; - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("OmniBooleanRleDecoder CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("OmniBooleanRleDecoder VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("OmniBooleanRleDecoder DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("OmniBooleanRleDecoder DECIMAL64 should not in here!!!"); - default: - printf("OmniBooleanRleDecoder switch no process!!!"); + void OmniBooleanRleDecoder::nextNulls(bool *data, uint64_t numValues, bool *nulls) { + // next spot to fill in + uint64_t position = 0; + // use up any remaining bits + if (nulls) { + while (remainingBits > 0 && position < numValues) { + if (!nulls[position]) { + remainingBits -= 1; + data[position] = !((static_cast(lastByte) >> remainingBits) & 0x1); + } else { + data[position] = true; + } + position += 1; + } + } else { + while (remainingBits > 0 && position < numValues) { + remainingBits -= 1; + data[position] = !((static_cast(lastByte) >> remainingBits) & 0x1); + position += 1; + } } + // count the number of nulls remaining + uint64_t nonNulls = numValues - position; + if (nulls) { + for (uint64_t i = position; i < numValues; ++i) { + if (nulls[i]) { + nonNulls -= 1; + } + } + } + // fill in the remaining values + if (nonNulls == 0) { + while (position < numValues) { + data[position++] = true; + } + } else if (position < numValues) { + // read the new bytes into array + uint64_t bytesRead = (nonNulls + 7) / 8; + OmniByteRleDecoder::next(reinterpret_cast(data + position), bytesRead, nullptr); + lastByte = data[position + bytesRead - 1]; + remainingBits = bytesRead * 8 - nonNulls; + // expand the array backwards so that we don't clobber the data + uint64_t bitsLeft = bytesRead * 8 - remainingBits; + if (nulls) { + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i) { + if (!nulls[i]) { + uint64_t shiftPosn = (-bitsLeft) % 8; + data[i] = !((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x01); + bitsLeft -= 1; + } else { + data[i] = true; + } + } + } else { + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i) { + uint64_t shiftPosn = (-bitsLeft) % 8; + data[i] = !((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x01); + bitsLeft -= 1; + } + } + } + } - omnivec = tempOmnivec.release(); + + void OmniBooleanRleDecoder::next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, + int omniTypeId) { + switch (omniTypeId) { + case omniruntime::type::OMNI_BOOLEAN: { + auto boolValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(boolValues, numValues, nulls); + } + default: + throw std::runtime_error("OmniBooleanRleDecoder not support type: " + omniTypeId); + } } - template - void OmniBooleanRleDecoder::nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(omnivec); + void OmniBooleanRleDecoder::next(char *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + template + void OmniBooleanRleDecoder::next(T *data, uint64_t numValues, bool *nulls) { // next spot to fill in uint64_t position = 0; // use up any remaining bits - if (notNull) { - while(remainingBits > 0 && position < numValues) { - if (notNull[position]) { + if (nulls) { + while (remainingBits > 0 && position < numValues) { + if (!nulls[position]) { remainingBits -= 1; - vec->SetValue(static_cast(position), static_cast((static_cast(lastByte) >> - remainingBits) & 0x1)); + data[position] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1); } else { - vec->SetNull(static_cast(position)); + data[position] = 0; } position += 1; } } else { - while(remainingBits > 0 && position < numValues) { + while (remainingBits > 0 && position < numValues) { remainingBits -= 1; - vec->SetValue(static_cast(position++), static_cast((static_cast(lastByte) >> - remainingBits) & 0x1)); + data[position++] = static_cast((static_cast(lastByte) >> remainingBits) & 0x1); } } // count the number of nonNulls remaining uint64_t nonNulls = numValues - position; - if (notNull) { - for(uint64_t i = position; i < numValues; ++i) { - if (!notNull[i]) { + if (nulls) { + for (uint64_t i = position; i < numValues; ++i) { + if (!nulls[i]) { nonNulls -= 1; } } @@ -142,35 +172,31 @@ namespace omniruntime::reader { // fill in the remaining values if (nonNulls == 0) { while (position < numValues) { - vec->SetNull(static_cast(position++)); + data[position++] = 0; } } else if (position < numValues) { // read the new bytes into the array uint64_t bytesRead = (nonNulls + 7) / 8; - auto *values = reinterpret_cast(omniruntime::vec::VectorHelper::UnsafeGetValues(omnivec)); - OmniByteRleDecoder::next(values + position, bytesRead, nullptr); - lastByte = static_cast(vec->GetValue(position + bytesRead - 1)); + OmniByteRleDecoder::next(reinterpret_cast(data + position), bytesRead, nullptr); + lastByte = data[position + bytesRead - 1]; remainingBits = bytesRead * 8 - nonNulls; // expand the array backwards so that we don't clobber the data - uint64_t bitsLeft = bytesRead * 8- remainingBits; - if (notNull) { - for (int64_t i =static_cast(numValues) - 1; - i >= static_cast(position); --i) { - if (notNull[i]) { + uint64_t bitsLeft = bytesRead * 8 - remainingBits; + if (nulls) { + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i) { + if (!nulls[i]) { uint64_t shiftPosn = (-bitsLeft) % 8; - auto value = static_cast(vec->GetValue(position + (bitsLeft - 1) / 8)) >> shiftPosn; - vec->SetValue(static_cast(i), static_cast(value & 0x1)); + data[i] = static_cast((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1); bitsLeft -= 1; } else { - vec->SetNull(static_cast(i)); + data[i] = 0; } } } else { - for(int64_t i = static_cast(numValues) - 1; + for (int64_t i = static_cast(numValues) - 1; i >= static_cast(position); --i, --bitsLeft) { uint64_t shiftPosn = (-bitsLeft) % 8; - auto value = static_cast(vec->GetValue(position + (bitsLeft - 1) / 8)) >> shiftPosn; - vec->SetValue(static_cast(i), static_cast(value & 0x1)); + data[i] = static_cast((data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1); } } } @@ -329,129 +355,5 @@ namespace omniruntime::reader { } } - void OmniByteRleDecoder::next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - auto dataTypeId = static_cast(omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_SHORT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_INT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_LONG: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_TIMESTAMP: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE32: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE64: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DOUBLE: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("OmniByteRleDecoder CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("OmniByteRleDecoder VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("OmniByteRleDecoder DECIMAL64 not finished!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("OmniByteRleDecoder DECIMAL64 not finished!!!"); - default: - printf("OmniByteRleDecoder swtich no process!!!"); - } - - omnivec = tempOmnivec.release(); - } - - template - void OmniByteRleDecoder::nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(omnivec); - - uint64_t position = 0; - // skip over null values - while (notNull && position < numValues && !notNull[position]) { - position += 1; - } - while (position < numValues) { - // if we are out of values, read more - if (remainingValues == 0) { - readHeader(); - } - // how many do we read out of this block? - size_t count = std::min(static_cast(numValues - position), - remainingValues); - uint64_t consumed = 0; - if (repeating) { - if (notNull) { - for(uint64_t i=0; i < count; ++i) { - if (notNull[position + i]) { - vec->SetValue(static_cast(position + i), static_cast(value)); - consumed += 1; - } else { - vec->SetNull(static_cast(position + i)); - } - } - } else { - for (uint64_t i = position; i < position + count; ++i) { - vec->SetValue(static_cast(i), static_cast(value)); - } - consumed = count; - } - } else { - if (notNull) { - for(uint64_t i = 0; i < count; ++i) { - if (notNull[position + i]) { - vec->SetValue(static_cast(position + i), static_cast(readByte())); - consumed += 1; - } else { - vec->SetNull(static_cast(position + i)); - } - } - } else { - uint64_t i = 0; - while (i < count) { - if (bufferStart == bufferEnd) { - nextBuffer(); - } - uint64_t copyBytes = - std::min(static_cast(count - i), - static_cast(bufferEnd - bufferStart)); - vec->SetValues(static_cast(position + i), bufferStart, static_cast(copyBytes)); - bufferStart += copyBytes; - i += copyBytes; - } - consumed = count; - } - } - remainingValues -= consumed; - position += count; - // skip over any null values - while (notNull && position < numValues && !notNull[position]) { - position += 1; - } - } - } //OmniByteRleDecoder end } diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh index 0ed2153472a82cdc040999fab814b88e593c2627..95bf9f34765ed11363e70803484c2a3ea674ba9a 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.hh @@ -43,24 +43,17 @@ namespace omniruntime::reader { */ virtual void next(char* data, uint64_t numValues, char* notNull); - virtual void next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); - - template - void nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp); - protected: - inline void nextBuffer(); - inline signed char readByte(); - inline void readHeader(); - - std::unique_ptr inputStream; - size_t remainingValues; - char value; - const char* bufferStart; - const char* bufferEnd; - bool repeating; + inline void nextBuffer(); + inline signed char readByte(); + inline void readHeader(); + + std::unique_ptr inputStream; + size_t remainingValues; + char value; + const char* bufferStart; + const char* bufferEnd; + bool repeating; }; class OmniBooleanRleDecoder: public OmniByteRleDecoder { @@ -80,19 +73,23 @@ namespace omniruntime::reader { virtual void skip(uint64_t numValues); /** - * Read a number of values into the batch. + * Read nulls flag to data. + */ + void nextNulls(bool *data, uint64_t numValues, bool *nulls); + + /** + * Read a number of values into the batch by nulls. */ + void next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, int omniTypeId); - virtual void next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); + template + void next(T *data, uint64_t numValues, bool *nulls); - template - void nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); + void next(char *data, uint64_t numValues, bool *nulls); protected: - size_t remainingBits; - char lastByte; + size_t remainingBits; + char lastByte; }; } #endif diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc index 1335118385af6673a8c23b6de2a4a6c324998ac0..4734eb07f54f7184f429ef3bd4fed11c603e895a 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc @@ -25,6 +25,8 @@ #include "util/omni_exception.h" using omniruntime::vec::VectorBatch; +using omniruntime::vec::BaseVector; +using omniruntime::exception::OmniException; using orc::ColumnReader; using orc::ByteRleDecoder; using orc::Type; @@ -43,13 +45,13 @@ namespace omniruntime::reader { switch (static_cast(kind)) { case orc::proto::ColumnEncoding_Kind_DIRECT: case orc::proto::ColumnEncoding_Kind_DICTIONARY: - return orc::RleVersion_1; + return orc::RleVersion_1; case orc::proto::ColumnEncoding_Kind_DIRECT_V2: case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: - return orc::RleVersion_2; - default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "Unknown encoding in omniConvertRleVersion"); + return orc::RleVersion_2; + default: + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "Unknown encoding in omniConvertRleVersion"); } } @@ -57,26 +59,63 @@ namespace omniruntime::reader { RleVersion version, MemoryPool& pool) { switch (static_cast(version)) { case orc::RleVersion_1: - // should not use - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "RleVersion_1 should not use!!!"); + // should not use + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "RleVersion_1 Not supported yet"); case orc::RleVersion_2: - return std::unique_ptr(new OmniRleDecoderV2(std::move(input), - isSigned, pool)); - default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "Not implemented yet"); + return std::unique_ptr(new OmniRleDecoderV2(std::move(input), isSigned, pool)); + default: + throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "Not implemented yet"); } } - std::unique_ptr createOmniBooleanRleDecoder - (std::unique_ptr input) { + std::unique_ptr createOmniBooleanRleDecoder(std::unique_ptr input) { OmniBooleanRleDecoder* decoder = new OmniBooleanRleDecoder(std::move(input)); return std::unique_ptr(reinterpret_cast(decoder)); } - std::unique_ptr createOmniByteRleDecoder - (std::unique_ptr input) { + std::unique_ptr createOmniByteRleDecoder(std::unique_ptr input) { return std::unique_ptr(new OmniByteRleDecoder(std::move(input))); } + + OmniColumnReader::OmniColumnReader(const Type &type, StripeStreams &stripe) + : ColumnReader(type, stripe) { + std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_PRESENT, true); + if (stream.get()) { + notNullDecoder = std::make_unique(std::move(stream)); + } + } + + uint64_t OmniColumnReader::skip(uint64_t numValues) { + if (notNullDecoder) { + // pass through the values that we want to skip and count how many are non-null + const uint64_t MAX_BUFFER_SIZE = 32768; + uint64_t bufferSize = std::min(MAX_BUFFER_SIZE, numValues); + // buffer, 0: null; 1: non-null + char buffer[MAX_BUFFER_SIZE]; + uint64_t remaining = numValues; + while (remaining > 0) { + uint64_t chunkSize = std::min(remaining, bufferSize); + notNullDecoder->next(buffer, chunkSize, nullptr); + remaining -= chunkSize; + // update non-null count + for (uint64_t i = 0; i < chunkSize; i++) { + if (!buffer[i]) { + // minus null + numValues -= 1; + } + } + } + } + return numValues; + } + + void OmniColumnReader::seekToRowGroup(std::unordered_map &positions) { + if (notNullDecoder) { + notNullDecoder->seek(positions.at(columnId)); + } + } + /** * Create a reader for the given stripe. @@ -88,91 +127,68 @@ namespace omniruntime::reader { case orc::INT: case orc::LONG: case orc::SHORT: - return std::unique_ptr( - new OmniIntegerColumnReader(type, stripe)); + return std::make_unique(type, stripe); case orc::BINARY: case orc::CHAR: case orc::STRING: case orc::VARCHAR: - switch (static_cast(stripe.getEncoding(type.getColumnId()).kind())){ + switch (static_cast(stripe.getEncoding(type.getColumnId()).kind())) { case orc::proto::ColumnEncoding_Kind_DICTIONARY: case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: - return std::unique_ptr( - new OmniStringDictionaryColumnReader(type, stripe)); + return std::make_unique(type, stripe); case orc::proto::ColumnEncoding_Kind_DIRECT: case orc::proto::ColumnEncoding_Kind_DIRECT_V2: - return std::unique_ptr( - new OmniStringDirectColumnReader(type, stripe)); + return std::make_unique(type, stripe); default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "omniBuildReader unhandled string encoding"); - } + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "omniBuildReader unhandled string encoding"); + } - case orc::BOOLEAN: - return std::unique_ptr(new OmniBooleanColumnReader(type, stripe)); + case orc::BOOLEAN: + return std::make_unique(type, stripe); - case orc::BYTE: - return std::unique_ptr(new OmniByteColumnReader(type, stripe)); + case orc::BYTE: + return std::make_unique(type, stripe); case orc::STRUCT: - return std::unique_ptr( - new OmniStructColumnReader(type, stripe, julianPtr)); + return std::make_unique(type, stripe, julianPtr); case orc::TIMESTAMP: return std::unique_ptr (new OmniTimestampColumnReader(type, stripe, false, julianPtr)); case orc::TIMESTAMP_INSTANT: - return std::unique_ptr - (new OmniTimestampColumnReader(type, stripe, true, julianPtr)); + return std::make_unique(type, stripe, true, julianPtr); case orc::DECIMAL: - // Is this a Hive 0.11 or 0.12 file? - if (type.getPrecision() == 0) { - return std::unique_ptr - (new OmniDecimalHive11ColumnReader(type, stripe)); - } else if (type.getPrecision() <= - OmniDecimal64ColumnReader::MAX_PRECISION_64) { - return std::unique_ptr - (new OmniDecimal64ColumnReader(type, stripe)); - } else { - return std::unique_ptr - (new OmniDecimal128ColumnReader(type, stripe)); - } + // Is this a Hive 0.11 or 0.12 file? + if (type.getPrecision() == 0) { + return std::make_unique(type, stripe); + } else if (type.getPrecision() <= OmniDecimal64ColumnReader::MAX_PRECISION_64) { + return std::make_unique(type, stripe); + } else { + return std::make_unique(type, stripe); + } - case orc::FLOAT: - case orc::DOUBLE: - return std::unique_ptr( - new OmniDoubleColumnReader(type, stripe)); + case orc::FLOAT: + case orc::DOUBLE: + return std::make_unique(type, stripe); - default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "omniBuildReader unhandled type"); + default: + throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "omniBuildReader unhandled type"); } } - inline void readNulls(ColumnReader* colReader, uint64_t numValues, char* incomingMask, char* nulls, bool& hasNull) { - ByteRleDecoder* decoder = colReader->notNullDecoder.get(); - // TO do 需要将char*转换为bool*数组, 可以优化 - if (decoder) { - decoder->next(nulls, numValues, incomingMask); + inline void readNulls(OmniColumnReader *colReader, uint64_t numValues, bool *incomingNulls, bool *nulls) { + if (colReader->notNullDecoder) { + colReader->notNullDecoder->nextNulls(nulls, numValues, incomingNulls); // check to see if there are nulls in this batch - for(uint64_t i=0; i < numValues; ++i) { - auto ptr = nulls; - if (!(ptr[i])) { - // To do hasNull is protected - hasNull = true; - return; - } - } - } else if (incomingMask) { - // if we don't have a notNull stream, copy the incomingMask + } else if (incomingNulls) { + // if we don't have a notNull stream, copy the incomingNulls // To do finished - hasNull = true; - memcpy_s(nulls, numValues, incomingMask, numValues); + memcpy_s(nulls, numValues, incomingNulls, numValues); return; } - // To do hasNull is protected - hasNull = false; } void scaleInt128(orc::Int128& value, uint32_t scale, uint32_t currentScale) { @@ -233,28 +249,29 @@ namespace omniruntime::reader { * OmniStructColumnReader funcs */ OmniStructColumnReader::OmniStructColumnReader(const Type& type, StripeStreams& stripe, - common::JulianGregorianRebase *julianPtr): ColumnReader(type, stripe) { + common::JulianGregorianRebase *julianPtr): OmniColumnReader(type, stripe) { // count the number of selected sub-columns const std::vector selectedColumns = stripe.getSelectedColumns(); switch (static_cast(stripe.getEncoding(columnId).kind())) { - case orc::proto::ColumnEncoding_Kind_DIRECT: - for(unsigned int i=0; i < type.getSubtypeCount(); ++i) { + case orc::proto::ColumnEncoding_Kind_DIRECT: + for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) { const Type& child = *type.getSubtype(i); if (selectedColumns[static_cast(child.getColumnId())]) { children.push_back(omniBuildReader(child, stripe, julianPtr)); } } break; - case orc::proto::ColumnEncoding_Kind_DIRECT_V2: - case orc::proto::ColumnEncoding_Kind_DICTIONARY: - case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: + case orc::proto::ColumnEncoding_Kind_DIRECT_V2: + case orc::proto::ColumnEncoding_Kind_DICTIONARY: + case orc::proto::ColumnEncoding_Kind_DICTIONARY_V2: default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "Unknown encoding for OmniStructColumnReader"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "Unknown encoding for OmniStructColumnReader"); } } uint64_t OmniStructColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); for(auto& ptr : children) { ptr->skip(numValues); } @@ -262,17 +279,13 @@ namespace omniruntime::reader { } void OmniStructColumnReader::next(void *&batch, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { + const orc::Type& baseTp, int* omniTypeId) { auto vecs = reinterpret_cast*>(batch); - nextInternal(*vecs, numValues, notNull, baseTp, omniTypeId); - } - - void OmniStructColumnReader::nextEncoded(orc::ColumnVectorBatch& rowBatch, uint64_t numValues, char *notNull) { - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "OmniStructColumnReader::nextEncoded not finished!!!"); + nextInternal(*vecs, numValues, nullptr, baseTp, omniTypeId); } void OmniStructColumnReader::seekToRowGroup(std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); for(auto& ptr : children) { ptr->seekToRowGroup(positions); @@ -281,34 +294,34 @@ namespace omniruntime::reader { template void OmniStructColumnReader::nextInternal(std::vector &vecs, uint64_t numValues, - char *notNull, const orc::Type& baseTp, int* omniTypeId) { + bool *incomingNulls, const orc::Type& baseTp, int* omniTypeId) { + + if (encoded) { + std::string message("OmniStructColumnReader::nextInternal encoded is not finished!"); + throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", message); + } bool hasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, hasNull); + bool nulls[numValues]; + readNulls(this, numValues, incomingNulls, nulls); + uint64_t i = 0; - uint64_t i=0; - notNull = hasNull ? nulls : nullptr; for(auto iter = children.begin(); iter != children.end(); ++iter, ++i) { - if (encoded) { - std::string message("OmniStructColumnReader::nextInternal encoded is not finished!!!"); - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", message); + const Type* orcType = baseTp.getSubtype(i); + omniruntime::type::DataTypeId dataTypeId; + if (omniTypeId == nullptr) { + dataTypeId = getDefaultOmniType(orcType); } else { - omniruntime::vec::BaseVector* tempVec = nullptr; - const Type* type = baseTp.getSubtype(i); - if (omniTypeId == nullptr) { - int tempOmniTypeId = getOmniTypeByOrcType(type); - (*iter)->next(reinterpret_cast(tempVec), numValues, notNull, *type, &tempOmniTypeId); - vecs.push_back(tempVec); - } else { - (*iter)->next(reinterpret_cast(tempVec), numValues, notNull, *type, &omniTypeId[i]); - vecs.push_back(tempVec); - } + dataTypeId = static_cast(omniTypeId[i]); } + auto omnivector = omniruntime::reader::makeNewVector(numValues, orcType, dataTypeId); + reinterpret_cast(&(*iter->get()))->next(omnivector.get(), numValues, + hasNull ? nulls : nullptr, dataTypeId); + vecs.push_back(omnivector.release()); } } - omniruntime::type::DataTypeId OmniStructColumnReader::getOmniTypeByOrcType(const Type* type) { - constexpr int32_t OMNI_MAX_DECIMAL64_DIGITS = 18; + omniruntime::type::DataTypeId OmniStructColumnReader::getDefaultOmniType(const Type* type) { + constexpr int32_t OMNI_MAX_DECIMAL64_DIGITS = 18; switch (type->getKind()) { case orc::TypeKind::BOOLEAN: return omniruntime::type::OMNI_BOOLEAN; @@ -337,83 +350,77 @@ namespace omniruntime::reader { return omniruntime::type::OMNI_DECIMAL64; } default: - printf("no getOmniTypeByOrcType type to process!!!"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "Not Supported Type: " + type->getKind()); } - - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "OmniStructColumnReader::getOmniTypeByOrcType no type!!!"); - return omniruntime::type::OMNI_INVALID; } /** * all next funcs */ - void OmniIntegerColumnReader::next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - rle->next(reinterpret_cast(vec), numValues, HasNull ? nulls : nullptr, &baseTp, - *omniTypeId); - } - - void OmniBooleanColumnReader::next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - rle->next(reinterpret_cast(vec), numValues, HasNull ? nulls : nullptr, &baseTp, - *omniTypeId); - } - - void OmniByteColumnReader::next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - rle->next(reinterpret_cast(vec), numValues, HasNull ? nulls : nullptr, &baseTp, - *omniTypeId); - } - - void OmniTimestampColumnReader::next(void*& omnivec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto dataTypeId = static_cast(*omniTypeId); + void OmniIntegerColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + rle->next(vec, numValues, hasNull ? nulls : nullptr, omniTypeId); + } + + void OmniBooleanColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + if (omniTypeId != omniruntime::type::OMNI_BOOLEAN) { + throw OmniException("EXPRESSION_NOT_SUPPORT", "Not Supported Type: " + omniTypeId); + } + OmniBooleanRleDecoder *boolDecoder = reinterpret_cast(rle.get()); + boolDecoder->next(vec, numValues, hasNull ? nulls : nullptr, omniTypeId); + } + + void OmniByteColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + throw OmniException("EXPRESSION_NOT_SUPPORT", "Not Supported yet."); + } + + void OmniTimestampColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + auto dataTypeId = static_cast(omniTypeId); switch (dataTypeId) { - case omniruntime::type::OMNI_DATE32: - return nextByType(omnivec, numValues, notNull, baseTp, omniTypeId); - case omniruntime::type::OMNI_DATE64: - return nextByType(omnivec, numValues, notNull, baseTp, omniTypeId); - case omniruntime::type::OMNI_TIMESTAMP: - return nextByType(omnivec, numValues, notNull, baseTp, omniTypeId); + case omniruntime::type::OMNI_DATE32: { + auto intValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(vec)); + return nextByType(intValues, numValues, hasNull ? nulls : nullptr); + } + case omniruntime::type::OMNI_DATE64: { + auto longValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(vec)); + return nextByType(longValues, numValues, hasNull ? nulls : nullptr); + } + case omniruntime::type::OMNI_TIMESTAMP: { + auto longValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(vec)); + return nextByType(longValues, numValues, hasNull ? nulls : nullptr); + } default: - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", - "OmniTimestampColumnReader type not support!!!"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "OmniTimestampColumnReader type not support: " + dataTypeId); } } - template - void OmniTimestampColumnReader::nextByType(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = std::make_unique>(static_cast(numValues)); - - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + template + void OmniTimestampColumnReader::nextByType(T *data, uint64_t numValues, bool *nulls) { int64_t secsBuffer[numValues]; - secondsRle->next(secsBuffer, numValues, notNull); + secondsRle->next(secsBuffer, numValues, nulls); int64_t nanoBuffer[numValues]; - nanoRle->next(nanoBuffer, numValues, notNull); + nanoRle->next(nanoBuffer, numValues, nulls); // Construct the values - for(uint64_t i=0; i < numValues; i++) { - if (notNull == nullptr || notNull[i]) { + for(uint64_t i = 0; i < numValues; i++) { + if (nulls == nullptr || nulls[i]) { uint64_t zeros = nanoBuffer[i] & 0x7; nanoBuffer[i] >>= 3; if (zeros != 0) { - for(uint64_t j = 0; j <= zeros; ++j) { + for (uint64_t j = 0; j <= zeros; ++j) { nanoBuffer[i] *= 10; } } @@ -437,143 +444,79 @@ namespace omniruntime::reader { } if (julianPtr != nullptr) { - vec->SetValue(static_cast(i), static_cast( - julianPtr->RebaseJulianToGregorianMicros(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L))); + data[i] = static_cast( + julianPtr->RebaseJulianToGregorianMicros(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L)); } else { - vec->SetValue(static_cast(i), static_cast(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L)); + data[i] = static_cast(secsBuffer[i] * 1000000L + nanoBuffer[i] / 1000L); } - } else { - vec->SetNull(static_cast(i)); } } - - omnivec = vec.release(); } - void OmniDoubleColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto dataTypeId = static_cast(*omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, &baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); + void OmniDoubleColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + auto dataTypeId = static_cast(omniTypeId); switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_SHORT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_INT: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_LONG: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_TIMESTAMP: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE32: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DATE64: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_DOUBLE: - nextByType - (pushOmniVec, numValues, notNull, baseTp); - break; - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("OmniDoubleColumnReader_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("OmniDoubleColumnReader_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("OmniDoubleColumnReader_type DECIMAL64 not finished!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("OmniDoubleColumnReader_type DECIMAL64 not finished!!!"); + case omniruntime::type::OMNI_DOUBLE: { + auto doubleValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(vec)); + return nextByType(doubleValues, numValues, hasNull ? nulls : nullptr); + } default: - printf("OmniDoubleColumnReader_type swtich no process!!!"); + throw omniruntime::exception::OmniException( + "EXPRESSION_NOT_SUPPORT", "OmniDoubleColumnReader type not support: " + dataTypeId); } - - omnivec = tempOmnivec.release(); } - template - void OmniDoubleColumnReader::nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp) { - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - // update the notNull from the parent class - notNull = HasNull ? nulls : nullptr; - - using namespace omniruntime::type; - using T = typename NativeType::type; - - auto vec = reinterpret_cast*>(omnivec); - + template + void OmniDoubleColumnReader::nextByType(T *data, uint64_t numValues, bool *nulls) { if (columnKind == orc::FLOAT) { - if(notNull) { - for(size_t i=0; i < numValues; ++i) { - if(notNull[i]) { - vec->SetValue(static_cast(i), static_cast(readFloat())); - } else { - vec->SetNull(i); + if(nulls) { + for(size_t i = 0; i < numValues; ++i) { + if(!nulls[i]) { + data[i] = static_cast(readFloat()); } } } else { - for(size_t i=0; i < numValues; ++i) { - vec->SetValue(static_cast(i), static_cast(readFloat())); + for(size_t i = 0; i < numValues; ++i) { + data[i] = static_cast(readFloat()); } } } else { - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { - vec->SetValue(static_cast(i), static_cast(readDouble())); - } else { - vec->SetNull(i); + if (nulls) { + for(size_t i = 0; i < numValues; ++i) { + if (!nulls[i]) { + data[i] = static_cast(readDouble()); } } } else { - for(size_t i=0; i < numValues; ++i) { - vec->SetValue(static_cast(i), static_cast(readDouble())); + for(size_t i = 0; i < numValues; ++i) { + data[i] = static_cast(readDouble()); } } } } - void OmniStringDictionaryColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - auto newVector = std::make_unique>>(numValues); - - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - // update the notNull from the parent class - notNull = HasNull ? nulls : nullptr; - - - bool is_char = false; - if (baseTp.getKind() == orc::TypeKind::CHAR) { - is_char = true; - } + void OmniStringDictionaryColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, + int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); char *blob = dictionary->dictionaryBlob.data(); int64_t *dictionaryOffsets = dictionary->dictionaryOffset.data(); int64_t outputLengths[numValues]; - rle->next(outputLengths, numValues, notNull); + rle->next(outputLengths, numValues, nulls); uint64_t dictionaryCount = dictionary->dictionaryOffset.size() - 1; - if (notNull) { + + auto varcharVector = reinterpret_cast>*>(vec); + if (hasNull) { for(uint64_t i=0; i < numValues; ++i) { - if (notNull[i]) { + if (!nulls[i]) { int64_t entry = outputLengths[i]; if (entry < 0 || static_cast(entry) >= dictionaryCount ) { throw orc::ParseError("Entry index out of range in StringDictionaryColumn"); @@ -583,13 +526,13 @@ namespace omniruntime::reader { auto len = dictionaryOffsets[entry+1] - dictionaryOffsets[entry]; char* ptr = blob + dictionaryOffsets[entry]; - if (is_char) { + if (isChar) { FindLastNotEmpty(ptr, len); } auto data = std::string_view(ptr, len); - newVector->SetValue(i, data); + varcharVector->SetValue(i, data); } else { - newVector->SetNull(i); + varcharVector->SetNull(i); } } } else { @@ -603,39 +546,27 @@ namespace omniruntime::reader { auto len = dictionaryOffsets[entry+1] - dictionaryOffsets[entry]; char* ptr = blob + dictionaryOffsets[entry]; - if (is_char) { + if (isChar) { FindLastNotEmpty(ptr, len); } auto data = std::string_view(ptr, len); - newVector->SetValue(i, data); + varcharVector->SetValue(i, data); } } - - omnivec = newVector.release(); } - void OmniStringDirectColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - bool HasNull = false; - auto newVector = std::make_unique>>(numValues); - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - // update the notNull from the parent class - notNull = HasNull ? nulls : nullptr; + void OmniStringDirectColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); int64_t lengthPtr[numValues]; - bool is_char = false; - if (baseTp.getKind() == orc::TypeKind::CHAR) { - is_char = true; - } - // read the length vector - lengthRle->next(lengthPtr, numValues, notNull); + lengthRle->next(lengthPtr, numValues, nulls); // figure out the total length of data we need from the blob stream - const size_t totalLength = computeSize(lengthPtr, notNull, numValues); + const size_t totalLength = computeSize(lengthPtr, nulls, numValues); // Load data from the blob stream into our buffer until we have enough // to get the rest directly out of the stream's buffer. @@ -660,22 +591,24 @@ namespace omniruntime::reader { lastBufferLength -= moreBytes; } + auto varcharVector = reinterpret_cast>*>(vec); size_t filledSlots = 0; char* tempPtr = ptr; - if (notNull) { + if (hasNull) { while (filledSlots < numValues) { - if (notNull[filledSlots]) { + if (!nulls[filledSlots]) { //求出长度,如果为char,则需要去除最后的空格 auto len = lengthPtr[filledSlots]; - if (is_char) { + if (isChar) { FindLastNotEmpty(tempPtr, len); } auto data = std::string_view(tempPtr, len); - newVector->SetValue(filledSlots, data); + varcharVector->SetValue(filledSlots, data); tempPtr += lengthPtr[filledSlots]; } else { - newVector->SetNull(filledSlots); + varcharVector->SetNull(filledSlots); } filledSlots += 1; } @@ -683,106 +616,92 @@ namespace omniruntime::reader { while (filledSlots < numValues) { //求出长度,如果为char,则需要去除最后的空格 auto len = lengthPtr[filledSlots]; - if (is_char) { + if (isChar) { FindLastNotEmpty(tempPtr, len); } auto data = std::string_view(tempPtr, len); - newVector->SetValue(filledSlots, data); + varcharVector->SetValue(filledSlots, data); tempPtr += lengthPtr[filledSlots]; filledSlots += 1; } } - - omnivec = newVector.release(); } - void OmniDecimal64ColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto newVector = std::make_unique>(numValues); - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + void OmniDecimal64ColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + // read the next group of scales int64_t scaleBuffer[numValues]; - scaleDecoder->next(scaleBuffer, numValues, notNull); - + scaleDecoder->next(scaleBuffer, numValues, nulls); - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { + auto vector = reinterpret_cast*>(vec); + if (hasNull) { + for(size_t i = 0; i < numValues; ++i) { + if (!nulls[i]) { int64_t value = 0; readInt64(value, static_cast(scaleBuffer[i])); - newVector->SetValue(static_cast(i), static_cast(value)); - } else { - newVector->SetNull(static_cast(i)); + vector->SetValue(static_cast(i), static_cast(value)); } } } else { - for(size_t i=0; i < numValues; ++i) { + for(size_t i = 0; i < numValues; ++i) { int64_t value = 0; readInt64(value, static_cast(scaleBuffer[i])); - newVector->SetValue(static_cast(i), static_cast(value)); + vector->SetValue(static_cast(i), static_cast(value)); } } - - omnivec = newVector.release(); } - void OmniDecimal128ColumnReader::next(void*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto newVector = std::make_unique>(numValues); - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + void OmniDecimal128ColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); + // read the next group of scales int64_t scaleBuffer[numValues]; - scaleDecoder->next(scaleBuffer, numValues, notNull); - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { + scaleDecoder->next(scaleBuffer, numValues, nulls); + + auto vector = reinterpret_cast*>(vec); + if (hasNull) { + for(size_t i = 0; i < numValues; ++i) { + if (!nulls[i]) { orc::Int128 value = 0; readInt128(value, static_cast(scaleBuffer[i])); __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); - } else { - newVector->SetNull(i); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } } } else { - for(size_t i=0; i < numValues; ++i) { + for(size_t i = 0; i < numValues; ++i) { orc::Int128 value = 0; readInt128(value, static_cast(scaleBuffer[i])); __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } } - - omnivec = newVector.release(); } - void OmniDecimalHive11ColumnReader::next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) { - auto newVector = std::make_unique>(numValues); - bool HasNull = false; - char nulls[numValues]; - readNulls(this, numValues, notNull, nulls, HasNull); - notNull = HasNull ? nulls : nullptr; + void OmniDecimalHive11ColumnReader::next(BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) { + auto nulls = omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vec); + readNulls(this, numValues, incomingNulls, nulls); + bool hasNull = vec->HasNull(); // read the next group of scales int64_t scaleBuffer[numValues]; - scaleDecoder->next(scaleBuffer, numValues, notNull); + scaleDecoder->next(scaleBuffer, numValues, nulls); - if (notNull) { + auto vector = reinterpret_cast*>(vec); + if (hasNull) { for (size_t i = 0; i < numValues; ++i) { - if (notNull[i]) { + if (!nulls[i]) { orc::Int128 value = 0; if (!readInt128(value, static_cast(scaleBuffer[i]))) { if (throwOnOverflow) { @@ -791,17 +710,15 @@ namespace omniruntime::reader { *errorStream << "Warning: " << "Hive 0.11 decimal with more than 38 digits " << "replaced by NULL. \n"; - newVector->SetNull(i); + vector->SetNull(i); } } else { __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } - } else { - newVector->SetNull(i); - } + } } } else { for (size_t i = 0; i < numValues; ++i) { @@ -813,25 +730,24 @@ namespace omniruntime::reader { *errorStream << "Warning: " << "Hive 0.11 decimal with more than 38 digits " << "replaced by NULL. \n"; - newVector->SetNull(i); + vector->SetNull(i); } } else { __int128_t dst = value.getHighBits(); dst <<= 64; dst |= value.getLowBits(); - newVector->SetValue(i, omniruntime::type::Decimal128(dst)); + vector->SetValue(i, omniruntime::type::Decimal128(dst)); } } } } - OmniIntegerColumnReader::OmniIntegerColumnReader(const Type& type, - StripeStreams& stripe): ColumnReader(type, stripe) { + OmniIntegerColumnReader::OmniIntegerColumnReader(const Type& type, StripeStreams& stripe) + : OmniColumnReader(type, stripe) { RleVersion vers = omniConvertRleVersion(stripe.getEncoding(columnId).kind()); - std::unique_ptr stream = - stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); + std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); if (stream == nullptr) - throw omniruntime::exception::OmniException("EXPRESSION_NOT_SUPPORT", "DATA stream not found in Integer column"); + throw OmniException("EXPRESSION_NOT_SUPPORT", "DATA stream not found in Integer column"); rle = createOmniRleDecoder(std::move(stream), true, vers, memoryPool); } @@ -840,14 +756,14 @@ namespace omniruntime::reader { } uint64_t OmniIntegerColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } void OmniIntegerColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } @@ -876,7 +792,7 @@ namespace omniruntime::reader { OmniDecimal64ColumnReader::OmniDecimal64ColumnReader(const Type& type, StripeStreams& stripe - ): ColumnReader(type, stripe) { + ): OmniColumnReader(type, stripe) { scale = static_cast(type.getScale()); precision = static_cast(type.getPrecision()); valueStream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); @@ -897,7 +813,7 @@ namespace omniruntime::reader { } uint64_t OmniDecimal64ColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); uint64_t skipped = 0; while (skipped < numValues) { readBuffer(); @@ -911,7 +827,7 @@ namespace omniruntime::reader { void OmniDecimal64ColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); valueStream->seek(positions.at(columnId)); scaleDecoder->seek(positions.at(columnId)); // clear buffer state after seek @@ -998,7 +914,7 @@ namespace omniruntime::reader { StripeStreams& stripe, bool isInstantType, common::JulianGregorianRebase *julianPtr - ): ColumnReader(type, stripe), + ): OmniColumnReader(type, stripe), writerTimezone(isInstantType ? orc::getTimezoneByName("GMT") : stripe.getWriterTimezone()), @@ -1025,7 +941,7 @@ namespace omniruntime::reader { } uint64_t OmniTimestampColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); secondsRle->skip(numValues); nanoRle->skip(numValues); return numValues; @@ -1033,13 +949,13 @@ namespace omniruntime::reader { void OmniTimestampColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); secondsRle->seek(positions.at(columnId)); nanoRle->seek(positions.at(columnId)); } OmniStringDirectColumnReader::OmniStringDirectColumnReader(const Type& type, StripeStreams& stripe) - : ColumnReader(type, stripe) { + : OmniColumnReader(type, stripe) { RleVersion rleVersion = omniConvertRleVersion(stripe.getEncoding(columnId).kind()); std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_LENGTH, true); @@ -1052,6 +968,9 @@ namespace omniruntime::reader { throw orc::ParseError("DATA stream not found in StringDirectColumn"); lastBuffer = nullptr; lastBufferLength = 0; + if (type.getKind() == orc::TypeKind::CHAR) { + isChar = true; + } } OmniStringDirectColumnReader::~OmniStringDirectColumnReader() { @@ -1060,7 +979,7 @@ namespace omniruntime::reader { uint64_t OmniStringDirectColumnReader::skip(uint64_t numValues) { const size_t BUFFER_SIZE = 1024; - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); int64_t buffer[BUFFER_SIZE]; uint64_t done = 0; size_t totalBytes = 0; @@ -1091,18 +1010,16 @@ namespace omniruntime::reader { return numValues; } - size_t OmniStringDirectColumnReader::computeSize(const int64_t* lengths, - const char* notNull, - uint64_t numValues) { + size_t OmniStringDirectColumnReader::computeSize(const int64_t* lengths, bool *nulls, uint64_t numValues) { size_t totalLength = 0; - if (notNull) { - for(size_t i=0; i < numValues; ++i) { - if (notNull[i]) { + if (nulls) { + for(size_t i = 0; i < numValues; ++i) { + if (!nulls[i]) { totalLength += static_cast(lengths[i]); } } } else { - for(size_t i=0; i < numValues; ++i) { + for(size_t i = 0; i < numValues; ++i) { totalLength += static_cast(lengths[i]); } } @@ -1111,7 +1028,7 @@ namespace omniruntime::reader { void OmniStringDirectColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); blobStream->seek(positions.at(columnId)); lengthRle->seek(positions.at(columnId)); // clear buffer state after seek @@ -1120,8 +1037,7 @@ namespace omniruntime::reader { } OmniStringDictionaryColumnReader::OmniStringDictionaryColumnReader(const Type& type, StripeStreams& stripe) - : ColumnReader(type, stripe), - dictionary(new orc::StringDictionary(stripe.getMemoryPool())) { + : OmniColumnReader(type, stripe), dictionary(new orc::StringDictionary(stripe.getMemoryPool())) { RleVersion rleVersion = omniConvertRleVersion(stripe.getEncoding(columnId) .kind()); uint32_t dictSize = stripe.getEncoding(columnId).dictionarysize(); @@ -1156,6 +1072,9 @@ namespace omniruntime::reader { "DICTIONARY_DATA stream not found in StringDictionaryColumn"); } omniReadFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get()); + if (type.getKind() == orc::TypeKind::CHAR) { + isChar = true; + } } OmniStringDictionaryColumnReader::~OmniStringDictionaryColumnReader() { @@ -1163,19 +1082,19 @@ namespace omniruntime::reader { } uint64_t OmniStringDictionaryColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } void OmniStringDictionaryColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } - OmniBooleanColumnReader::OmniBooleanColumnReader(const orc::Type& type, - orc::StripeStreams& stripe): ColumnReader(type, stripe){ + OmniBooleanColumnReader::OmniBooleanColumnReader(const orc::Type& type, orc::StripeStreams& stripe) + : OmniColumnReader(type, stripe){ std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); if (stream == nullptr) @@ -1188,7 +1107,7 @@ namespace omniruntime::reader { } uint64_t OmniBooleanColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } @@ -1196,14 +1115,13 @@ namespace omniruntime::reader { void OmniBooleanColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } - OmniByteColumnReader::OmniByteColumnReader(const Type& type, - StripeStreams& stripe - ): ColumnReader(type, stripe){ + OmniByteColumnReader::OmniByteColumnReader(const Type& type, StripeStreams& stripe) + : OmniColumnReader(type, stripe){ std::unique_ptr stream = stripe.getStream(columnId, orc::proto::Stream_Kind_DATA, true); if (stream == nullptr) @@ -1216,21 +1134,21 @@ namespace omniruntime::reader { } uint64_t OmniByteColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); rle->skip(numValues); return numValues; } void OmniByteColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); rle->seek(positions.at(columnId)); } OmniDoubleColumnReader::OmniDoubleColumnReader(const Type& type, StripeStreams& stripe - ): ColumnReader(type, stripe), + ): OmniColumnReader(type, stripe), columnKind(type.getKind()), bytesPerValue((type.getKind() == orc::FLOAT) ? 4 : 8), @@ -1246,7 +1164,7 @@ namespace omniruntime::reader { } uint64_t OmniDoubleColumnReader::skip(uint64_t numValues) { - numValues = ColumnReader::skip(numValues); + numValues = OmniColumnReader::skip(numValues); if (static_cast(bufferEnd - bufferPointer) >= bytesPerValue * numValues) { @@ -1269,7 +1187,7 @@ namespace omniruntime::reader { void OmniDoubleColumnReader::seekToRowGroup( std::unordered_map& positions) { - ColumnReader::seekToRowGroup(positions); + OmniColumnReader::seekToRowGroup(positions); inputStream->seek(positions.at(columnId)); // clear buffer state after seek bufferEnd = nullptr; diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh index f3dbdeb6822dc2436cc8ed5f0023e8c883d2f9e3..42cc23b57d43332f2371a8de885ecc9011045b61 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.hh @@ -29,50 +29,80 @@ #include "common/JulianGregorianRebase.h" namespace omniruntime::reader { - class OmniStructColumnReader: public orc::ColumnReader { + + class OmniColumnReader: public orc::ColumnReader { + public: + OmniColumnReader(const orc::Type& type, orc::StripeStreams& stripe); + + virtual ~OmniColumnReader() {} + + /** + * Skip number of specified rows. + */ + virtual uint64_t skip(uint64_t numValues); + + /** + * Read OmniVector in OmniTypeId, which contains specified rows. + */ + virtual void next( + omniruntime::vec::BaseVector *omniVector, + uint64_t numValues, + bool *incomingNulls, + int omniTypeId) { + throw std::runtime_error("next() in base class should not be called"); + } + + /** + * Seek to beginning of a row group in the current stripe + * @param positions a list of PositionProviders storing the positions + */ + virtual void seekToRowGroup(std::unordered_map &positions); + + std::unique_ptr notNullDecoder; + }; + + class OmniStructColumnReader: public OmniColumnReader { private: - std::vector> children; + std::vector> children; public: - OmniStructColumnReader(const orc::Type& type, orc::StripeStreams& stipe, common::JulianGregorianRebase *julianPtr); + OmniStructColumnReader(const orc::Type& type, orc::StripeStreams& stipe, + common::JulianGregorianRebase *julianPtr); uint64_t skip(uint64_t numValues) override; /** * direct read VectorBatch in next * @param omniVecBatch the VectorBatch to push - * @param numValues the VectorBatch to push - * @param notNull the VectorBatch to push - * @param baseTp the vectorBatch to push + * @param numValues the numValues of VectorBatch + * @param notNull the notNull array indicates value not null + * @param baseTp the orc type * @param omniTypeId the omniTypeId to push */ void next(void *&omniVecBatch, uint64_t numValues, char *notNull, const orc::Type& baseTp, int* omniTypeId) override; - void nextEncoded(orc::ColumnVectorBatch& rowBatch, - uint64_t numValues, - char *notNull) override; - void seekToRowGroup( std::unordered_map& positions) override; private: /** - * direct read VectorBatch in next + * direct read VectorBatch in next for omni * @param omniVecBatch the VectorBatch to push - * @param numValues the VectorBatch to push - * @param notNull the VectorBatch to push - * @param baseTp the vectorBatch to push + * @param numValues the numValues of VectorBatch + * @param notNull the notNull array indicates value not null + * @param baseTp the orc type * @param omniTypeId the omniTypeId to push */ template - void nextInternal(std::vector &vecs, uint64_t numValues, char *notNull, + void nextInternal(std::vector &vecs, uint64_t numValues, bool *incomingNulls, const orc::Type& baseTp, int* omniTypeId); - omniruntime::type::DataTypeId getOmniTypeByOrcType(const orc::Type* type); + // Get default omni type from orc type. + omniruntime::type::DataTypeId getDefaultOmniType(const orc::Type *type); }; - class OmniBooleanColumnReader: public orc::ColumnReader { + class OmniBooleanColumnReader: public OmniColumnReader { protected: std::unique_ptr rle; @@ -82,13 +112,12 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniByteColumnReader: public orc::ColumnReader { + class OmniByteColumnReader: public OmniColumnReader { protected: std::unique_ptr rle; @@ -98,14 +127,12 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniIntegerColumnReader: public orc::ColumnReader { + class OmniIntegerColumnReader: public OmniColumnReader { protected: std::unique_ptr rle; @@ -115,20 +142,17 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char *notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniTimestampColumnReader: public orc::ColumnReader { + class OmniTimestampColumnReader: public OmniColumnReader { public: - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - template - void nextByType(void*& omnivec, uint64_t numValues, char* notNull, const orc::Type& baseTp, int* omniTypeId); + template + void nextByType(T *data, uint64_t numValues, bool *nulls); private: std::unique_ptr secondsRle; @@ -147,23 +171,20 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniDoubleColumnReader: public orc::ColumnReader { + class OmniDoubleColumnReader: public OmniColumnReader { public: OmniDoubleColumnReader(const orc::Type& type, orc::StripeStreams& stripe); ~OmniDoubleColumnReader() override; uint64_t skip(uint64_t numValues) override; - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; - template - void nextByType(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type& baseTp); + template + void nextByType(T *data, uint64_t numValues, bool *nulls); void seekToRowGroup( std::unordered_map& positions) override; @@ -207,14 +228,14 @@ namespace omniruntime::reader { }; - class OmniStringDictionaryColumnReader: public orc::ColumnReader { + class OmniStringDictionaryColumnReader: public OmniColumnReader { public: - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; private: std::shared_ptr dictionary; std::unique_ptr rle; + bool isChar = false; public: OmniStringDictionaryColumnReader(const orc::Type& type, orc::StripeStreams& stipe); @@ -222,30 +243,28 @@ namespace omniruntime::reader { uint64_t skip(uint64_t numValues) override; - void seekToRowGroup( - std::unordered_map& positions) override; + void seekToRowGroup(std::unordered_map& positions) override; }; - class OmniStringDirectColumnReader: public orc::ColumnReader { + class OmniStringDirectColumnReader: public OmniColumnReader { public: - void next(void*& vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; private: std::unique_ptr lengthRle; std::unique_ptr blobStream; const char *lastBuffer; size_t lastBufferLength; + bool isChar = false; /** * Compute the total length of the values. * @param lengths the array of lengths - * @param notNull the array of notNull flags + * @param nulls the array of nulls flags * @param numValues the lengths of the arrays * @return the total number of bytes for the non-null values */ - size_t computeSize(const int64_t *lengths, const char *notNull, - uint64_t numValues); + size_t computeSize(const int64_t *lengths, bool *nulls, uint64_t numValues); public: OmniStringDirectColumnReader(const orc::Type& type, orc::StripeStreams& stipe); @@ -257,10 +276,9 @@ namespace omniruntime::reader { std::unordered_map& positions) override; }; - class OmniDecimal64ColumnReader: public orc::ColumnReader { + class OmniDecimal64ColumnReader: public OmniColumnReader { public: - void next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; public: static const uint32_t MAX_PRECISION_64 = 18; @@ -327,8 +345,7 @@ namespace omniruntime::reader { class OmniDecimal128ColumnReader : public OmniDecimal64ColumnReader { public: - void next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; public: OmniDecimal128ColumnReader(const orc::Type& type, orc::StripeStreams& stipe); @@ -349,8 +366,7 @@ namespace omniruntime::reader { OmniDecimalHive11ColumnReader(const orc::Type& type, orc::StripeStreams& stipe); ~OmniDecimalHive11ColumnReader() override; - void next(void*&vec, uint64_t numValues, char* notNull, - const orc::Type& baseTp, int* omniTypeId) override; + void next(omniruntime::vec::BaseVector *vec, uint64_t numValues, bool *incomingNulls, int omniTypeId) override; }; std::unique_ptr omniBuildReader(const orc::Type& type, diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc index a5c515d3cb11759f7950d4e143c25b70627abe0b..5419409d6aa691acc51b14bc9073d0aad1715583 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc @@ -22,23 +22,23 @@ using omniruntime::vec::VectorBatch; namespace omniruntime::reader { - std::unique_ptr makeFixLenthVector(uint64_t numValues, + std::unique_ptr makeFixedLengthVector(uint64_t numValues, omniruntime::type::DataTypeId dataTypeId) { switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_SHORT: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_INT: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_LONG: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_DATE32: - return std::make_unique>(numValues); - case omniruntime::type::OMNI_DATE64: - return std::make_unique>(numValues); - default: - throw std::runtime_error("Not support for this type: " + dataTypeId); + case omniruntime::type::OMNI_BOOLEAN: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_SHORT: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_INT: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_LONG: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_DATE32: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_DATE64: + return std::make_unique>(numValues); + default: + throw std::runtime_error("MakeFixedLengthVector Not support for this type: " + dataTypeId); } } @@ -48,7 +48,31 @@ namespace omniruntime::reader { case omniruntime::type::OMNI_DOUBLE: return std::make_unique>(numValues); default: - throw std::runtime_error("Not support double vector for this type: " + dataTypeId); + throw std::runtime_error("MakeDoubleVector Not support double vector for this type: " + dataTypeId); + } + } + + std::unique_ptr makeDecimalVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId) { + switch (dataTypeId) { + case omniruntime::type::OMNI_DECIMAL64: + return std::make_unique>(numValues); + case omniruntime::type::OMNI_DECIMAL128: + return std::make_unique>(numValues); + default: + throw std::runtime_error("makeDecimalVector Not support vector for this type: " + dataTypeId); + } + } + + std::unique_ptr makeVarcharVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId) { + switch (dataTypeId) { + case omniruntime::type::OMNI_CHAR: + case omniruntime::type::OMNI_VARCHAR: + return std::make_unique>>(numValues); + default: + throw std::runtime_error("MakeVarcharVector Not support vector for this type: " + dataTypeId); } } @@ -62,36 +86,74 @@ namespace omniruntime::reader { case orc::TypeKind::TIMESTAMP: case orc::TypeKind::TIMESTAMP_INSTANT: case orc::TypeKind::LONG: - return makeFixLenthVector(numValues, dataTypeId); + return makeFixedLengthVector(numValues, dataTypeId); case orc::TypeKind::DOUBLE: return makeDoubleVector(numValues, dataTypeId); case orc::TypeKind::CHAR: - throw std::runtime_error("CHAR not finished!!!"); case orc::TypeKind::STRING: case orc::TypeKind::VARCHAR: - throw std::runtime_error("VARCHAR not finished!!!"); + return makeVarcharVector(numValues, dataTypeId); case orc::TypeKind::DECIMAL: - throw std::runtime_error("DECIMAL should not in here!!!"); + return makeDecimalVector(numValues, dataTypeId); default: { - throw std::runtime_error("Not support For This Type: " + baseTp->getKind()); + throw std::runtime_error("Not support For This ORC Type: " + baseTp->getKind()); } } } - void OmniRleDecoderV2::next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId) { - uint64_t nRead = 0; + void OmniRleDecoderV2::next(int64_t *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + + void OmniRleDecoderV2::next(int32_t *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } - auto dataTypeId = static_cast(omniTypeId); - std::unique_ptr tempOmnivec = makeNewVector(numValues, baseTp, dataTypeId); - auto pushOmniVec = tempOmnivec.get(); + void OmniRleDecoderV2::next(int16_t *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + + void OmniRleDecoderV2::next(bool *data, uint64_t numValues, bool *nulls) { + next(data, numValues, nulls); + } + + void OmniRleDecoderV2::next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, int omniTypeId) { + switch (omniTypeId) { + case omniruntime::type::OMNI_BOOLEAN: { + auto boolValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(boolValues, numValues, nulls); + } + case omniruntime::type::OMNI_SHORT: { + auto shortValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(shortValues, numValues, nulls); + } + case omniruntime::type::OMNI_INT: + case omniruntime::type::OMNI_DATE32: { + auto intValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(intValues, numValues, nulls); + } + case omniruntime::type::OMNI_LONG: + case omniruntime::type::OMNI_DATE64: { + auto longValues = omniruntime::vec::unsafe::UnsafeVector::GetRawValues( + static_cast*>(omnivec)); + return next(longValues, numValues, nulls); + } + default: + throw std::runtime_error("OmniRleDecoderV2 Not support For This Type: " + omniTypeId); + } + } + + template + void OmniRleDecoderV2::next(T *data, uint64_t numValues, bool *nulls) { + uint64_t nRead = 0; while (nRead < numValues) { // SKip any nulls before attempting to read first byte. - while (notNull && !notNull[nRead]) { - tempOmnivec->SetNull(nRead); + while (nulls && nulls[nRead]) { if (++nRead == numValues) { - omnivec = tempOmnivec.release(); return; //ended with null values } } @@ -104,30 +166,27 @@ namespace omniruntime::reader { uint64_t offset = nRead, length = numValues - nRead; orc::EncodingType enc = static_cast((firstByte >> 6) & 0x03); - switch (static_cast(enc)) { + switch (static_cast(enc)) { case orc::SHORT_REPEAT: - nRead += nextShortRepeatsByType(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextShortRepeats(data, offset, length, nulls); break; case orc::DIRECT: - nRead += nextDirect(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextDirect(data, offset, length, nulls); break; case orc::PATCHED_BASE: - nRead += nextPatchedByType(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextPatched(data, offset, length, nulls); break; case orc::DELTA: - nRead += nextDeltaByType(pushOmniVec, offset, length, notNull, dataTypeId); + nRead += nextDelta(data, offset, length, nulls); break; default: throw orc::ParseError("unknown encoding"); } } - - omnivec = tempOmnivec.release(); } - uint64_t OmniRleDecoderV2::nextDirect(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { + template + uint64_t OmniRleDecoderV2::nextDirect(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bits unsigned char fbo = (firstByte >> 1) & 0x1f; @@ -140,7 +199,7 @@ namespace omniruntime::reader { runLength += 1; runRead = 0; - readLongsByType(OmniVec, 0, runLength, offset , numValues, bitSize, dataTypeId, notNull); + readLongs(literals.data(), 0, runLength, bitSize); if (isSigned) { for (uint64_t i = 0; i < runLength; ++i) { @@ -149,56 +208,11 @@ namespace omniruntime::reader { } } - return copyDataFromBufferByType(OmniVec, offset, numValues, notNull, dataTypeId); + return copyDataFromBuffer(data, offset, numValues, nulls); } - uint64_t OmniRleDecoderV2::nextShortRepeatsByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_SHORT: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_INT: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_LONG: - return nextShortRepeatsLongType - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE32: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE64: - return nextShortRepeatsLongType - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_DOUBLE: - return nextShortRepeats - (OmniVec, offset, numValues, notNull); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("nextShortRepeats_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("nextShortRepeats_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("nextShortRepeats_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("nextShortRepeats_type DECIMAL128 should not in here!!!"); - default: - printf("nextShortRepeats_type switch no process!!!"); - } - - return 0; - } - - template - uint64_t OmniRleDecoderV2::nextShortRepeats(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); - + template + uint64_t OmniRleDecoderV2::nextShortRepeats(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bytes uint64_t byteSize = (firstByte >> 3) & 0x07; @@ -219,18 +233,16 @@ namespace omniruntime::reader { uint64_t nRead = std::min(runLength - runRead, numValues); - if (notNull) { + if (nulls) { for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - if (notNull[pos]) { - vec->SetValue(static_cast(pos), static_cast(literals[0])); + if (!nulls[pos]) { + data[pos] = static_cast(literals[0]); ++runRead; - } else { - vec->SetNull(static_cast(pos)); } } } else { for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - vec->SetValue(static_cast(pos), static_cast(literals[0])); + data[pos] = static_cast(literals[0]); ++runRead; } } @@ -238,97 +250,8 @@ namespace omniruntime::reader { return nRead; } - template - uint64_t OmniRleDecoderV2::nextShortRepeatsLongType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); - - if (runRead == runLength) { - // extract the number of fixed bytes - uint64_t byteSize = (firstByte >> 3) & 0x07; - byteSize += 1; - - runLength = firstByte & 0x07; - // run lengths values are stored only after MIN_REPEAT value is met - runLength += MIN_REPEAT; - runRead = 0; - - // read the repeated value which is store using fixed bytes - literals[0] = readLongBE(byteSize); - - if (isSigned) { - literals[0] = orc::unZigZag(static_cast(literals[0])); - } - } - - uint64_t nRead = std::min(runLength - runRead, numValues); - - if (notNull) { - for(uint64_t pos = offset; pos < offset + nRead; ++pos) { - if (notNull[pos]) { - vec->SetValue(pos, static_cast(literals[0])); - ++runRead; - } else { - vec->SetNull(pos); - } - } - } else { - int64_t values[nRead]; - std::fill(values, values + nRead, literals[0]); - vec->SetValues(offset, values, nRead); - runRead += nRead; - } - - return nRead; - } - - - uint64_t OmniRleDecoderV2::nextPatchedByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_SHORT: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_INT: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_LONG: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE32: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE64: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DOUBLE: - return nextPatched - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("nextPatched_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("nextPatched_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("nextPatched_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("nextPatched_type DECIMAL128 should not in here!!!"); - default: - printf("nextPatched_type switch no process!!!"); - } - - return 0; - } - - template - uint64_t OmniRleDecoderV2::nextPatched(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { + template + uint64_t OmniRleDecoderV2::nextPatched(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bits unsigned char fbo = (firstByte >> 1) & 0x1f; @@ -372,7 +295,7 @@ namespace omniruntime::reader { base = -base; } - orc::RleDecoderV2::readLongs(literals.data(), 0, runLength, bitSize); + readLongs(literals.data(), 0, runLength, bitSize); // any remaining bits are thrown out resetReadLongs(); @@ -385,7 +308,7 @@ namespace omniruntime::reader { "(patchBitSize + pgw > 64)!"); } uint32_t cfb = orc::getClosestFixedBits(patchBitSize + pgw); - orc::RleDecoderV2::readLongs(unpackedPatch.data(), 0, pl, cfb); + readLongs(unpackedPatch.data(), 0, pl, cfb); // any remaining bits are thrown out resetReadLongs(); @@ -422,53 +345,11 @@ namespace omniruntime::reader { } } - return copyDataFromBufferByType(OmniVec, offset, numValues, notNull, dataTypeId); - } - - uint64_t OmniRleDecoderV2::nextDeltaByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_SHORT: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_INT: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_LONG: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE32: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DATE64: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_DOUBLE: - return nextDelta - (OmniVec, offset, numValues, notNull, dataTypeId); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("nextShortRepeats_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("nextShortRepeats_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("nextShortRepeats_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("nextShortRepeats_type DECIMAL128 should not in here!!!"); - default: - printf("nextShortRepeats_type switch no process!!!"); - } - - return 0; + return copyDataFromBuffer(data, offset, numValues, nulls); } - template - uint64_t OmniRleDecoderV2::nextDelta(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId) { + template + uint64_t OmniRleDecoderV2::nextDelta(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { if (runRead == runLength) { // extract the number of fixed bits unsigned char fbo = (firstByte >> 1) &0x1f; @@ -515,7 +396,7 @@ namespace omniruntime::reader { // value to result buffer. if the delta base value is negative then it // is a decreasing sequence else an increasing sequence. // read deltas using the literals buffer. - orc::RleDecoderV2::readLongs(literals.data(), 2, runLength - 2, bitSize); + readLongs(literals.data(), 2, runLength - 2, bitSize); if (deltaBase < 0) { for (uint64_t i = 2; i < runLength; ++i) { @@ -529,82 +410,36 @@ namespace omniruntime::reader { } } - return copyDataFromBufferByType(OmniVec, offset, numValues, notNull, dataTypeId); + return copyDataFromBuffer(data, offset, numValues, nulls); } - void OmniRleDecoderV2::readLongsByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - omniruntime::type::DataTypeId dataTypeId, const char* const notNull) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_SHORT: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_INT: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_LONG: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_DATE32: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_DATE64: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_DOUBLE: - return readLongs - (OmniVec, literals.data(), offset, len, omniOffset, numValues, fbs, notNull); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("copyDataFromBuffer_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("copyDataFromBuffer_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL128 should not in here!!!"); - default: - printf("copyDataFromBuffer switch no process!!!"); - } - - return; - } - - template - void OmniRleDecoderV2::readLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t *data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - const char* const notNull) { + void OmniRleDecoderV2::readLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs) { switch (fbs) { case 4: - return unrolledUnpack4(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack4(data, offset, len); case 8: - return unrolledUnpack8(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack8(data, offset, len); case 16: - return unrolledUnpack16(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack16(data, offset, len); case 24: - return unrolledUnpack24(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack24(data, offset, len); case 32: - return unrolledUnpack32(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack32(data, offset, len); case 40: - return unrolledUnpack40(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack40(data, offset, len); case 48: - return unrolledUnpack48(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack48(data, offset, len); case 56: - return unrolledUnpack56(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack56(data, offset, len); case 64: - return unrolledUnpack64(OmniVec, data, offset, len, omniOffset, numValues, notNull); + return unrolledUnpack64(data, offset, len); default: // Fallback to the default implementation for deprecated bit size. - return plainUnpackLongs(OmniVec, data, offset, len, omniOffset, numValues, notNull, fbs); + return plainUnpackLongs(data, offset, len, fbs); } } - template - void OmniRleDecoderV2::plainUnpackLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull, uint64_t fbs) { + void OmniRleDecoderV2::plainUnpackLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs) { for (uint64_t i = offset; i < (offset + len); i++) { uint64_t result = 0; uint64_t bitsLeftToRead = fbs; @@ -629,10 +464,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack64(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack64(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -671,10 +503,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack56(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack56(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -711,10 +540,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack48(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack48(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -749,10 +575,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack40(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack40(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -785,10 +608,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack32(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack32(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -820,10 +640,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack24(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack24(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -853,10 +670,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack16(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack16(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -884,10 +698,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack8(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack8(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Exhaust the buffer @@ -907,10 +718,7 @@ namespace omniruntime::reader { return; } - template - void OmniRleDecoderV2::unrolledUnpack4(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull) { + void OmniRleDecoderV2::unrolledUnpack4(int64_t *data, uint64_t offset, uint64_t len) { uint64_t curIdx = offset; while (curIdx < offset + len) { // Make sure bitsLeft is 0 before the loop. bitsLeft can only be 0, 4, or 8. @@ -943,82 +751,20 @@ namespace omniruntime::reader { return; } - uint64_t OmniRleDecoderV2::copyDataFromBufferByType(omniruntime::vec::BaseVector*& tempOmnivec, uint64_t offset, - uint64_t numValues, const char* notNull, - omniruntime::type::DataTypeId dataTypeId) { - switch (dataTypeId) { - case omniruntime::type::OMNI_BOOLEAN: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_SHORT: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_INT: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_LONG: - return copyDataFromBufferTo64bit(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE32: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_DATE64: - return copyDataFromBufferTo64bit(tempOmnivec, offset, numValues, - notNull); - case omniruntime::type::OMNI_DOUBLE: - return copyDataFromBuffer(tempOmnivec, offset, numValues, notNull); - case omniruntime::type::OMNI_CHAR: - throw std::runtime_error("copyDataFromBuffer_type CHAR not finished!!!"); - case omniruntime::type::OMNI_VARCHAR: - throw std::runtime_error("copyDataFromBuffer_type VARCHAR not finished!!!"); - case omniruntime::type::OMNI_DECIMAL64: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL64 should not in here!!!"); - case omniruntime::type::OMNI_DECIMAL128: - throw std::runtime_error("copyDataFromBuffer_type DECIMAL128 should not in here!!!"); - default: - printf("copyDataFromBuffer switch no process!!!"); - } - - return 0; - } - - template - uint64_t OmniRleDecoderV2::copyDataFromBuffer(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); + template + uint64_t OmniRleDecoderV2::copyDataFromBuffer(T *data, uint64_t offset, uint64_t numValues, bool *nulls) { uint64_t nRead = std::min(runLength - runRead, numValues); - if (notNull) { + if (nulls) { for (uint64_t i = offset; i < (offset + nRead); ++i) { - if (notNull[i]) { - vec->SetValue(static_cast(i), static_cast(literals[runRead++])); - } else { - vec->SetNull(static_cast(i)); + if (!nulls[i]) { + data[i] = static_cast(literals[runRead++]); } } } else { for (uint64_t i = offset; i < (offset + nRead); ++i) { - vec->SetValue(static_cast(i), static_cast(literals[runRead++])); + data[i] = static_cast(literals[runRead++]); } } return nRead; } - - template - uint64_t OmniRleDecoderV2::copyDataFromBufferTo64bit(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull) { - using namespace omniruntime::type; - using T = typename NativeType::type; - auto vec = reinterpret_cast*>(OmniVec); - uint64_t nRead = std::min(runLength - runRead, numValues); - if (notNull) { - for (uint64_t i = offset; i < (offset + nRead); ++i) { - if (notNull[i]) { - vec->SetValue(static_cast(i), static_cast(literals[runRead++])); - } else { - vec->SetNull(static_cast(i)); - } - } - } else { - vec->SetValues(static_cast(offset), literals.data() + runRead, static_cast(nRead)); - runRead += nRead; - } - return nRead; - } } \ No newline at end of file diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh index d7377958ed45519a046721a65e7a9733ffd57585..0d9a814a4fbb486e003d389a40cf4d923cff9416 100644 --- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh +++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.hh @@ -25,12 +25,18 @@ namespace omniruntime::reader { - std::unique_ptr makeFixLenthVector(uint64_t numValues, + std::unique_ptr makeFixedLengthVector(uint64_t numValues, omniruntime::type::DataTypeId dataTypeId); std::unique_ptr makeDoubleVector(uint64_t numValues, omniruntime::type::DataTypeId dataTypeId); + std::unique_ptr makeVarcharVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId); + + std::unique_ptr makeDecimalVector(uint64_t numValues, + omniruntime::type::DataTypeId dataTypeId); + std::unique_ptr makeNewVector(uint64_t numValues, const orc::Type* baseTp, omniruntime::type::DataTypeId dataTypeId); @@ -43,117 +49,59 @@ namespace omniruntime::reader { * direct read VectorBatch in next * @param omnivec the BaseVector to push * @param numValues the numValues to push - * @param notNull the nullarrays to push + * @param nulls the nullarrays to push * @param baseTp the orcType to push * @param omniTypeId the int* of omniType to push */ - void next(omniruntime::vec::BaseVector*& omnivec, uint64_t numValues, char* notNull, - const orc::Type* baseTp, int omniTypeId); + void next(omniruntime::vec::BaseVector *omnivec, uint64_t numValues, bool *nulls, int omniTypeId); - void next(int64_t* data, uint64_t numValues, const char* notNull) { - orc::RleDecoderV2::next(data, numValues, notNull); - } + void next(int64_t *data, uint64_t numValues, bool *nulls); + + void next(int32_t *data, uint64_t numValues, bool *nulls); + + void next(int16_t *data, uint64_t numValues, bool *nulls); + + void next(bool *data, uint64_t numValues, bool *nulls); + + template + void next(T *data, uint64_t numValues, bool *nulls); + + template + uint64_t nextShortRepeats(T *data, uint64_t offset, uint64_t numValues, bool *nulls); + + template + uint64_t nextDirect(T *data, uint64_t offset, uint64_t numValues, bool *nulls); + + template + uint64_t nextPatched(T *data, uint64_t offset, uint64_t numValues, bool *nulls); + + template + uint64_t nextDelta(T *data, uint64_t offset, uint64_t numValues, bool *nulls); + + template + uint64_t copyDataFromBuffer(T *data, uint64_t offset, uint64_t numValues, bool *nulls); + + void readLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs); + + void unrolledUnpack4(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack8(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack16(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack24(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack32(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack40(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack48(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack56(int64_t *data, uint64_t offset, uint64_t len); + + void unrolledUnpack64(int64_t *data, uint64_t offset, uint64_t len); - uint64_t nextDirect(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, uint64_t numValues, - const char* const notNull, omniruntime::type::DataTypeId dataTypeId); - - uint64_t nextShortRepeatsByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, uint64_t numValues, - const char* const notNull, omniruntime::type::DataTypeId dataTypeId); - - template - uint64_t nextShortRepeats(omniruntime::vec::BaseVector*& omnivec, uint64_t offset, uint64_t numValues, - const char* notNull); - - template - uint64_t nextShortRepeatsLongType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull); - - - uint64_t nextPatchedByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, uint64_t numValues, - const char* const notNull, omniruntime::type::DataTypeId dataTypeId); - - template - uint64_t nextPatched(omniruntime::vec::BaseVector*& omnivec, uint64_t offset, uint64_t numValues, - const char* notNull, omniruntime::type::DataTypeId dataTypeId); - - template - uint64_t nextDelta(omniruntime::vec::BaseVector*& omnivec, uint64_t offset, uint64_t numValues, - const char* notNull, omniruntime::type::DataTypeId dataTypeId); - - uint64_t nextDeltaByType(omniruntime::vec::BaseVector*& OmniVec, - uint64_t offset, uint64_t numValues, const char* const notNull, - omniruntime::type::DataTypeId dataTypeId); - - uint64_t copyDataFromBufferByType(omniruntime::vec::BaseVector*& tempOmnivec, uint64_t offset, - uint64_t numValues, const char* notNull, - omniruntime::type::DataTypeId dataTypeId); - - template - uint64_t copyDataFromBufferTo64bit(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull); - - template - uint64_t copyDataFromBuffer(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t numValues, const char* notNull); - - void readLongsByType(omniruntime::vec::BaseVector*& OmniVec, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - omniruntime::type::DataTypeId dataTypeId, const char* const notNull); - - template - void readLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t *data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t numValues, uint64_t fbs, - const char* const notNull); - - template - void unrolledUnpack4(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack8(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack16(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack24(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack32(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack40(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack48(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack56(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void unrolledUnpack64(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull); - - template - void plainUnpackLongs(omniruntime::vec::BaseVector*& OmniVec, int64_t* data, uint64_t offset, - uint64_t len, uint64_t omniOffset, uint64_t omniNumValues, - const char* const notNull, uint64_t fbs); + void plainUnpackLongs(int64_t *data, uint64_t offset, uint64_t len, uint64_t fbs); }; }