From b488d43f568bebe5f6bb1081571d3967694f6a52 Mon Sep 17 00:00:00 2001 From: lilongbin Date: Tue, 27 May 2025 11:17:12 +0800 Subject: [PATCH] ock adapter null flag --- .../cpp/src/shuffle/ock_merge_reader.cpp | 19 +++++++++++-------- .../cpp/src/shuffle/ock_merge_reader.h | 2 +- .../ock-omniop-shuffle/pom.xml | 6 +++--- .../omniop-spark-extension-ock/pom.xml | 6 +++--- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp index d1ef824c4..6486af2c2 100644 --- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp +++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.cpp @@ -166,7 +166,7 @@ bool OckMergeReader::GetMergeVectorBatch(uint8_t *&startAddress, uint32_t remain return true; } -bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uint32_t &remainingSize, +bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint32_t &nullsOffset, uint8_t *&values, uint32_t &remainingSize, uint32_t &remainingCapacity, OckVectorPtr &srcVector) { uint32_t srcSize = srcVector->GetSize(); @@ -174,12 +174,14 @@ bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uin LOG_ERROR("Not eneough resource. remainingSize %d, srcSize %d.", remainingSize, srcSize); return false; } - errno_t ret = memcpy_s(nulls, remainingSize, srcVector->GetValueNulls(), srcSize); - if (UNLIKELY(ret != EOK)) { - LOG_ERROR("Failed to copy null vector"); - return false; + + uint8_t *srcNulls = reinterpret_cast(srcVector->GetValueNulls()); + for (uint32_t i = 0; i < srcSize; i++) { + if (srcNulls[i]) { + omniruntime::BitUtil::SetBit(nulls, nullsOffset + i); + } } - nulls += srcSize; + nullsOffset += srcSize; remainingSize -= srcSize; uint32_t srcCapacity = srcVector->GetCapacityInBytes(); @@ -188,7 +190,7 @@ bool OckMergeReader::CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uin return false; } if (srcCapacity > 0) { - ret = memcpy_s(values, remainingCapacity, srcVector->GetValues(), srcCapacity); + errno_t ret = memcpy_s(values, remainingCapacity, srcVector->GetValues(), srcCapacity); if (UNLIKELY(ret != EOK)) { LOG_ERROR("Failed to copy values vector"); return false; @@ -224,13 +226,14 @@ bool OckMergeReader::CopyDataToVector(BaseVector *dstVector, uint32_t colIndex) remainingCapacity = GetDataSize(colIndex) * remainingSize; } + uint32_t nullsOffset = 0; for (uint32_t cnt = 0; cnt < mMergeCnt; ++cnt) { if (UNLIKELY(srcVector == nullptr)) { LOG_ERROR("Invalid src vector"); return false; } - if (UNLIKELY(!CopyPartDataToVector(nullsAddress, valuesAddress, remainingSize, remainingCapacity, srcVector))) { + if (UNLIKELY(!CopyPartDataToVector(nullsAddress, nullsOffset, valuesAddress, remainingSize, remainingCapacity, srcVector))) { return false; } diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h index 838dd6a8d..385d3711e 100644 --- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h +++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/cpp/src/shuffle/ock_merge_reader.h @@ -16,7 +16,7 @@ public: bool Initialize(const int32_t *typeIds, uint32_t colNum); bool GetMergeVectorBatch(uint8_t *&address, uint32_t remain, uint32_t maxRowNum, uint32_t maxSize); - bool CopyPartDataToVector(uint8_t *&nulls, uint8_t *&values, uint32_t &remainingSize, uint32_t &remainingCapacity, + bool CopyPartDataToVector(uint8_t *&nulls, uint32_t &nullsOffset, uint8_t *&values, uint32_t &remainingSize, uint32_t &remainingCapacity, OckVectorPtr &srcVector); bool CopyDataToVector(omniruntime::vec::BaseVector *dstVector, uint32_t colIndex); diff --git a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml index 7457d49fe..92fbd8d5c 100644 --- a/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml +++ b/omnioperator/omniop-spark-extension-ock/ock-omniop-shuffle/pom.xml @@ -6,7 +6,7 @@ com.huawei.ock omniop-spark-extension-ock - 25.0.0 + 24.0.0 cpp/ @@ -18,7 +18,7 @@ com.huawei.kunpeng - boostkit-omniop-spark + spark-extension-core 3.3.1-1.9.0 compile @@ -27,7 +27,7 @@ ock-omniop-shuffle-manager jar Huawei Open Computing Kit for Spark, shuffle manager - 25.0.0 + 24.0.0 diff --git a/omnioperator/omniop-spark-extension-ock/pom.xml b/omnioperator/omniop-spark-extension-ock/pom.xml index ed8150677..6ea9000a4 100644 --- a/omnioperator/omniop-spark-extension-ock/pom.xml +++ b/omnioperator/omniop-spark-extension-ock/pom.xml @@ -8,7 +8,7 @@ omniop-spark-extension-ock pom Huawei Open Computing Kit for Spark - 25.0.0 + 24.0.0 3.3.1 @@ -20,7 +20,7 @@ spark-3.3 3.2.0 3.1.1 - 25.0.0 + 24.0.0 @@ -67,7 +67,7 @@ com.huawei.kunpeng - boostkit-omniop-spark + spark-extension-core 3.3.1-1.9.0 -- Gitee