diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp index d1ef824c4a3032e3305ac5d7b16cc7838f5f8684..6486af2c25db45399b387453c3ba4bac846758bd 100644 --- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp +++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp @@ -166,7 +166,7 @@ bool OckMergeReader::GetMergeVectorBatch(uint8_t *&startAddress, uint32_t remain return true; } -bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uint32_t &remainingSize, +bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint32_t &nullsOffset, uint8_t *&values, uint32_t &remainingSize, uint32_t &remainingCapacity, OckVectorPtr &srcVector) { uint32_t srcSize = srcVector->GetSize(); @@ -174,12 +174,14 @@ bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uin LOG_ERROR("Not eneough resource. remainingSize %d, srcSize %d.", remainingSize, srcSize); return false; } - errno_t ret = memcpy_s(nulls, remainingSize, srcVector->GetValueNulls(), srcSize); - if (UNLIKELY(ret != EOK)) { - LOG_ERROR("Failed to copy null vector"); - return false; + + uint8_t *srcNulls = reinterpret_cast(srcVector->GetValueNulls()); + for (uint32_t i = 0; i < srcSize; i++) { + if (srcNulls[i]) { + omniruntime::BitUtil::SetBit(nulls, nullsOffset + i); + } } - nulls += srcSize; + nullsOffset += srcSize; remainingSize -= srcSize; uint32_t srcCapacity = srcVector->GetCapacityInBytes(); @@ -188,7 +190,7 @@ bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uin return false; } if (srcCapacity > 0) { - ret = memcpy_s(values, remainingCapacity, srcVector->GetValues(), srcCapacity); + errno_t ret = memcpy_s(values, remainingCapacity, srcVector->GetValues(), srcCapacity); if (UNLIKELY(ret != EOK)) { LOG_ERROR("Failed to copy values vector"); return false; @@ -224,13 +226,14 @@ bool OckMergeReader::CopyDataToVector(BaseVector *dstVector, uint32_t colIndex) remainingCapacity = GetDataSize(colIndex) * remainingSize; } + uint32_t nullsOffset = 0; for (uint32_t cnt = 0; cnt < mMergeCnt; ++cnt) { if (UNLIKELY(srcVector == nullptr)) { LOG_ERROR("Invalid src vector"); return false; } - if (UNLIKELY(!CopyPartDataToVector(nullsAddress, valuesAddress, remainingSize, remainingCapacity, srcVector))) { + if (UNLIKELY(!CopyPartDataToVector(nullsAddress, nullsOffset, valuesAddress, remainingSize, remainingCapacity, srcVector))) { return false; } diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h index 838dd6a8d6e78b3557764869f1240c47b48aa398..385d3711ebc77407c549d115ed5606754bad4e26 100644 --- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h +++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h @@ -16,7 +16,7 @@ public: bool Initialize(const int32_t *typeIds, uint32_t colNum); bool GetMergeVectorBatch(uint8_t *&address, uint32_t remain, uint32_t maxRowNum, uint32_t maxSize); - bool CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uint32_t &remainingSize, uint32_t &remainingCapacity, + bool CopyPartDataToVector(uint8_t *&nulls, uint32_t &nullsOffset, uint8_t *&values, uint32_t &remainingSize, uint32_t &remainingCapacity, OckVectorPtr &srcVector); bool CopyDataToVector(omniruntime::vec::BaseVector *dstVector, uint32_t colIndex); diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml index 7457d49fed92518e45adaaeff9629a4227a2c9e4..92fbd8d5cad772a6ecc72988a2067d3bed5e2476 100644 --- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml +++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml @@ -6,7 +6,7 @@ com.huawei.ock omniop-spark-extension-ock - 25.0.0 + 24.0.0 cpp/ @@ -18,7 +18,7 @@ com.huawei.kunpeng - boostkit-omniop-spark + spark-extension-core 3.3.1-1.9.0 compile @@ -27,7 +27,7 @@ ock-omniop-shuffle-manager jar Huawei Open Computing Kit for Spark, shuffle manager - 25.0.0 + 24.0.0 diff --git a/omnioperator/omniop-spark-extension-ock/pom.xml b/omnioperator/omniop-spark-extension-ock/pom.xml index ed815067759e68581e413c16d8962061adb2d193..6ea9000a4a2dd1b4b9c9562a48c8f5ec9a6f04b6 100644 --- a/omnioperator/omniop-spark-extension-ock/pom.xml +++ b/omnioperator/omniop-spark-extension-ock/pom.xml @@ -8,7 +8,7 @@ omniop-spark-extension-ock pom Huawei Open Computing Kit for Spark - 25.0.0 + 24.0.0 3.3.1 @@ -20,7 +20,7 @@ spark-3.3 3.2.0 3.1.1 - 25.0.0 + 24.0.0 @@ -67,7 +67,7 @@ com.huawei.kunpeng - boostkit-omniop-spark + spark-extension-core 3.3.1-1.9.0