From dcb8fd938f688e8d4bb14418bf7d9e3c90f57495 Mon Sep 17 00:00:00 2001
From: fhyb_llb
Date: Tue, 18 Mar 2025 08:31:12 +0000
Subject: [PATCH 1/2] change memcpy_s to memcpy

---
 .../cpp/src/common/PredicateCondition.h       | 11 ++----
 .../cpp/src/common/PredicateUtil.cpp          |  4 +--
 .../cpp/src/jni/OrcColumnarBatchJniWriter.cpp | 18 +++------
 .../cpp/src/orcfile/OmniByteRLE.cc            |  4 +--
 .../cpp/src/orcfile/OmniColReader.cc          |  9 +++--
 .../cpp/src/orcfile/OmniRLEv2.cc              |  2 +-
 .../cpp/src/parquet/ParquetDecoder.cpp        |  4 +--
 .../cpp/src/parquet/ParquetDecoder.h          |  6 ++--
 .../src/parquet/ParquetTypedRecordReader.cpp  |  2 +-
 .../src/parquet/ParquetTypedRecordReader.h    |  2 +-
 .../cpp/src/common/common.h                   |  2 +-
 .../cpp/src/io/Compression.cc                 |  5 ++-
 .../cpp/src/io/MemoryPool.cc                  |  6 ++--
 .../cpp/src/shuffle/splitter.cpp              | 34 ++++++++-----------
 14 files changed, 43 insertions(+), 66 deletions(-)

diff --git a/omnioperator/omniop-native-reader/cpp/src/common/PredicateCondition.h b/omnioperator/omniop-native-reader/cpp/src/common/PredicateCondition.h
index 06751c1fa..c9a89b51f 100644
--- a/omnioperator/omniop-native-reader/cpp/src/common/PredicateCondition.h
+++ b/omnioperator/omniop-native-reader/cpp/src/common/PredicateCondition.h
@@ -172,10 +172,7 @@ namespace common {
             auto vectorSize = vector->GetSize();
             switch (op) {
                 case TRUE: {
-                    errno_t opTrueRet = memset_s(bitMark, BitUtil::Nbytes(bitSize), -1, BitUtil::Nbytes(bitSize));
-                    if (UNLIKELY(opTrueRet != EOK)) {
-                        throw OmniException("OPERATOR_RUNTIME_ERROR", "LeafPredicateCondition TRUE memset_s fail.");
-                    }
+                    memset(bitMark, -1, BitUtil::Nbytes(bitSize));
                     break;
                 }
                 case EQUAL_TO: {
@@ -214,11 +211,7 @@ namespace common {
                     break;
                 }
                 case IS_NULL: {
-                    errno_t opIsNullRet = memcpy_s(bitMark, BitUtil::Nbytes(vectorSize),
-                        UnsafeBaseVector::GetNulls(vector), BitUtil::Nbytes(vectorSize));
-                    if (UNLIKELY(opIsNullRet != EOK)) {
-                        throw OmniException("OPERATOR_RUNTIME_ERROR", "LeafPredicateCondition IS_NULL memcpy_s fail.");
-                    }
+                    memcpy(bitMark, UnsafeBaseVector::GetNulls(vector), BitUtil::Nbytes(vectorSize));
                     break;
                 }
                 default:
diff --git a/omnioperator/omniop-native-reader/cpp/src/common/PredicateUtil.cpp b/omnioperator/omniop-native-reader/cpp/src/common/PredicateUtil.cpp
index 215ce7ece..71c44329d 100644
--- a/omnioperator/omniop-native-reader/cpp/src/common/PredicateUtil.cpp
+++ b/omnioperator/omniop-native-reader/cpp/src/common/PredicateUtil.cpp
@@ -140,7 +140,7 @@ namespace common {
         // Case: the filtered column is entirely null
         if (isAllNull) {
             auto *nulls = reinterpret_cast(UnsafeBaseVector::GetNulls(selectedBaseVector));
-            memset_s(nulls, BitUtil::Nbytes(selectedBaseVector->GetSize()), -1, BitUtil::Nbytes(selectedBaseVector->GetSize()));
+            memset(nulls, -1, BitUtil::Nbytes(selectedBaseVector->GetSize()));
             return;
         }
         // Case: the filtered column contains no nulls
@@ -153,7 +153,7 @@ namespace common {
                 continue;
             }
             if (mask == 255) {
-                memcpy_s(destValues + index, batchStep * sizeof(RAW_DATA_TYPE), srcValues + j, batchStep * sizeof(RAW_DATA_TYPE));
+                memcpy(destValues + index, srcValues + j, batchStep * sizeof(RAW_DATA_TYPE));
                 index += batchStep;
                 continue;
             }
diff --git a/omnioperator/omniop-native-reader/cpp/src/jni/OrcColumnarBatchJniWriter.cpp b/omnioperator/omniop-native-reader/cpp/src/jni/OrcColumnarBatchJniWriter.cpp
index ea72d6e6a..1044a8fb9 100644
--- a/omnioperator/omniop-native-reader/cpp/src/jni/OrcColumnarBatchJniWriter.cpp
+++ b/omnioperator/omniop-native-reader/cpp/src/jni/OrcColumnarBatchJniWriter.cpp
@@ -181,13 +181,8 @@ void WriteVector(JNIEnv *env, BaseVector *vec, ColumnVectorBatch *fieldBatch, bo
     }
     index = 0;
     if(sizeof(T) == sizeof(values[0])){
-        errno_t err;
-        err = memcpy_s(values, sizeof(T) * (endPos - startPos),
-            omniruntime::vec::unsafe::UnsafeVector::GetRawValues(vector) + startPos,
-            sizeof(T) * (endPos - startPos));
-        if (err != EOK) {
-            env->ThrowNew(runtimeExceptionClass,"Get values from vector failed");
-        }
+        memcpy(values, omniruntime::vec::unsafe::UnsafeVector::GetRawValues(vector) + startPos,
+            sizeof(T) * (endPos - startPos));
     } else {
         for (long j = startPos; j < endPos; j++) {
             values[index] = vector->GetValue(j);
@@ -245,13 +240,8 @@ void WriteDecimal64VectorBatch(JNIEnv *env, BaseVector *vec, ColumnVectorBatch *
             index++;
         }
     }
-    errno_t err;
-    err = memcpy_s(values,sizeof(values[0]) * (endPos - startPos),
-        omniruntime::vec::unsafe::UnsafeVector::GetRawValues(vector) + startPos,
-        sizeof(vector->GetValue(0)) * (endPos - startPos));
-    if (err != EOK) {
-        env->ThrowNew(runtimeExceptionClass,"Get values from vector failed");
-    }
+    memcpy(values, omniruntime::vec::unsafe::UnsafeVector::GetRawValues(vector) + startPos,
+        sizeof(vector->GetValue(0)) * (endPos - startPos));
 }
 
 void WriteVarCharVectorBatch(BaseVector *baseVector, ColumnVectorBatch *fieldBatch, bool isSplitWrite = false,
diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc
index bad64ad72..e66ef02dd 100644
--- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc
+++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniByteRLE.cc
@@ -316,7 +316,7 @@ namespace omniruntime::reader {
                     }
                 }
             } else {
-                memset_s(data + position, count, value, count);
+                memset(data + position, value, count);
                 consumed = count;
             }
         } else {
@@ -336,7 +336,7 @@ namespace omniruntime::reader {
                 uint64_t copyBytes =
                     std::min(static_cast(count - i),
                              static_cast(bufferEnd - bufferStart));
-                memcpy_s(data + position + i, copyBytes, bufferStart, copyBytes);
+                memcpy(data + position + i, bufferStart, copyBytes);
                 bufferStart += copyBytes;
                 i += copyBytes;
             }
diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc
index 38b43fd64..77ca51811 100644
--- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc
+++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniColReader.cc
@@ -188,8 +188,7 @@ namespace omniruntime::reader {
         } else if (incomingNulls) {
             // if we don't have a notNull stream, copy the incomingNulls
             // To do finished
-            memcpy_s(reinterpret_cast(nulls), BitUtil::Nbytes(numValues), incomingNulls,
-                BitUtil::Nbytes(numValues));
+            memcpy(reinterpret_cast(nulls), incomingNulls, BitUtil::Nbytes(numValues));
             return;
         }
     }
@@ -243,7 +242,7 @@ namespace omniruntime::reader {
             if (posn + length > bufferSize) {
                 throw orc::ParseError("Corrupt dictionary blob in StringDictionaryColumn");
             }
-            memcpy_s(buffer + posn, bufferSize - posn, chunk, static_cast(length));
+            memcpy(buffer + posn, chunk, static_cast(length));
             posn += length;
         }
     }
@@ -582,7 +581,7 @@ namespace omniruntime::reader {
         size_t bytesBuffered = 0;
         char ptr[totalLength];
         while (bytesBuffered + lastBufferLength < totalLength) {
-            memcpy_s(ptr + bytesBuffered, totalLength - bytesBuffered, lastBuffer, lastBufferLength);
+            memcpy(ptr + bytesBuffered, lastBuffer, lastBufferLength);
             bytesBuffered += lastBufferLength;
             const void* readBuffer;
             int readLength;
@@ -595,7 +594,7 @@ namespace omniruntime::reader {
 
         if (bytesBuffered < totalLength) {
             size_t moreBytes = totalLength - bytesBuffered;
-            memcpy_s(ptr + bytesBuffered, moreBytes, lastBuffer, moreBytes);
+            memcpy(ptr + bytesBuffered, lastBuffer, moreBytes);
             lastBuffer += moreBytes;
             lastBufferLength -= moreBytes;
         }
diff --git a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc
index 7e269de5b..3a9f6f45f 100644
--- a/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc
+++ b/omnioperator/omniop-native-reader/cpp/src/orcfile/OmniRLEv2.cc
@@ -816,7 +816,7 @@ namespace omniruntime::reader {
                     }
                 }
             } else {
-                memcpy_s(data + offset, nRead * sizeof(int64_t), literals.data() + runRead, nRead * sizeof(int64_t));
+                memcpy(data + offset, literals.data() + runRead, nRead * sizeof(int64_t));
                 runRead += nRead;
             }
         } else {
diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp
index 293719ebb..b5c1d712d 100644
--- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp
+++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.cpp
@@ -86,7 +86,7 @@ namespace omniruntime::reader {
         int32_t* bytes_offsets = reinterpret_cast(byte_array_offsets_->mutable_data());
         for (int i = 0; i < dictionary_length_; ++i) {
-            memcpy_s(bytes_data + offset, total_size - offset, dict_values[i].ptr, dict_values[i].len);
+            memcpy(bytes_data + offset, dict_values[i].ptr, dict_values[i].len);
             bytes_offsets[i] = offset;
             dict_values[i].ptr = bytes_data + offset;
             offset += dict_values[i].len;
@@ -107,7 +107,7 @@ namespace omniruntime::reader {
                 /*shrink_to_fit=*/false));
         uint8_t* bytes_data = byte_array_data_->mutable_data();
         for (int32_t i = 0, offset = 0; i < dictionary_length_; ++i, offset += fixed_len) {
-            memcpy_s(bytes_data + offset, total_size - offset, dict_values[i].ptr, fixed_len);
+            memcpy(bytes_data + offset, dict_values[i].ptr, fixed_len);
             dict_values[i].ptr = bytes_data + offset;
         }
     }
diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h
index e94586b6f..ec94c8297 100644
--- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h
+++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetDecoder.h
@@ -61,7 +61,7 @@ namespace omniruntime::reader {
     template
     inline int SpacedExpand(T* buffer, int num_values, int null_count, uint8_t* nulls, int64_t nullsOffset) {
         int idx_decode = num_values - null_count;
-        memset_s(static_cast(buffer + idx_decode), null_count * sizeof(T), 0, null_count * sizeof(T));
+        memset(static_cast(buffer + idx_decode), 0, null_count * sizeof(T));
         if (idx_decode == 0) {
             // All nulls, nothing more to do
             return num_values;
@@ -380,7 +380,7 @@ namespace omniruntime::reader {
                 ::parquet::ParquetException::EofException();
             }
             if (bytes_to_decode > 0) {
-                memcpy_s(out, data_size, data, bytes_to_decode);
+                memcpy(out, data, bytes_to_decode);
             }
             return static_cast(bytes_to_decode);
         }
@@ -427,7 +427,7 @@ namespace omniruntime::reader {
                 ::parquet::ParquetException::EofException();
             }
 
-            memcpy_s(reinterpret_cast(out), bytes_to_decode, data, bytes_to_decode);
+            memcpy(reinterpret_cast(out), data, bytes_to_decode);
 
             return static_cast(bytes_to_decode);
         }
diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp
index 87ecefe21..b6e8d14d8 100644
--- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp
+++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.cpp
@@ -404,7 +404,7 @@ static inline uint64_t UInt64FromBigEndian(const uint8_t* bytes, int32_t length)
     // Using memcpy instead of special casing for length
     // and doing the conversion in 16, 32 parts, which could
     // possibly create unaligned memory access on certain platforms
-    memcpy_s(reinterpret_cast(&result) + 8 - length, length, bytes, length);
+    memcpy(reinterpret_cast(&result) + 8 - length, bytes, length);
     return ::arrow::bit_util::FromBigEndian(result);
 }
 
diff --git a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.h b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.h
index 64c4302f0..cca9b6103 100644
--- a/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.h
+++ b/omnioperator/omniop-native-reader/cpp/src/parquet/ParquetTypedRecordReader.h
@@ -489,7 +489,7 @@ namespace omniruntime::reader {
             vec_ = new Vector(capacity);
             auto capacity_bytes = capacity * byte_width_;
             if (parquet_vec_ != nullptr) {
-                memset_s(parquet_vec_, capacity_bytes, 0, capacity_bytes);
+                memset(parquet_vec_, 0, capacity_bytes);
             } else {
                 parquet_vec_ = new uint8_t[capacity_bytes];
             }
diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/common.h b/omnioperator/omniop-spark-extension/cpp/src/common/common.h
index 5b72ba5c7..1578b8514 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/common/common.h
+++ b/omnioperator/omniop-spark-extension/cpp/src/common/common.h
@@ -67,7 +67,7 @@ int32_t BytesGen(uint64_t offsetsAddr, std::string &nullStr, uint64_t valuesAddr
         }
 
         if (len != 0) {
-            memcpy_s((char *) (values + offsets[i]), len, addr, len);
+            memcpy((char *) (values + offsets[i]), addr, len);
             valueTotalLen += len;
         }
     }
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc
index 01f89bc45..c31d93658 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/Compression.cc
@@ -190,9 +190,8 @@ namespace spark {
         char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
         if (totalCompressedSize >= static_cast(bufferSize)) {
             writeHeader(header, static_cast(bufferSize), true);
-            memcpy_s(
+            memcpy(
                 header + 3,
-                static_cast(bufferSize),
                 rawInputBuffer.data(),
                 static_cast(bufferSize));
@@ -398,7 +397,7 @@ namespace spark {
             }
 
             int sizeToWrite = std::min(totalSizeToWrite, outputSize - outputPosition);
-            memcpy_s(dst, static_cast(sizeToWrite), dataToWrite, static_cast(sizeToWrite));
+            memcpy(dst, dataToWrite, static_cast(sizeToWrite));
 
             outputPosition += sizeToWrite;
             dataToWrite += sizeToWrite;
diff --git a/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc b/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc
index 4c4b25feb..876ecc890 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc
+++ b/omnioperator/omniop-spark-extension/cpp/src/io/MemoryPool.cc
@@ -92,7 +92,7 @@ namespace spark {
         if (buf) {
             T* buf_old = buf;
             buf = reinterpret_cast(memoryPool.malloc(sizeof(T) * newCapacity));
-            memcpy_s(buf, sizeof(T) * currentSize, buf_old, sizeof(T) * currentSize);
+            memcpy(buf, buf_old, sizeof(T) * currentSize);
             memoryPool.free(reinterpret_cast(buf_old));
         } else {
             buf = reinterpret_cast(memoryPool.malloc(sizeof(T) * newCapacity));
@@ -114,7 +114,7 @@ namespace spark {
     void DataBuffer::resize(uint64_t newSize) {
         reserve(newSize);
         if (newSize > currentSize) {
-            memset_s(buf + currentSize, newSize - currentSize, 0, newSize - currentSize);
+            memset(buf + currentSize, 0, newSize - currentSize);
         }
         currentSize = newSize;
     }
@@ -132,7 +132,7 @@ namespace spark {
     void DataBuffer::resize(uint64_t newSize) {
         reserve(newSize);
         if (newSize > currentSize) {
-            memset_s(buf + currentSize, newSize - currentSize, 0, newSize - currentSize);
+            memset(buf + currentSize, 0, newSize - currentSize);
         }
         currentSize = newSize;
     }
diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
index c794e53b4..e8919de0c 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
@@ -25,7 +25,7 @@ SplitOptions SplitOptions::Defaults() { return SplitOptions(); }
 
 // Compute partition ids; re-initialized for every batch
 int Splitter::ComputeAndCountPartitionId(VectorBatch& vb) {
     auto num_rows = vb.GetRowCount();
-    memset_s(partition_id_cnt_cur_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t));
+    memset(partition_id_cnt_cur_, 0, num_partitions_ * sizeof(int32_t));
     partition_id_.resize(num_rows);
 
     if (singlePartitionFlag) {
@@ -121,7 +121,7 @@ int Splitter::AllocatePartitionBuffers(int32_t partition_id, int32_t new_size) {
 int Splitter::SplitFixedWidthValueBuffer(VectorBatch& vb) {
     const auto num_rows = vb.GetRowCount();
     for (uint col = 0; col < fixed_width_array_idx_.size(); ++col) {
-        memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t));
+        memset(partition_buffer_idx_offset_, 0, num_partitions_ * sizeof(int32_t));
         auto col_idx_vb = fixed_width_array_idx_[col];
         auto col_idx_schema = singlePartitionFlag ? col_idx_vb : (col_idx_vb - 1);
         const auto& dst_addrs = partition_fixed_width_value_addrs_[col];
@@ -352,7 +352,7 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){
                     std::shared_ptr validity_buffer (
                         new Buffer((uint8_t *)ptr_tmp, partition_id_cnt_cur_[pid], new_size));
                     dst_addrs[pid] = const_cast(validity_buffer->data_);
-                    memset_s(validity_buffer->data_, new_size, 0, new_size);
+                    memset(validity_buffer->data_, 0, new_size);
                     partition_fixed_width_buffers_[col][pid][0] = std::move(validity_buffer);
                     fixed_nullBuffer_size_[pid] += new_size;
                 }
@@ -361,7 +361,7 @@ int Splitter::SplitFixedWidthValidityBuffer(VectorBatch& vb){
         // Compute and fill the data
         auto src_addr = const_cast((uint8_t *)(
             reinterpret_cast(omniruntime::vec::unsafe::UnsafeBaseVector::GetNulls(vb.Get(col_idx)))));
-        memset_s(partition_buffer_idx_offset_, num_partitions_ * sizeof(int32_t), 0, num_partitions_ * sizeof(int32_t));
+        memset(partition_buffer_idx_offset_, 0, num_partitions_ * sizeof(int32_t));
         const auto num_rows = vb.GetRowCount();
         for (auto row = 0; row < num_rows; ++row) {
             auto pid = partition_id_[row];
@@ -773,14 +773,12 @@ void Splitter::SerializingFixedColumns(int32_t partitionId,
             }
             if ((onceCopyLen - destCopyedLength) >= (cacheBatchSize - splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp])) {
                 memCopyLen = cacheBatchSize - splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp];
-                memcpy_s((uint8_t*)(ptr_value->data_) + destCopyedLength,
-                    memCopyLen,
+                memcpy((uint8_t*)(ptr_value->data_) + destCopyedLength,
                     partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_ + splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp],
                     memCopyLen);
                 // (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])) scales the offset into the null array proportionally
                 if (partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0] != nullptr) {
-                    memcpy_s((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
-                        memCopyLen / (1 << column_type_id_[colIndexTmpSchema]),
+                    memcpy((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
                         partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ + (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])),
                         memCopyLen / (1 << column_type_id_[colIndexTmpSchema]));
                 // free the memory
@@ -796,15 +794,13 @@ void Splitter::SerializingFixedColumns(int32_t partitionId,
                 splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] = 0; // reset the start offset for the next cacheBatch
             } else {
                 memCopyLen = onceCopyLen - destCopyedLength;
-                memcpy_s((uint8_t*)(ptr_value->data_) + destCopyedLength,
-                    memCopyLen,
+                memcpy((uint8_t*)(ptr_value->data_) + destCopyedLength,
                     partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_ + splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp],
                     memCopyLen);
                 // (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])) scales the offset into the null array proportionally
                 if(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0] != nullptr) {
-                    memcpy_s((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
-                        memCopyLen / (1 << column_type_id_[colIndexTmpSchema]),
+                    memcpy((uint8_t*)(ptr_validity->data_) + (destCopyedLength / (1 << column_type_id_[colIndexTmpSchema])),
                         partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ + (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])),
                         memCopyLen / (1 << column_type_id_[colIndexTmpSchema]));
                 }
@@ -913,7 +909,7 @@ int32_t Splitter::ProtoWritePartition(int32_t partition_id, std::unique_ptr
     uint32_t vecBatchProtoSize = reversebytes_uint32t(static_cast(vecBatchProto->ByteSizeLong()));
     if (bufferStream->Next(&bufferOut, &sizeOut)) {
-        memcpy_s(bufferOut, sizeof(vecBatchProtoSize), &vecBatchProtoSize, sizeof(vecBatchProtoSize));
+        memcpy(bufferOut, &vecBatchProtoSize, sizeof(vecBatchProtoSize));
         if (sizeof(vecBatchProtoSize) < static_cast(sizeOut)) {
             bufferStream->BackUp(sizeOut - sizeof(vecBatchProtoSize));
         }
@@ -980,7 +976,7 @@ int32_t Splitter::ProtoWritePartitionByRow(int32_t partition_id, std::unique_ptr
     }
     uint32_t protoRowBatchSize = reversebytes_uint32t(static_cast(protoRowBatch->ByteSizeLong()));
     if (bufferStream->Next(&bufferOut, &sizeOut)) {
-        memcpy_s(bufferOut, sizeof(protoRowBatchSize), &protoRowBatchSize, sizeof(protoRowBatchSize));
+        memcpy(bufferOut, &protoRowBatchSize, sizeof(protoRowBatchSize));
         if (sizeof(protoRowBatchSize) < static_cast(sizeOut)) {
             bufferStream->BackUp(sizeOut - sizeof(protoRowBatchSize));
         }
@@ -1064,7 +1060,7 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr
     uint32_t vecBatchProtoSize = reversebytes_uint32t(static_cast(vecBatchProto->ByteSizeLong()));
     if (bufferStream->Next(&bufferOut, &sizeOut)) {
-        memcpy_s(bufferOut, sizeof(vecBatchProtoSize), &vecBatchProtoSize, sizeof(vecBatchProtoSize));
+        memcpy(bufferOut, &vecBatchProtoSize, sizeof(vecBatchProtoSize));
         if (sizeof(vecBatchProtoSize) < static_cast(sizeOut)) {
             bufferStream->BackUp(sizeOut - sizeof(vecBatchProtoSize));
         }
     vecBatchProto->SerializeToZeroCopyStream(bufferStream.get());
@@ -1136,7 +1132,7 @@ int Splitter::protoSpillPartitionByRow(int32_t partition_id, std::unique_ptr
     }
     uint32_t protoRowBatchSize = reversebytes_uint32t(static_cast(protoRowBatch->ByteSizeLong()));
     if (bufferStream->Next(&bufferOut, &sizeOut)) {
-        memcpy_s(bufferOut, sizeof(protoRowBatchSize), &protoRowBatchSize, sizeof(protoRowBatchSize));
+        memcpy(bufferOut, &protoRowBatchSize, sizeof(protoRowBatchSize));
         if (sizeof(protoRowBatchSize) < static_cast(sizeOut)) {
             bufferStream->BackUp(sizeOut - sizeof(protoRowBatchSize));
         }
     protoRowBatch->SerializeToZeroCopyStream(bufferStream.get());
@@ -1165,7 +1161,7 @@ int Splitter::WriteDataFileProto() {
     for (auto pid = 0; pid < num_partitions_; ++pid) {
         protoSpillPartition(pid, bufferStream);
     }
-    memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t));
+    memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t));
     outStream->close();
     return 0;
 }
@@ -1235,7 +1231,7 @@ void Splitter::MergeSpilled() {
             }
         }
     }
-    memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t));
+    memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t));
     ReleaseVarcharVector();
     num_row_splited_ = 0;
     cached_vectorbatch_size_ = 0;
@@ -1308,7 +1304,7 @@ void Splitter::WriteSplit() {
         ProtoWritePartition(pid, bufferOutPutStream, bufferOut, sizeOut);
     }
 
-    memset_s(partition_id_cnt_cache_, num_partitions_ * sizeof(uint64_t), 0, num_partitions_ * sizeof(uint64_t));
+    memset(partition_id_cnt_cache_, 0, num_partitions_ * sizeof(uint64_t));
     ReleaseVarcharVector();
     num_row_splited_ = 0;
     cached_vectorbatch_size_ = 0;
-- 
Gitee

From fa8469b0794b90572e2b49547cb54d7b3bb06b4e Mon Sep 17 00:00:00 2001
From: fhyb_llb
Date: Tue, 18 Mar 2025 08:38:47 +0000
Subject: [PATCH 2/2] numa binding

---
 .../boostkit/spark/ColumnarPlugin.scala       | 22 ++++++-
 .../boostkit/spark/ColumnarPluginConfig.scala | 14 +++++
 .../sql/execution/util/ExecutorManager.scala  | 58 +++++++++++++++++++
 3 files changed, 92 insertions(+), 2 deletions(-)
 create mode 100644 omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/util/ExecutorManager.scala

diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
index b080d1a4b..dce4fb47a 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
@@ -18,7 +18,7 @@
 
 package com.huawei.boostkit.spark
 
-import com.huawei.boostkit.spark.ColumnarPluginConfig.{ENABLE_OMNI_COLUMNAR_TO_ROW, ENABLE_OMNI_TUNED}
+import com.huawei.boostkit.spark.ColumnarPluginConfig.{ENABLED_NUMA_BINDING, ENABLE_OMNI_COLUMNAR_TO_ROW, NUMA_BINDING_CORE_RANGE, ENABLE_OMNI_TUNED}
 import com.huawei.boostkit.spark.Constant.OMNI_IS_ADAPTIVE_CONTEXT
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor
 import com.huawei.boostkit.spark.util.{ModifyUtilAdaptor, PhysicalPlanSelector}
@@ -47,6 +47,7 @@ import org.apache.spark.sql.util.ShimUtil
 import nova.hetu.omniruntime.memory.MemoryManager
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.util.ExecutorManager
 import org.apache.spark.sql.types.{BooleanType, DataType, DateType, DoubleType, IntegerType, LongType, ShortType}
 
 import java.io.BufferedReader
@@ -1020,7 +1021,7 @@ class ColumnarPlugin extends (SparkSessionExtensions => Unit) with Logging {
   }
 }
 
-private class OmniTaskStartExecutorPlugin extends ExecutorPlugin with Logging{
+private class OmniTaskStartExecutorPlugin extends ExecutorPlugin with Logging {
 
   private def initTunedOptimization(): Unit = {
     val enableTunedOptimization = SparkEnv.get.conf.get(ENABLE_OMNI_TUNED.key, "false").toBoolean
@@ -1068,8 +1069,25 @@ private class OmniTaskStartExecutorPlugin extends ExecutorPlugin with Logging{
     }
   }
 
+  private def initNumaBind(): Unit = {
+    val enableNumaBinding: Boolean = SparkEnv.get.conf.get(ENABLED_NUMA_BINDING.key, "false").toBoolean
+    val bindingInfo = if (!enableNumaBinding) {
+      NumaBindingInfo(enableNumaBinding = false)
+    } else {
+      val tmp = SparkEnv.get.conf.get(NUMA_BINDING_CORE_RANGE.key, "")
+      if (tmp.isEmpty) {
+        NumaBindingInfo(enableNumaBinding = false)
+      } else {
+        val coreRangeList: Array[String] = tmp.split('|').map(_.trim)
+        NumaBindingInfo(enableNumaBinding = true, coreRangeList)
+      }
+    }
+    ExecutorManager.tryTaskSet(bindingInfo)
+  }
+
   override def init(ctx: PluginContext, extraConf: java.util.Map[String, String]): Unit = {
     initTunedOptimization()
+    initNumaBind()
   }
 
   override def onTaskStart(): Unit = {
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index 7ac1a14fd..9b13f48b7 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -24,6 +24,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.sort.ColumnarShuffleManager
 import org.apache.spark.sql.internal.SQLConf
 
+case class NumaBindingInfo(enableNumaBinding: Boolean, totalCoreRange: Array[String] = null, numCoresPerExecutor: Int = -1) {}
+
 class ColumnarPluginConfig(conf: SQLConf) extends Logging {
   def columnarShuffleStr: String = conf
     .getConfString("spark.shuffle.manager", "sort")
@@ -697,4 +699,16 @@ object ColumnarPluginConfig {
     .doc("enable tuned-adm optimization")
     .booleanConf
     .createWithDefault(false)
+
+  val ENABLED_NUMA_BINDING =
+    buildConf("spark.omni.sql.columnar.numaBinding")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
+  val NUMA_BINDING_CORE_RANGE =
+    buildConf("spark.omni.sql.columnar.coreRange")
+      .internal()
+      .stringConf
+      .createOptional
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/util/ExecutorManager.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/util/ExecutorManager.scala
new file mode 100644
index 000000000..2286b8955
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/util/ExecutorManager.scala
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.util
+
+import com.huawei.boostkit.spark.NumaBindingInfo
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkEnv}
+
+import java.lang.management.ManagementFactory
+
+object ExecutorManager extends Logging {
+  var isTaskSet: Boolean = false
+
+  def getExecutorIds(sc: SparkContext): Seq[String] = sc.getExecutorIds
+
+  def tryTaskSet(numaInfo: NumaBindingInfo): Unit = synchronized {
+    if (numaInfo.enableNumaBinding && !isTaskSet) {
+      val cmd_output = Utils.executeAndGetOutput(Seq("bash", "-c", "ps -ef | grep YarnCoarseGrainedExecutorBackend"))
+      val getExecutorId = """--executor-id (\d+)""".r
+      val executorIdOnLocalNode = {
+        val tmp = for (m <- getExecutorId.findAllMatchIn(cmd_output)) yield m.group(1)
+        tmp.toList.distinct
+      }
+      val executorId = SparkEnv.get.executorId
+      val coreRange = numaInfo.totalCoreRange
+      val shouldBindNumaIdx = math.max(executorIdOnLocalNode.indexOf(executorId), 0) % coreRange.size // indexOf is -1 if absent from ps output
+      logInfo(
+        s"executorId is $executorId, executorIdOnLocalNode is $executorIdOnLocalNode")
+      val taskSetCmd = s"taskset -cpa ${coreRange(shouldBindNumaIdx)} ${getProcessId()}"
+      logInfo(taskSetCmd)
+
+      isTaskSet = true
+      Utils.executeCommand(Seq("bash", "-c", taskSetCmd))
+    }
+  }
+
+  def getProcessId(): Int = {
+    val runtimeMXBean = ManagementFactory.getRuntimeMXBean()
+    runtimeMXBean.getName().split("@")(0).toInt
+  }
+
+}
-- 
Gitee
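A note for reviewers of [PATCH 1/2]: the _s variants take the destination capacity, check it against the copy length, and report a failure, while plain memcpy/memset leave that bound entirely to the caller. The sketch below (illustrative names, not code from this series) shows the contract being dropped; the patch relies on each call site deriving its length from the same expression that sized the destination, which is worth confirming hunk by hunk:

    #include <cstddef>
    #include <cstring>
    #include <stdexcept>

    // Equivalent of the bounds check memcpy_s performed internally: refuse to
    // copy more bytes than the destination holds instead of overflowing it.
    void CopyChecked(void* dst, std::size_t dstSize, const void* src, std::size_t count) {
        if (count > dstSize) {
            throw std::runtime_error("copy would overflow destination");
        }
        std::memcpy(dst, src, count);
    }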
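On [PATCH 2/2]: when spark.omni.sql.columnar.numaBinding is set, ExecutorManager picks one of the '|'-separated ranges from spark.omni.sql.columnar.coreRange (a value like "0-31|32-63" would define two ranges; that example is illustrative) by this executor's position among the local YarnCoarseGrainedExecutorBackend processes, then shells out to `taskset -cpa <range> <pid>`. For readers unfamiliar with taskset, here is a rough native sketch of what that command does on Linux; the function name and the hard-coded range are placeholders, not values from this patch:

    #include <sched.h>
    #include <cstdio>

    // Roughly what `taskset -cp <first>-<last> <pid>` does for one process:
    // build a CPU mask and ask the scheduler to keep the process on those CPUs.
    // taskset's -a flag repeats this for every thread of the target pid;
    // sched_setaffinity with pid 0 applies to the calling thread, and threads
    // created afterwards inherit the mask.
    int BindToCpuRange(unsigned first, unsigned last) {
        cpu_set_t mask;
        CPU_ZERO(&mask);
        for (unsigned cpu = first; cpu <= last; ++cpu) {
            CPU_SET(cpu, &mask);
        }
        if (sched_setaffinity(0, sizeof(mask), &mask) != 0) {
            std::perror("sched_setaffinity");
            return -1;
        }
        return 0;
    }

    // e.g. BindToCpuRange(0, 31); // placeholder range, not a configured value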