From ee71723219ec46919de0f1419a381e32dad51636 Mon Sep 17 00:00:00 2001 From: chen-guang-wang <18767185082@163.com> Date: Sat, 27 Aug 2022 10:31:04 +0800 Subject: [PATCH] shuffle debug log refactor for log4j jni --- .../cpp/src/common/debug.h | 6 +++-- .../cpp/src/shuffle/splitter.cpp | 27 +++++++++---------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/common/debug.h b/omnioperator/omniop-spark-extension/cpp/src/common/debug.h index 39415e255..54b204224 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/common/debug.h +++ b/omnioperator/omniop-spark-extension/cpp/src/common/debug.h @@ -30,7 +30,7 @@ #define LogsTrace(format, ...) #endif - +#if defined(TRACE_RUNTIME) || defined(DEBUG_RUNTIME) #define LogsDebug(format, ...) \ do { \ if (static_cast(LogType::LOG_DEBUG) >= GetLogLevel()) { \ @@ -40,7 +40,9 @@ Log(logString, LogType::LOG_DEBUG); \ } \ } while (0) - +#else +#define LogsDebug(format, ...) +#endif #define LogsInfo(format, ...) \ do { \ diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 5a3932d64..598f0ad9e 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -622,8 +622,7 @@ int Splitter::SerializingFixedColumns(int32_t partitionId, int fixColIndexTmp, SplitRowInfo* splitRowInfoTmp) { - LogsDebug(" Fix col :%d th...", fixColIndexTmp); - LogsDebug(" partition_cached_vectorbatch_[%d].size: %ld", partitionId, partition_cached_vectorbatch_[partitionId].size()); + LogsDebug(" Fix col :%d th, partition_cached_vectorbatch_[%d].size: %ld", fixColIndexTmp, partitionId, partition_cached_vectorbatch_[partitionId].size()); if (splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp] < partition_cached_vectorbatch_[partitionId].size()) { auto colIndexTmpSchema = 0; colIndexTmpSchema = singlePartitionFlag ? fixed_width_array_idx_[fixColIndexTmp] : fixed_width_array_idx_[fixColIndexTmp] - 1; @@ -663,7 +662,6 @@ int Splitter::SerializingFixedColumns(int32_t partitionId, partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_ + (splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] / (1 << column_type_id_[colIndexTmpSchema])), memCopyLen / (1 << column_type_id_[colIndexTmpSchema])); // 释放内存 - LogsDebug(" free memory Partition[%d] cacheindex[col%d]:%d ", partitionId, fixColIndexTmp, splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]); options_.allocator->free(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->data_, partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][0]->capacity_); options_.allocator->free(partition_cached_vectorbatch_[partitionId][splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp]][fixColIndexTmp][1]->data_, @@ -683,8 +681,8 @@ int Splitter::SerializingFixedColumns(int32_t partitionId, destCopyedLength = onceCopyLen; // copy目标完成,结束while循环 splitRowInfoTmp->cacheBatchCopyedLen[fixColIndexTmp] += memCopyLen; } - LogsDebug(" memCopyedLen=%d.", memCopyLen); - LogsDebug(" splitRowInfoTmp.cacheBatchIndex[fix_col%d]=%d splitRowInfoTmp.cacheBatchCopyedLen[fix_col%d]=%d ", + LogsDebug(" memCopyedLen=%d, splitRowInfoTmp.cacheBatchIndex[fix_col%d]=%d splitRowInfoTmp.cacheBatchCopyedLen[fix_col%d]=%d ", + memCopyLen, fixColIndexTmp, splitRowInfoTmp->cacheBatchIndex[fixColIndexTmp], fixColIndexTmp, @@ -720,15 +718,16 @@ int Splitter::SerializingBinaryColumns(int32_t partitionId, spark::Vec& vec, int } int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr &bufferStream) { - LogsDebug(" Spill Pid:%d.", partition_id); SplitRowInfo splitRowInfoTmp; splitRowInfoTmp.copyedRow = 0; splitRowInfoTmp.remainCopyRow = partition_id_cnt_cache_[partition_id]; splitRowInfoTmp.cacheBatchIndex.resize(fixed_width_array_idx_.size()); splitRowInfoTmp.cacheBatchCopyedLen.resize(fixed_width_array_idx_.size()); - LogsDebug(" remainCopyRow %d ", splitRowInfoTmp.remainCopyRow); auto partition_cache_batch_num = partition_cached_vectorbatch_[partition_id].size(); - LogsDebug(" partition_cache_batch_num %lu ", partition_cache_batch_num); + LogsDebug(" Spill Pid %d , remainCopyRow %d , partition_cache_batch_num %lu .", + partition_id, + splitRowInfoTmp.remainCopyRow, + partition_cache_batch_num); int curBatch = 0; // 变长cache batch下标,split已按照options_.spill_batch_row_num切割完成 total_spill_row_num_ += splitRowInfoTmp.remainCopyRow; while (0 < splitRowInfoTmp.remainCopyRow) { @@ -763,8 +762,9 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptrmutable_vectype(); vt->set_typeid_(CastShuffleTypeIdToVecType(vector_batch_col_types_[indexSchema])); - LogsDebug("precision[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataPrecisions[indexSchema]); - LogsDebug("scale[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataScales[indexSchema]); + LogsDebug("precision[indexSchema %d]: %d , scale[indexSchema %d]: %d ", + indexSchema, input_col_types.inputDataPrecisions[indexSchema], + indexSchema, input_col_types.inputDataScales[indexSchema]); if(vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64){ vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]); vt->set_scale(input_col_types.inputDataScales[indexSchema]); @@ -822,9 +822,8 @@ int Splitter::WriteDataFileProto() { } void Splitter::MergeSpilled() { - LogsDebug(" Merge Spilled Tmp File."); std::unique_ptr outStream = writeLocalFile(options_.data_file); - LogsDebug(" MergeSpilled target dir: %s ", options_.data_file.c_str()); + LogsDebug(" Merge Spilled Tmp File: %s ", options_.data_file.c_str()); WriterOptions options; options.setCompression(options_.compression_type); options.setCompressionBlockSize(options_.compress_block_size); @@ -892,7 +891,6 @@ int Splitter::SpillToTmpFile() { std::shared_ptr ptrTmp = CaculateSpilledTmpFilePartitionOffsets(); spilled_tmp_files_info_[options_.next_spilled_file_dir] = ptrTmp; - LogsDebug(" free vectorBatch memory... "); auto cache_vectorBatch_num = vectorBatch_cache_.size(); for (auto i = 0; i < cache_vectorBatch_num; ++i) { ReleaseVectorBatch(*vectorBatch_cache_[i]); @@ -961,11 +959,10 @@ std::string Splitter::NextSpilledFileDir() { } int Splitter::Stop() { - LogsDebug(" Spill For Splitter Stopped."); TIME_NANO_OR_RAISE(total_spill_time_, SpillToTmpFile()); TIME_NANO_OR_RAISE(total_write_time_, MergeSpilled()); TIME_NANO_OR_RAISE(total_write_time_, DeleteSpilledTmpFile()); - LogsDebug("total_spill_row_num_: %ld ", total_spill_row_num_); + LogsDebug(" Spill For Splitter Stopped. total_spill_row_num_: %ld ", total_spill_row_num_); if (nullptr == vecBatchProto) { throw std::runtime_error("delete nullptr error for free protobuf vecBatch memory"); } -- Gitee