diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 598f0ad9ed93383ed099787b3dd4299b8c3fc669..4c9484bdd9e868aa67b15f47a18cb6faa13f5935 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -374,13 +374,6 @@ int Splitter::DoSplit(VectorBatch& vb) { first_vector_batch_ = true; } - for (auto col = 0; col < fixed_width_array_idx_.size(); ++col) { - auto col_idx = fixed_width_array_idx_[col]; - if (vb.GetVector(col_idx)->GetValueNulls() != nullptr) { - input_fixed_width_has_null_[col] = true; - } - } - // prepare partition buffers and spill if necessary for (auto pid = 0; pid < num_partitions_; ++pid) { if (fixed_width_array_idx_.size() > 0 && @@ -427,58 +420,61 @@ void Splitter::ToSplitterTypeId(int num_cols) { for (int i = 0; i < num_cols; ++i) { switch (input_col_types.inputVecTypeIds[i]) { - case OMNI_INT:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_4BYTE); - vector_batch_col_types_.push_back(OMNI_INT); + case OMNI_BOOLEAN: { + CastOmniToShuffleType(OMNI_BOOLEAN, SHUFFLE_1BYTE); break; } - case OMNI_LONG:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_8BYTE); - vector_batch_col_types_.push_back(OMNI_LONG); + case OMNI_SHORT: { + CastOmniToShuffleType(OMNI_SHORT, SHUFFLE_2BYTE); break; } - case OMNI_DOUBLE:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_8BYTE); - vector_batch_col_types_.push_back(OMNI_DOUBLE); + case OMNI_INT: { + CastOmniToShuffleType(OMNI_INT, SHUFFLE_4BYTE); break; } - case OMNI_DATE32:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_4BYTE); - vector_batch_col_types_.push_back(OMNI_DATE32); + case OMNI_LONG: { + CastOmniToShuffleType(OMNI_LONG, SHUFFLE_8BYTE); break; } - case OMNI_DATE64:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_8BYTE); - vector_batch_col_types_.push_back(OMNI_DATE64); + case OMNI_DOUBLE: { + CastOmniToShuffleType(OMNI_DOUBLE, SHUFFLE_8BYTE); break; } - case OMNI_DECIMAL64:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_8BYTE); - vector_batch_col_types_.push_back(OMNI_DECIMAL64); + case OMNI_DATE32: { + CastOmniToShuffleType(OMNI_DATE32, SHUFFLE_4BYTE); break; } - case OMNI_DECIMAL128:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_DECIMAL128); - vector_batch_col_types_.push_back(OMNI_DECIMAL128); + case OMNI_DATE64: { + CastOmniToShuffleType(OMNI_DATE64, SHUFFLE_8BYTE); break; } - case OMNI_CHAR:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_BINARY); - vector_batch_col_types_.push_back(OMNI_CHAR); + case OMNI_DECIMAL64: { + CastOmniToShuffleType(OMNI_DECIMAL64, SHUFFLE_8BYTE); break; } - case OMNI_VARCHAR:{ - column_type_id_.push_back(ShuffleTypeId::SHUFFLE_BINARY); - vector_batch_col_types_.push_back(OMNI_VARCHAR); + case OMNI_DECIMAL128: { + CastOmniToShuffleType(OMNI_DECIMAL128, SHUFFLE_DECIMAL128); break; } - default:{ - throw std::runtime_error("Unsupported DataTypeId."); + case OMNI_CHAR: { + CastOmniToShuffleType(OMNI_CHAR, SHUFFLE_BINARY); + break; + } + case OMNI_VARCHAR: { + CastOmniToShuffleType(OMNI_VARCHAR, SHUFFLE_BINARY); + break; } + default: throw std::runtime_error("Unsupported DataTypeId: " + input_col_types.inputVecTypeIds[i]); } } } +void Splitter::CastOmniToShuffleType(DataTypeId omniType, ShuffleTypeId shuffleType) +{ + vector_batch_col_types_.push_back(omniType); + column_type_id_.push_back(shuffleType); +} + int Splitter::Split_Init(){ num_row_splited_ = 0; cached_vectorbatch_size_ = 0; 
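Note on the new type-mapping code above: the refactor routes every case through CastOmniToShuffleType, which simply records the Omni column type and its shuffle byte width. One caveat worth flagging in the new default branch: "Unsupported DataTypeId: " + input_col_types.inputVecTypeIds[i] adds an integer to a const char*, which is pointer arithmetic rather than string concatenation, so the thrown message will not contain the type id; std::to_string is needed for the intended text. The following is a minimal, self-contained sketch of that mapping pattern with the corrected message. The enum values, member names, and SplitterSketch type here are simplified stand-ins for the OmniRuntime and splitter.h definitions, not the project's actual code.

#include <stdexcept>
#include <string>
#include <vector>

// Stand-in enums: illustrative only; the real DataTypeId/ShuffleTypeId come
// from the OmniRuntime headers and shuffle type definitions.
enum DataTypeId { OMNI_BOOLEAN, OMNI_SHORT, OMNI_INT, OMNI_LONG };
enum ShuffleTypeId { SHUFFLE_1BYTE, SHUFFLE_2BYTE, SHUFFLE_4BYTE, SHUFFLE_8BYTE };

struct SplitterSketch {
    std::vector<DataTypeId> vector_batch_col_types_;
    std::vector<ShuffleTypeId> column_type_id_;

    // Same shape as the new helper: record the Omni type and its shuffle width.
    void CastOmniToShuffleType(DataTypeId omniType, ShuffleTypeId shuffleType) {
        vector_batch_col_types_.push_back(omniType);
        column_type_id_.push_back(shuffleType);
    }

    // Mirrors the per-column switch, including the new BOOLEAN/SHORT cases.
    void ToSplitterTypeId(int typeId) {
        switch (typeId) {
            case OMNI_BOOLEAN: CastOmniToShuffleType(OMNI_BOOLEAN, SHUFFLE_1BYTE); break;
            case OMNI_SHORT:   CastOmniToShuffleType(OMNI_SHORT, SHUFFLE_2BYTE);   break;
            case OMNI_INT:     CastOmniToShuffleType(OMNI_INT, SHUFFLE_4BYTE);     break;
            case OMNI_LONG:    CastOmniToShuffleType(OMNI_LONG, SHUFFLE_8BYTE);    break;
            default:
                // std::to_string is required; "literal" + int would only offset the pointer.
                throw std::runtime_error("Unsupported DataTypeId: " + std::to_string(typeId));
        }
    }
};

int main() {
    SplitterSketch s;
    s.ToSplitterTypeId(OMNI_BOOLEAN);  // records SHUFFLE_1BYTE
    s.ToSplitterTypeId(OMNI_SHORT);    // records SHUFFLE_2BYTE
    return 0;
}

The helper keeps the two member vectors in lockstep, so each new fixed-width type only needs one line in the switch; this is the same design the hunk above applies to the real Splitter.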
@@ -527,7 +523,6 @@ int Splitter::Split_Init(){ partition_fixed_width_validity_addrs_.resize(num_fixed_width); partition_fixed_width_value_addrs_.resize(num_fixed_width); partition_fixed_width_buffers_.resize(num_fixed_width); - input_fixed_width_has_null_.resize(num_fixed_width, false); for (auto i = 0; i < num_fixed_width; ++i) { partition_fixed_width_validity_addrs_[i].resize(num_partitions_); partition_fixed_width_value_addrs_[i].resize(num_partitions_); diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h index 6339bec516397d4610d28a5f9205ee3ab9616d55..ca6c33d6b13185404159db0abc02f3a58e9f4fe3 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.h @@ -85,6 +85,8 @@ class Splitter { void ToSplitterTypeId(int num_cols); + void CastOmniToShuffleType(DataTypeId omniType, ShuffleTypeId shuffleType); + void MergeSpilled(); std::vector partition_id_; // 记录当前vb每一行的pid diff --git a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp index f67d9d4742e77f03aafb887528351a423876147c..1834345d54466d8e65f34eaea4ba2c99396440e0 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/shuffle/shuffle_test.cpp @@ -83,7 +83,7 @@ TEST_F (ShuffleTest, Split_SingleVarChar) { Test_splitter_split(splitterId, vb5); VectorBatch* vb6 = CreateVectorBatch_1row_varchar_withPid(1, "R"); Test_splitter_split(splitterId, vb6); - VectorBatch* vb7 = CreateVectorBatch_1row_varchar_withPid(3,"N"); + VectorBatch* vb7 = CreateVectorBatch_1row_varchar_withPid(3, "N"); Test_splitter_split(splitterId, vb7); Test_splitter_stop(splitterId); Test_splitter_close(splitterId); @@ -93,7 +93,7 @@ TEST_F (ShuffleTest, Split_SingleVarChar) { TEST_F (ShuffleTest, Split_Fixed_Cols) { tmpShuffleFilePath = tmpTestingDir + "/shuffle_split_fixed_cols"; - int32_t inputVecTypeIds[] = {OMNI_INT, OMNI_LONG, OMNI_DOUBLE}; + int32_t inputVecTypeIds[] = {OMNI_BOOLEAN, OMNI_SHORT, OMNI_INT, OMNI_LONG, OMNI_DOUBLE}; int colNumber = sizeof(inputVecTypeIds) / sizeof(inputVecTypeIds[0]); InputDataTypes inputDataTypes; inputDataTypes.inputVecTypeIds = inputVecTypeIds; @@ -109,8 +109,8 @@ TEST_F (ShuffleTest, Split_Fixed_Cols) { tmpShuffleFilePath, 0, tmpTestingDir); - for (uint64_t j = 0; j < 999; j++) { - VectorBatch* vb = CreateVectorBatch_3fixedCols_withPid(partitionNum, 999); + for (uint64_t j = 0; j < 1; j++) { + VectorBatch* vb = CreateVectorBatch_5fixedCols_withPid(partitionNum, 999); Test_splitter_split(splitterId, vb); } Test_splitter_stop(splitterId); @@ -121,7 +121,7 @@ TEST_F (ShuffleTest, Split_Fixed_Cols) { TEST_F (ShuffleTest, Split_Fixed_SinglePartition_SomeNullRow) { tmpShuffleFilePath = tmpTestingDir + "/shuffle_split_fixed_singlePartition_someNullRow"; - int32_t inputVecTypeIds[] = {OMNI_INT, OMNI_LONG, OMNI_DOUBLE, OMNI_VARCHAR}; + int32_t inputVecTypeIds[] = {OMNI_BOOLEAN, OMNI_SHORT, OMNI_INT, OMNI_LONG, OMNI_DOUBLE, OMNI_VARCHAR}; int colNumber = sizeof(inputVecTypeIds) / sizeof(inputVecTypeIds[0]); InputDataTypes inputDataTypes; inputDataTypes.inputVecTypeIds = inputVecTypeIds; @@ -223,8 +223,64 @@ TEST_F (ShuffleTest, Split_Mix_LargeSize) { delete[] inputDataTypes.inputDataScales; } -TEST_F (ShuffleTest, Split_Long_10WRows) { - tmpShuffleFilePath = tmpTestingDir + "/shuffle_split_long_10WRows"; 
+TEST_F (ShuffleTest, Split_Short_10WRows) { + tmpShuffleFilePath = tmpTestingDir + "/shuffle_split_short_10WRows"; + int32_t inputVecTypeIds[] = {OMNI_SHORT}; + int colNumber = sizeof(inputVecTypeIds) / sizeof(inputVecTypeIds[0]); + InputDataTypes inputDataTypes; + inputDataTypes.inputVecTypeIds = inputVecTypeIds; + inputDataTypes.inputDataPrecisions = new uint32_t[colNumber]; + inputDataTypes.inputDataScales = new uint32_t[colNumber]; + int partitionNum = 10; + int splitterId = Test_splitter_nativeMake("hash", + partitionNum, + inputDataTypes, + colNumber, + 4096, + "lz4", + tmpShuffleFilePath, + 0, + tmpTestingDir); + for (uint64_t j = 0; j < 100; j++) { + VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 1000, OMNI_SHORT); + Test_splitter_split(splitterId, vb); + } + Test_splitter_stop(splitterId); + Test_splitter_close(splitterId); + delete[] inputDataTypes.inputDataPrecisions; + delete[] inputDataTypes.inputDataScales; +} + +TEST_F (ShuffleTest, Split_Boolean_10WRows) { + tmpShuffleFilePath = tmpTestingDir + "/shuffle_split_boolean_10WRows"; + int32_t inputVecTypeIds[] = {OMNI_BOOLEAN}; + int colNumber = sizeof(inputVecTypeIds) / sizeof(inputVecTypeIds[0]); + InputDataTypes inputDataTypes; + inputDataTypes.inputVecTypeIds = inputVecTypeIds; + inputDataTypes.inputDataPrecisions = new uint32_t[colNumber]; + inputDataTypes.inputDataScales = new uint32_t[colNumber]; + int partitionNum = 10; + int splitterId = Test_splitter_nativeMake("hash", + partitionNum, + inputDataTypes, + colNumber, + 4096, + "lz4", + tmpShuffleFilePath, + 0, + tmpTestingDir); + for (uint64_t j = 0; j < 100; j++) { + VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 1000, OMNI_BOOLEAN); + Test_splitter_split(splitterId, vb); + } + Test_splitter_stop(splitterId); + Test_splitter_close(splitterId); + delete[] inputDataTypes.inputDataPrecisions; + delete[] inputDataTypes.inputDataScales; +} + +TEST_F (ShuffleTest, Split_Long_100WRows) { + tmpShuffleFilePath = tmpTestingDir + "/shuffle_split_long_100WRows"; int32_t inputVecTypeIds[] = {OMNI_LONG}; int colNumber = sizeof(inputVecTypeIds) / sizeof(inputVecTypeIds[0]); InputDataTypes inputDataTypes; @@ -242,7 +298,7 @@ TEST_F (ShuffleTest, Split_Long_10WRows) { 0, tmpTestingDir); for (uint64_t j = 0; j < 100; j++) { - VectorBatch* vb = CreateVectorBatch_1longCol_withPid(partitionNum, 10000); + VectorBatch* vb = CreateVectorBatch_1FixCol_withPid(partitionNum, 10000, OMNI_LONG); Test_splitter_split(splitterId, vb); } Test_splitter_stop(splitterId); diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp index 875484a23a0a734c71a6b62c0361fdad3387cacf..6ad03446e438a5441afe0e7fd0390075f783b8ce 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp @@ -46,6 +46,9 @@ VectorBatch* CreateInputData(const int32_t numRows, vecBatch->NewVectors(omniruntime::vec::GetProcessGlobalVecAllocator(), inputTypes); for (int i = 0; i < numCols; ++i) { switch (inputTypeIds[i]) { + case OMNI_BOOLEAN: + ((BooleanVector *)vecBatch->GetVector(i))->SetValues(0, (int32_t *)allData[i], numRows); + break; case OMNI_INT: ((IntVector *)vecBatch->GetVector(i))->SetValues(0, (int32_t *)allData[i], numRows); break; @@ -56,7 +59,7 @@ VectorBatch* CreateInputData(const int32_t numRows, ((DoubleVector *)vecBatch->GetVector(i))->SetValues(0, (double *)allData[i], numRows); break; case OMNI_SHORT: - 
((IntVector *)vecBatch->GetVector(i))->SetValues(0, (int32_t *)allData[i], numRows); + ((ShortVector *)vecBatch->GetVector(i))->SetValues(0, (int16_t *)allData[i], numRows); break; case OMNI_VARCHAR: case OMNI_CHAR: { @@ -299,12 +302,12 @@ VectorBatch* CreateVectorBatch_4col_withPid(int parNum, int rowNum) { return in; } -VectorBatch* CreateVectorBatch_1longCol_withPid(int parNum, int rowNum) { +VectorBatch* CreateVectorBatch_1FixCol_withPid(int parNum, int rowNum, int32_t fixColType) { int partitionNum = parNum; const int32_t numCols = 2; int32_t* inputTypes = new int32_t[numCols]; inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_LONG; + inputTypes[1] = fixColType; const int32_t numRows = rowNum; auto* col0 = new int32_t[numRows]; @@ -458,39 +461,49 @@ VectorBatch* CreateVectorBatch_4charCols_withPid(int parNum, int rowNum) { return in; } -VectorBatch* CreateVectorBatch_3fixedCols_withPid(int parNum, int rowNum) { +VectorBatch* CreateVectorBatch_5fixedCols_withPid(int parNum, int rowNum) { int partitionNum = parNum; // gen vectorBatch - const int32_t numCols = 4; + const int32_t numCols = 6; int32_t* inputTypes = new int32_t[numCols]; inputTypes[0] = OMNI_INT; - inputTypes[1] = OMNI_INT; - inputTypes[2] = OMNI_LONG; - inputTypes[3] = OMNI_DOUBLE; + inputTypes[1] = OMNI_BOOLEAN; + inputTypes[2] = OMNI_SHORT; + inputTypes[3] = OMNI_INT; + inputTypes[4] = OMNI_LONG; + inputTypes[5] = OMNI_DOUBLE; const int32_t numRows = rowNum; auto* col0 = new int32_t[numRows]; - auto* col1 = new int32_t[numRows]; - auto* col2 = new int64_t[numRows]; - auto* col3 = new double[numRows]; + auto* col1 = new bool[numRows]; + auto* col2 = new int16_t[numRows]; + auto* col3 = new int32_t[numRows]; + auto* col4 = new int64_t[numRows]; + auto* col5 = new double[numRows]; for (int i = 0; i < numRows; i++) { col0[i] = i % partitionNum; - col1[i] = i + 1; + col1[i] = (i % 2) == 0 ? 
true : false; col2[i] = i + 1; col3[i] = i + 1; + col4[i] = i + 1; + col5[i] = i + 1; } int64_t allData[numCols] = {reinterpret_cast(col0), reinterpret_cast(col1), reinterpret_cast(col2), - reinterpret_cast(col3)}; + reinterpret_cast(col3), + reinterpret_cast(col4), + reinterpret_cast(col5)}; VectorBatch* in = CreateInputData(numRows, numCols, inputTypes, allData); delete[] inputTypes; delete[] col0; delete[] col1; delete[] col2; delete[] col3; + delete[] col4; + delete[] col5; return in; } @@ -564,26 +577,34 @@ VectorBatch* CreateVectorBatch_2decimalCol_withPid(int partitionNum, int rowNum) VectorBatch* CreateVectorBatch_someNullRow_vectorBatch() { const int32_t numRows = 6; - int32_t data1[numRows] = {0, 1, 2, 0, 1, 2}; - int64_t data2[numRows] = {0, 1, 2, 3, 4, 5}; - double data3[numRows] = {0.0, 1.1, 2.2, 3.3, 4.4, 5.5}; - std::string data4[numRows] = {"abcde", "fghij", "klmno", "pqrst", "", ""}; - - auto vec0 = CreateVector(data1, numRows); - auto vec1 = CreateVector(data2, numRows); - auto vec2 = CreateVector(data3, numRows); - auto vec3 = CreateVarcharVector(VarcharDataType(5), data4, numRows); + bool data0[numRows] = {true, false, true, false, true, false}; + int16_t data1[numRows] = {0, 1, 2, 3, 4, 6}; + int32_t data2[numRows] = {0, 1, 2, 0, 1, 2}; + int64_t data3[numRows] = {0, 1, 2, 3, 4, 5}; + double data4[numRows] = {0.0, 1.1, 2.2, 3.3, 4.4, 5.5}; + std::string data5[numRows] = {"abcde", "fghij", "klmno", "pqrst", "", ""}; + + auto vec0 = CreateVector(data0, numRows); + auto vec1 = CreateVector(data1, numRows); + auto vec2 = CreateVector(data2, numRows); + auto vec3 = CreateVector(data3, numRows); + auto vec4 = CreateVector(data4, numRows); + auto vec5 = CreateVarcharVector(VarcharDataType(5), data5, numRows); for (int i = 0; i < numRows; i = i + 2) { vec0->SetValueNull(i); vec1->SetValueNull(i); vec2->SetValueNull(i); vec3->SetValueNull(i); + vec4->SetValueNull(i); + vec5->SetValueNull(i); } - VectorBatch *vecBatch = new VectorBatch(4); + VectorBatch *vecBatch = new VectorBatch(6); vecBatch->SetVector(0, vec0); vecBatch->SetVector(1, vec1); vecBatch->SetVector(2, vec2); vecBatch->SetVector(3, vec3); + vecBatch->SetVector(4, vec4); + vecBatch->SetVector(5, vec5); return vecBatch; } diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h index 428d81c46a9ccbd72a7def9a5c0b3dab574a40b7..50319a0311e8ba867b7789de03eb572d13dfd0ec 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h +++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h @@ -40,7 +40,7 @@ VectorBatch* CreateVectorBatch_1row_varchar_withPid(int pid, std::string inputCh VectorBatch* CreateVectorBatch_4col_withPid(int parNum, int rowNum); -VectorBatch* CreateVectorBatch_1longCol_withPid(int parNum, int rowNum); +VectorBatch* CreateVectorBatch_1FixCol_withPid(int parNum, int rowNum, int32_t fixColType); VectorBatch* CreateVectorBatch_2column_1row_withPid(int pid, std::string strVar, int intVar); @@ -48,7 +48,7 @@ VectorBatch* CreateVectorBatch_4varcharCols_withPid(int parNum, int rowNum); VectorBatch* CreateVectorBatch_4charCols_withPid(int parNum, int rowNum); -VectorBatch* CreateVectorBatch_3fixedCols_withPid(int parNum, int rowNum); +VectorBatch* CreateVectorBatch_5fixedCols_withPid(int parNum, int rowNum); VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum); diff --git 
a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_100batch_4096rows_lz4 b/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_100batch_4096rows_lz4 deleted file mode 100644 index 18c123b569c7abd71053a558dde23e811039806b..0000000000000000000000000000000000000000 Binary files a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_100batch_4096rows_lz4 and /dev/null differ diff --git a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_snappy b/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_snappy deleted file mode 100644 index 712a82d4aadad6eb5f3650df0209e23b5d83e668..0000000000000000000000000000000000000000 Binary files a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_snappy and /dev/null differ diff --git a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_uncompressed b/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_uncompressed deleted file mode 100644 index 2f835a7c95af1c746f1e4a50f5ef5e26f9fafb1c..0000000000000000000000000000000000000000 Binary files a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_uncompressed and /dev/null differ diff --git a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_zlib b/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_zlib deleted file mode 100644 index e89b125edd874cc1fe287a0e19960b30d90a10fa..0000000000000000000000000000000000000000 Binary files a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_spilled_mix_1batch_100rows_zlib and /dev/null differ diff --git a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_split_fixed_singlePartition_someNullCol b/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_split_fixed_singlePartition_someNullCol deleted file mode 100644 index 3cec85e7ed2efb44dc54b888cdab203a8bb7b405..0000000000000000000000000000000000000000 Binary files a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_split_fixed_singlePartition_someNullCol and /dev/null differ diff --git a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_split_fixed_singlePartition_someNullRow b/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_split_fixed_singlePartition_someNullRow deleted file mode 100644 index 8e0c78f75e44cc82c9286330dd26777d21f4140d..0000000000000000000000000000000000000000 Binary files a/omnioperator/omniop-spark-extension/java/src/test/resources/test-data/shuffle_split_fixed_singlePartition_someNullRow and /dev/null differ diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerDisableCompressSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerDisableCompressSuite.scala index b4f8fa1d25b9cd6d67089c3848a1ea49c92dba49..ac220b8b34d8b51d0342f3e59049f561e24e2c63 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerDisableCompressSuite.scala 
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerDisableCompressSuite.scala @@ -18,57 +18,229 @@ package org.apache.spark.shuffle -import java.io.FileInputStream +import java.io.{File, FileInputStream} import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer -import org.apache.spark.{SparkConf, SparkFunSuite} +import com.huawei.boostkit.spark.vectorized.PartitionInfo +import nova.hetu.omniruntime.`type`.{DataType, _} +import nova.hetu.omniruntime.vector._ +import org.apache.spark.{HashPartitioner, SparkConf, TaskContext} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.shuffle.sort.ColumnarShuffleHandle import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.OmniColumnVector import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils +import org.mockito.Answers.RETURNS_SMART_NULLS +import org.mockito.ArgumentMatchers.{any, anyInt, anyLong} +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.{doAnswer, when} +import org.mockito.invocation.InvocationOnMock -class ColumnShuffleSerializerDisableCompressSuite extends SparkFunSuite with SharedSparkSession { - - private var avgBatchNumRows: SQLMetric = _ - private var outputNumRows: SQLMetric = _ +class ColumnShuffleSerializerDisableCompressSuite extends SharedSparkSession { + @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ + @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ + @Mock(answer = RETURNS_SMART_NULLS) private var dependency + : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _ override def sparkConf: SparkConf = super.sparkConf - .setAppName("test ColumnarShuffleDeSerializer disable compressed") + .setAppName("test shuffle serializer disable compress") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.shuffle.compress", "false") + private var taskMetrics: TaskMetrics = _ + private var tempDir: File = _ + private var outputFile: File = _ + + private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _ + private val numPartitions = 1 + + protected var avgBatchNumRows: SQLMetric = _ + protected var outputNumRows: SQLMetric = _ + override def beforeEach(): Unit = { + super.beforeEach() + avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer avg read batch num rows") outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer number of output rows") + + tempDir = Utils.createTempDir() + outputFile = File.createTempFile("shuffle", null, tempDir) + taskMetrics = new TaskMetrics + + MockitoAnnotations.initMocks(this) + + shuffleHandle = + new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) + + val types : Array[DataType] = Array[DataType]( + IntDataType.INTEGER, + ShortDataType.SHORT, + LongDataType.LONG, + DoubleDataType.DOUBLE, + new Decimal64DataType(18, 3), + new Decimal128DataType(28, 11), + VarcharDataType.VARCHAR, + BooleanDataType.BOOLEAN) + val inputTypes = DataTypeSerializer.serialize(types) + + when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) + when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) + when(dependency.partitionInfo).thenReturn( + new 
PartitionInfo("hash", numPartitions, types.length, inputTypes)) + when(dependency.dataSize) + .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size")) + when(dependency.bytesSpilled) + .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled")) + when(dependency.numInputRows) + .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows")) + when(dependency.splitTime) + .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split")) + when(dependency.spillTime) + .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill")) + when(taskContext.taskMetrics()).thenReturn(taskMetrics) + when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) + + doAnswer { (invocationOnMock: InvocationOnMock) => + val tmp = invocationOnMock.getArguments()(3).asInstanceOf[File] + if (tmp != null) { + outputFile.delete + tmp.renameTo(outputFile) + } + null + }.when(blockResolver) + .writeIndexFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[File])) } - test("columnar shuffle deserialize no null uncompressed compressed") { - val input = getTestResourcePath("test-data/shuffle_spilled_mix_1batch_100rows_uncompressed") - val serializer = - new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() - val deserializedStream = - serializer.deserializeStream(new FileInputStream(input)) - - val kv = deserializedStream.asKeyValueIterator - var length = 0 - kv.foreach { - case (_, batch: ColumnarBatch) => - length += 1 - assert(batch.numRows == 100) - assert(batch.numCols == 4) - (0 until batch.numCols).foreach { i => - val valueVector = - batch - .column(i) - .asInstanceOf[OmniColumnVector] - .getVec - assert(valueVector.getSize == batch.numRows) - } - batch.close() + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + } finally { + super.afterEach() } - assert(length == 1) - deserializedStream.close() + } + + override def afterAll(): Unit = { + super.afterAll() + } + + test("write shuffle compress for none with null value last") { + val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, null) + val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, null) + val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, + 17L, 18L, 19L, null) + val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.10, 11.11, 12.12, + 13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, null) + val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, + 17L, 18L, 19L, null) + val decimal128Array: Array[Array[Long]] = Array( + Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L), + Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), Array(10L, 10L), Array(11L, 11L), Array(12L, 12L), Array(13L, 13L), + Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), null) + val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg", + "hhhhhhhh", "iiiiiiiii", "jjjjjjjjjj", "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn", + 
"ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss", null) + val booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true, + false, false, false, false, false, false, false, false, false, false, null) + + val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) + val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) + val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) + val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) + val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) + val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) + val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) + val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) + val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) + + val cb0 = ColumnarShuffleWriterSuite.makeColumnarBatch( + pidVector0.getVec.getSize, + List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0, + decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0) + ) + + val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) + val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) + val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) + val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) + val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) + val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) + val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) + val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) + val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) + + val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch( + pidVector1.getVec.getSize, + List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1, + decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1) + ) + + def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1)) + + val writer = new ColumnarShuffleWriter[Int, ColumnarBatch]( + blockResolver, + shuffleHandle, + 0L, // MapId + taskContext.taskMetrics().shuffleWriteMetrics) + + writer.write(records) + writer.stop(success = true) + + assert(writer.getPartitionLengths.sum === outputFile.length()) + assert(writer.getPartitionLengths.count(_ == 0L) === 0) + // should be (numPartitions - 2) zero length files + + val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics + assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) + assert(shuffleWriteMetrics.recordsWritten === records.length) + + assert(taskMetrics.diskBytesSpilled === 0) + assert(taskMetrics.memoryBytesSpilled === 0) + + val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() + val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile)) + + try { + val kv = deserializedStream.asKeyValueIterator + var length = 0 + kv.foreach { + case (_, batch: ColumnarBatch) => + length += 1 + assert(batch.numRows == 42) + assert(batch.numCols == 8) + 
assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0) + assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19) + assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0) + assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19) + assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0) + assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19) + assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0) + assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19) + assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L) + assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L) + assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L)) + assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L)) + assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "") + assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss") + assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true) + assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false) + (0 until batch.numCols).foreach { i => + val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec + assert(valueVector.getSize == batch.numRows) + assert(valueVector.isNull(20)) + } + batch.close() + } + assert(length == 1) + } finally { + deserializedStream.close() + } + } } diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala index f6960b828f06cd064c4111505947944b8bd5c343..9c174409641ff600d3d2ed717093292f6fb000c9 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerLz4Suite.scala @@ -18,57 +18,230 @@ package org.apache.spark.shuffle -import java.io.FileInputStream +import java.io.{File, FileInputStream} import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer -import org.apache.spark.{SparkConf, SparkFunSuite} +import com.huawei.boostkit.spark.vectorized.PartitionInfo +import nova.hetu.omniruntime.`type`.{DataType, _} +import nova.hetu.omniruntime.vector._ +import org.apache.spark.{HashPartitioner, SparkConf, TaskContext} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.shuffle.sort.ColumnarShuffleHandle import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.OmniColumnVector import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils +import org.mockito.Answers.RETURNS_SMART_NULLS +import 
org.mockito.ArgumentMatchers.{any, anyInt, anyLong} +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.{doAnswer, when} +import org.mockito.invocation.InvocationOnMock -class ColumnShuffleSerializerLz4Suite extends SparkFunSuite with SharedSparkSession { - private var avgBatchNumRows: SQLMetric = _ - private var outputNumRows: SQLMetric = _ +class ColumnShuffleSerializerLz4Suite extends SharedSparkSession { + @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ + @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ + @Mock(answer = RETURNS_SMART_NULLS) private var dependency + : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _ override def sparkConf: SparkConf = super.sparkConf - .setAppName("test ColumnarShuffleDeSerializer") + .setAppName("test shuffle serializer for lz4") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.shuffle.compress", "true") .set("spark.io.compression.codec", "lz4") + private var taskMetrics: TaskMetrics = _ + private var tempDir: File = _ + private var outputFile: File = _ + + private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _ + private val numPartitions = 1 + + protected var avgBatchNumRows: SQLMetric = _ + protected var outputNumRows: SQLMetric = _ + override def beforeEach(): Unit = { + super.beforeEach() + avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer avg read batch num rows") outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer number of output rows") + + tempDir = Utils.createTempDir() + outputFile = File.createTempFile("shuffle", null, tempDir) + taskMetrics = new TaskMetrics + + MockitoAnnotations.initMocks(this) + + shuffleHandle = + new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) + + val types : Array[DataType] = Array[DataType]( + IntDataType.INTEGER, + ShortDataType.SHORT, + LongDataType.LONG, + DoubleDataType.DOUBLE, + new Decimal64DataType(18, 3), + new Decimal128DataType(28, 11), + VarcharDataType.VARCHAR, + BooleanDataType.BOOLEAN) + val inputTypes = DataTypeSerializer.serialize(types) + + when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) + when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) + when(dependency.partitionInfo).thenReturn( + new PartitionInfo("hash", numPartitions, types.length, inputTypes)) + when(dependency.dataSize) + .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size")) + when(dependency.bytesSpilled) + .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled")) + when(dependency.numInputRows) + .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows")) + when(dependency.splitTime) + .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split")) + when(dependency.spillTime) + .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill")) + when(taskContext.taskMetrics()).thenReturn(taskMetrics) + when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) + + doAnswer { (invocationOnMock: InvocationOnMock) => + val tmp = invocationOnMock.getArguments()(3).asInstanceOf[File] + if (tmp != null) { + outputFile.delete + tmp.renameTo(outputFile) + } + null + }.when(blockResolver) + .writeIndexFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[File])) + } + + override def 
afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + } finally { + super.afterEach() + } + } + + override def afterAll(): Unit = { + super.afterAll() } - test("columnar shuffle deserialize no null lz4 compressed") { - val input = getTestResourcePath("test-data/shuffle_spilled_mix_100batch_4096rows_lz4") - val serializer = - new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() - val deserializedStream = - serializer.deserializeStream(new FileInputStream(input)) - - val kv = deserializedStream.asKeyValueIterator - var length = 0 - kv.foreach { - case (_, batch: ColumnarBatch) => - length += 1 - assert(batch.numRows == 4096) - assert(batch.numCols == 4) - (0 until batch.numCols).foreach { i => - val valueVector = - batch - .column(i) - .asInstanceOf[OmniColumnVector] - .getVec - assert(valueVector.getSize == batch.numRows) - } - batch.close() + test("write shuffle compress for lz4 with no null value") { + val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) + val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) + val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, + 17L, 18L, 19L, 20L) + val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.10, 11.11, 12.12, + 13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, 20.20) + val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, + 17L, 18L, 19L, 20L) + val decimal128Array: Array[Array[Long]] = Array( + Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L), + Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), Array(10L, 10L), Array(11L, 11L), Array(12L, 12L), Array(13L, 13L), + Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), Array(20L, 20L)) + val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg", + "hhhhhhhh", "iiiiiiiii", "jjjjjjjjjj", "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn", + "ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss", + "tttttttttttttttttttt") + val booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true, + false, false, false, false, false, false, false, false, false, false, false) + + val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) + val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) + val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) + val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) + val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) + val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) + val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) + val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) + val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) + + val cb0 = 
ColumnarShuffleWriterSuite.makeColumnarBatch( + pidVector0.getVec.getSize, + List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0, + decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0) + ) + + val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) + val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) + val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) + val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) + val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) + val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) + val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) + val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) + val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) + + val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch( + pidVector1.getVec.getSize, + List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1, + decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1) + ) + + def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1)) + + val writer = new ColumnarShuffleWriter[Int, ColumnarBatch]( + blockResolver, + shuffleHandle, + 0L, // MapId + taskContext.taskMetrics().shuffleWriteMetrics) + + writer.write(records) + writer.stop(success = true) + + assert(writer.getPartitionLengths.sum === outputFile.length()) + assert(writer.getPartitionLengths.count(_ == 0L) === 0) + // should be (numPartitions - 2) zero length files + + val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics + assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) + assert(shuffleWriteMetrics.recordsWritten === records.length) + + assert(taskMetrics.diskBytesSpilled === 0) + assert(taskMetrics.memoryBytesSpilled === 0) + + val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() + val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile)) + + try { + val kv = deserializedStream.asKeyValueIterator + var length = 0 + kv.foreach { + case (_, batch: ColumnarBatch) => + length += 1 + assert(batch.numRows == 42) + assert(batch.numCols == 8) + assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0) + assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19) + assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0) + assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19) + assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0) + assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19) + assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0) + assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19) + assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L) + assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L) + assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L)) + 
assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L)) + assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "") + assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss") + assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true) + assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false) + (0 until batch.numCols).foreach { i => + val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec + assert(valueVector.getSize == batch.numRows) + } + batch.close() + } + assert(length == 1) + } finally { + deserializedStream.close() } - assert(length == 100) - deserializedStream.close() + } } diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala index 278214d4c51c1bae90a56729037b1ed7cb2be52a..aad135badf554d7b7a7143f1ddbfe8a10a167753 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSnappySuite.scala @@ -18,57 +18,230 @@ package org.apache.spark.shuffle -import java.io.FileInputStream +import java.io.{File, FileInputStream} import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer -import org.apache.spark.{SparkConf, SparkFunSuite} +import com.huawei.boostkit.spark.vectorized.PartitionInfo +import nova.hetu.omniruntime.`type`.{DataType, _} +import nova.hetu.omniruntime.vector._ +import org.apache.spark.{HashPartitioner, SparkConf, TaskContext} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.shuffle.sort.ColumnarShuffleHandle import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.OmniColumnVector import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils +import org.mockito.Answers.RETURNS_SMART_NULLS +import org.mockito.ArgumentMatchers.{any, anyInt, anyLong} +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.{doAnswer, when} +import org.mockito.invocation.InvocationOnMock -class ColumnShuffleSerializerSnappySuite extends SparkFunSuite with SharedSparkSession { - private var avgBatchNumRows: SQLMetric = _ - private var outputNumRows: SQLMetric = _ +class ColumnShuffleSerializerSnappySuite extends SharedSparkSession { + @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _ + @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _ + @Mock(answer = RETURNS_SMART_NULLS) private var dependency + : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _ override def sparkConf: SparkConf = super.sparkConf - .setAppName("test ColumnarShuffleDeSerializer") + .setAppName("test shuffle serializer for snappy") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.shuffle.compress", "true") .set("spark.io.compression.codec", "snappy") + private var taskMetrics: 
TaskMetrics = _ + private var tempDir: File = _ + private var outputFile: File = _ + + private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _ + private val numPartitions = 1 + + protected var avgBatchNumRows: SQLMetric = _ + protected var outputNumRows: SQLMetric = _ + override def beforeEach(): Unit = { + super.beforeEach() + avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer avg read batch num rows") outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer number of output rows") + + tempDir = Utils.createTempDir() + outputFile = File.createTempFile("shuffle", null, tempDir) + taskMetrics = new TaskMetrics + + MockitoAnnotations.initMocks(this) + + shuffleHandle = + new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) + + val types : Array[DataType] = Array[DataType]( + IntDataType.INTEGER, + ShortDataType.SHORT, + LongDataType.LONG, + DoubleDataType.DOUBLE, + new Decimal64DataType(18, 3), + new Decimal128DataType(28, 11), + VarcharDataType.VARCHAR, + BooleanDataType.BOOLEAN) + val inputTypes = DataTypeSerializer.serialize(types) + + when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) + when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) + when(dependency.partitionInfo).thenReturn( + new PartitionInfo("hash", numPartitions, types.length, inputTypes)) + when(dependency.dataSize) + .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size")) + when(dependency.bytesSpilled) + .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled")) + when(dependency.numInputRows) + .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows")) + when(dependency.splitTime) + .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split")) + when(dependency.spillTime) + .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill")) + when(taskContext.taskMetrics()).thenReturn(taskMetrics) + when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile) + + doAnswer { (invocationOnMock: InvocationOnMock) => + val tmp = invocationOnMock.getArguments()(3).asInstanceOf[File] + if (tmp != null) { + outputFile.delete + tmp.renameTo(outputFile) + } + null + }.when(blockResolver) + .writeIndexFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[File])) + } + + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + } finally { + super.afterEach() + } + } + + override def afterAll(): Unit = { + super.afterAll() } - test("columnar shuffle deserialize no null snappy compressed") { - val input = getTestResourcePath("test-data/shuffle_spilled_mix_1batch_100rows_snappy") - val serializer = - new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() - val deserializedStream = - serializer.deserializeStream(new FileInputStream(input)) - - val kv = deserializedStream.asKeyValueIterator - var length = 0 - kv.foreach { - case (_, batch: ColumnarBatch) => - length += 1 - assert(batch.numRows == 100) - assert(batch.numCols == 4) - (0 until batch.numCols).foreach { i => - val valueVector = - batch - .column(i) - .asInstanceOf[OmniColumnVector] - .getVec - assert(valueVector.getSize == batch.numRows) - } - batch.close() + test("write shuffle compress for snappy") { + val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + val intArray: Array[java.lang.Integer] = 
Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) + val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) + val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, + 17L, 18L, 19L, 20L) + val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.10, 11.11, 12.12, + 13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, 20.20) + val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L, 12L, 13L, 14L, 15L, 16L, + 17L, 18L, 19L, 20L) + val decimal128Array: Array[Array[Long]] = Array( + Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L), + Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), Array(10L, 10L), Array(11L, 11L), Array(12L, 12L), Array(13L, 13L), + Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), Array(20L, 20L)) + val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg", + "hhhhhhhh", "iiiiiiiii", "jjjjjjjjjj", "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn", + "ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss", + "tttttttttttttttttttt") + val booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true, + false, false, false, false, false, false, false, false, false, false, false) + + val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) + val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) + val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) + val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) + val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) + val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) + val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) + val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) + val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) + + val cb0 = ColumnarShuffleWriterSuite.makeColumnarBatch( + pidVector0.getVec.getSize, + List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0, + decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0) + ) + + val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray) + val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray) + val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray) + val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray) + val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray) + val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array) + val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array) + val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray) + val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray) + + val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch( + pidVector1.getVec.getSize, + 
List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1, + decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1) + ) + + def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1)) + + val writer = new ColumnarShuffleWriter[Int, ColumnarBatch]( + blockResolver, + shuffleHandle, + 0L, // MapId + taskContext.taskMetrics().shuffleWriteMetrics) + + writer.write(records) + writer.stop(success = true) + + assert(writer.getPartitionLengths.sum === outputFile.length()) + assert(writer.getPartitionLengths.count(_ == 0L) === 0) + // should be (numPartitions - 2) zero length files + + val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics + assert(shuffleWriteMetrics.bytesWritten === outputFile.length()) + assert(shuffleWriteMetrics.recordsWritten === records.length) + + assert(taskMetrics.diskBytesSpilled === 0) + assert(taskMetrics.memoryBytesSpilled === 0) + + val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance() + val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile)) + + try { + val kv = deserializedStream.asKeyValueIterator + var length = 0 + kv.foreach { + case (_, batch: ColumnarBatch) => + length += 1 + assert(batch.numRows == 42) + assert(batch.numCols == 8) + assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0) + assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19) + assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0) + assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19) + assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0) + assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19) + assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0) + assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19) + assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L) + assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L) + assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L)) + assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L)) + assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "") + assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss") + assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true) + assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false) + (0 until batch.numCols).foreach { i => + val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec + assert(valueVector.getSize == batch.numRows) + } + batch.close() + } + assert(length == 1) + } finally { + deserializedStream.close() } - assert(length == 1) - deserializedStream.close() + } } diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSuite.scala 
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSuite.scala
deleted file mode 100644
index 51e3466b674141d1181f238c34db2e4b5013c16b..0000000000000000000000000000000000000000
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved.
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle
-
-import java.io.FileInputStream
-
-import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.execution.vectorized.OmniColumnVector
-import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-class ColumnShuffleSerializerSuite extends SparkFunSuite with SharedSparkSession {
-  private var avgBatchNumRows: SQLMetric = _
-  private var outputNumRows: SQLMetric = _
-
-  override def sparkConf: SparkConf =
-    super.sparkConf
-      .setAppName("test ColumnarShuffleDeSerializer")
-      .set("spark.shuffle.compress", "true")
-      .set("spark.io.compression.codec", "lz4")
-
-  override def beforeEach(): Unit = {
-    avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext,
-      "test serializer avg read batch num rows")
-    outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext,
-      "test serializer number of output rows")
-  }
-
-  test("columnar shuffle deserialize some row nullable value lz4 compressed") {
-    val input = getTestResourcePath("test-data/shuffle_split_fixed_singlePartition_someNullRow")
-    val serializer =
-      new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
-    val deserializedStream =
-      serializer.deserializeStream(new FileInputStream(input))
-
-    val kv = deserializedStream.asKeyValueIterator
-    var length = 0
-    kv.foreach {
-      case (_, batch: ColumnarBatch) =>
-        length += 1
-        assert(batch.numRows == 600)
-        assert(batch.numCols == 4)
-        (0 until batch.numCols).foreach { i =>
-          val valueVector =
-            batch
-              .column(i)
-              .asInstanceOf[OmniColumnVector]
-              .getVec
-          assert(valueVector.getSize == batch.numRows)
-        }
-        batch.close()
-    }
-    assert(length == 1)
-    deserializedStream.close()
-  }
-
-  test("columnar shuffle deserialize some col nullable value lz4 compressed") {
-    val input = getTestResourcePath("test-data/shuffle_split_fixed_singlePartition_someNullCol")
-    val serializer =
-      new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
-    val deserializedStream =
-      serializer.deserializeStream(new FileInputStream(input))
-
-    val kv = deserializedStream.asKeyValueIterator
-    var length = 0
-    kv.foreach {
-      case (_, batch: ColumnarBatch) =>
-        length += 1
-        assert(batch.numRows == 600)
-        assert(batch.numCols == 4)
-        (0 until batch.numCols).foreach { i =>
-          val valueVector =
-            batch
-              .column(i)
-              .asInstanceOf[OmniColumnVector]
-              .getVec
-          assert(valueVector.getSize == batch.numRows)
-        }
-        batch.close()
-    }
-    assert(length == 1)
-    deserializedStream.close()
-  }
-}
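The removed suite above deserialized pre-generated files under test-data/; the rewritten suites below instead build batches in memory, write them through ColumnarShuffleWriter, and read the produced data file back with ColumnarBatchSerializer. In outline, the round trip they exercise looks like this (a sketch only, assuming the mocked blockResolver, shuffleHandle, outputFile, and metrics that beforeEach below sets up, and the cb0/cb1 batches built with the initOmniColumn*Vector helpers):

```scala
// Sketch of the write-then-deserialize round trip used by the rewritten suites.
// cb0 and cb1 are 21-row batches whose pid column hashes every row to partition 0,
// so the writer is expected to produce a single merged output batch.
val writer = new ColumnarShuffleWriter[Int, ColumnarBatch](
  blockResolver, shuffleHandle, 0L, taskContext.taskMetrics().shuffleWriteMetrics)
writer.write(Iterator((0, cb0), (0, cb1)))
writer.stop(success = true)

val deserialized = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows)
  .newInstance()
  .deserializeStream(new FileInputStream(outputFile))
try {
  deserialized.asKeyValueIterator.foreach { case (_, batch: ColumnarBatch) =>
    assert(batch.numRows == 42) // two 21-row input batches merged per partition
    batch.close()
  }
} finally {
  deserialized.close()
}
```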
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala
index 08a4343283262fe6b99854cb3da52b38251c49ae..f892e22bcac56577e56d98026ca913125eaa3da7 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnShuffleSerializerZlibSuite.scala
@@ -18,57 +18,231 @@ package org.apache.spark.shuffle
-import java.io.FileInputStream
+import java.io.{File, FileInputStream}
 
 import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import com.huawei.boostkit.spark.vectorized.PartitionInfo
+import nova.hetu.omniruntime.`type`.{DataType, _}
+import nova.hetu.omniruntime.vector._
+import org.apache.spark.{HashPartitioner, SparkConf, TaskContext}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.shuffle.sort.ColumnarShuffleHandle
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.OmniColumnVector
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.ArgumentMatchers.{any, anyInt, anyLong}
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.{doAnswer, when}
+import org.mockito.invocation.InvocationOnMock
 
-class ColumnShuffleSerializerZlibSuite extends SparkFunSuite with SharedSparkSession {
-  private var avgBatchNumRows: SQLMetric = _
-  private var outputNumRows: SQLMetric = _
+class ColumnShuffleSerializerZlibSuite extends SharedSparkSession {
+  @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var dependency
+    : ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = _
 
   override def sparkConf: SparkConf =
     super.sparkConf
-      .setAppName("test ColumnarShuffleDeSerializer zlib compressed")
+      .setAppName("test shuffle serializer for zlib")
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.shuffle.compress", "true")
       .set("spark.io.compression.codec", "zlib")
 
+  private var taskMetrics: TaskMetrics = _
+  private var tempDir: File = _
+  private var outputFile: File = _
+
+  private var shuffleHandle: ColumnarShuffleHandle[Int, ColumnarBatch] = _
+  private val numPartitions = 1
+
+  protected var avgBatchNumRows: SQLMetric = _
+  protected var outputNumRows: SQLMetric = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    avgBatchNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer avg read batch num rows")
+    outputNumRows = SQLMetrics.createAverageMetric(spark.sparkContext, "test serializer number of output rows")
+
+    tempDir = Utils.createTempDir()
+    outputFile = File.createTempFile("shuffle", null, tempDir)
+    taskMetrics = new TaskMetrics
+
+    MockitoAnnotations.initMocks(this)
+
+    shuffleHandle =
+      new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency)
+
+    val types : Array[DataType] = Array[DataType](
+      IntDataType.INTEGER,
+      ShortDataType.SHORT,
+      LongDataType.LONG,
+      DoubleDataType.DOUBLE,
+      new Decimal64DataType(18, 3),
+      new Decimal128DataType(28, 11),
+      VarcharDataType.VARCHAR,
+      BooleanDataType.BOOLEAN)
+    val inputTypes = DataTypeSerializer.serialize(types)
+
+    when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions))
+    when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf))
+    when(dependency.partitionInfo).thenReturn(
+      new PartitionInfo("hash", numPartitions, types.length, inputTypes))
+    when(dependency.dataSize)
+      .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "data size"))
+    when(dependency.bytesSpilled)
+      .thenReturn(SQLMetrics.createSizeMetric(spark.sparkContext, "shuffle bytes spilled"))
+    when(dependency.numInputRows)
+      .thenReturn(SQLMetrics.createMetric(spark.sparkContext, "number of input rows"))
+    when(dependency.splitTime)
+      .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_split"))
+    when(dependency.spillTime)
+      .thenReturn(SQLMetrics.createNanoTimingMetric(spark.sparkContext, "totaltime_spill"))
+    when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+    when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
+
+    doAnswer { (invocationOnMock: InvocationOnMock) =>
+      val tmp = invocationOnMock.getArguments()(3).asInstanceOf[File]
+      if (tmp != null) {
+        outputFile.delete
+        tmp.renameTo(outputFile)
+      }
+      null
+    }.when(blockResolver)
+      .writeIndexFileAndCommit(anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[File]))
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(tempDir)
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
   }
 
-  test("columnar shuffle deserialize no null snappy compressed") {
-    val input = getTestResourcePath("test-data/shuffle_spilled_mix_1batch_100rows_zlib")
-    val serializer =
-      new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
-    val deserializedStream =
-      serializer.deserializeStream(new FileInputStream(input))
-
-    val kv = deserializedStream.asKeyValueIterator
-    var length = 0
-    kv.foreach {
-      case (_, batch: ColumnarBatch) =>
-        length += 1
-        assert(batch.numRows == 100)
-        assert(batch.numCols == 4)
-        (0 until batch.numCols).foreach { i =>
-          val valueVector =
-            batch
-              .column(i)
-              .asInstanceOf[OmniColumnVector]
-              .getVec
-          assert(valueVector.getSize == batch.numRows)
-        }
-        batch.close()
+  test("write shuffle compress for zlib with null value middle") {
+    val pidArray: Array[java.lang.Integer] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    val intArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, null, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
+    val shortArray: Array[java.lang.Integer] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, null, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
+    val longArray: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, null, 11L, 12L, 13L, 14L, 15L, 16L,
+      17L, 18L, 19L, 20L)
+    val doubleArray: Array[java.lang.Double] = Array(0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, null, 11.11, 12.12,
+      13.13, 14.14, 15.15, 16.16, 17.17, 18.18, 19.19, 20.20)
+    val decimal64Array: Array[java.lang.Long] = Array(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, null, 11L, 12L, 13L, 14L, 15L, 16L,
+      17L, 18L, 19L, 20L)
+    val decimal128Array: Array[Array[Long]] = Array(
+      Array(0L, 0L), Array(1L, 1L), Array(2L, 2L), Array(3L, 3L), Array(4L, 4L), Array(5L, 5L), Array(6L, 6L),
+      Array(7L, 7L), Array(8L, 8L), Array(9L, 9L), null, Array(11L, 11L), Array(12L, 12L), Array(13L, 13L),
+      Array(14L, 14L), Array(15L, 15L), Array(16L, 16L), Array(17L, 17L), Array(18L, 18L), Array(19L, 19L), Array(20L, 20L))
+    val stringArray: Array[java.lang.String] = Array("", "a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg",
+      "hhhhhhhh", "iiiiiiiii", null, "kkkkkkkkkkk", "llllllllllll", "mmmmmmmmmmmmm", "nnnnnnnnnnnnnn",
+      "ooooooooooooooo", "pppppppppppppppp", "qqqqqqqqqqqqqqqqq", "rrrrrrrrrrrrrrrrrr", "sssssssssssssssssss",
+      "tttttttttttttttttttt")
+    val booleanArray: Array[java.lang.Boolean] = Array(true, true, true, true, true, true, true, true, true, true,
+      null, false, false, false, false, false, false, false, false, false, false)
+
+    val pidVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray)
+    val intVector0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray)
+    val shortVector0 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray)
+    val longVector0 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray)
+    val doubleVector0 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray)
+    val decimal64Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array)
+    val decimal128Vector0 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array)
+    val varcharVector0 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray)
+    val booleanVector0 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray)
+
+    val cb0 = ColumnarShuffleWriterSuite.makeColumnarBatch(
+      pidVector0.getVec.getSize,
+      List(pidVector0, intVector0, shortVector0, longVector0, doubleVector0,
+        decimal64Vector0, decimal128Vector0, varcharVector0, booleanVector0)
+    )
+
+    val pidVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(pidArray)
+    val intVector1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(intArray)
+    val shortVector1 = ColumnarShuffleWriterSuite.initOmniColumnShortVector(shortArray)
+    val longVector1 = ColumnarShuffleWriterSuite.initOmniColumnLongVector(longArray)
+    val doubleVector1 = ColumnarShuffleWriterSuite.initOmniColumnDoubleVector(doubleArray)
+    val decimal64Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(decimal64Array)
+    val decimal128Vector1 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(decimal128Array)
+    val varcharVector1 = ColumnarShuffleWriterSuite.initOmniColumnVarcharVector(stringArray)
+    val booleanVector1 = ColumnarShuffleWriterSuite.initOmniColumnBooleanVector(booleanArray)
+
+    val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch(
+      pidVector1.getVec.getSize,
+      List(pidVector1, intVector1, shortVector1, longVector1, doubleVector1,
+        decimal64Vector1, decimal128Vector1, varcharVector1, booleanVector1)
+    )
+
+    def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1))
+
+    val writer = new ColumnarShuffleWriter[Int, ColumnarBatch](
+      blockResolver,
+      shuffleHandle,
+      0L, // MapId
+      taskContext.taskMetrics().shuffleWriteMetrics)
+
+    writer.write(records)
+    writer.stop(success = true)
+
+    assert(writer.getPartitionLengths.sum === outputFile.length())
+    assert(writer.getPartitionLengths.count(_ == 0L) === 0)
+    // should be (numPartitions - 2) zero length files
+
+    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
+    assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
+    assert(shuffleWriteMetrics.recordsWritten === records.length)
+
+    assert(taskMetrics.diskBytesSpilled === 0)
+    assert(taskMetrics.memoryBytesSpilled === 0)
+
+    val serializer = new ColumnarBatchSerializer(avgBatchNumRows, outputNumRows).newInstance()
+    val deserializedStream = serializer.deserializeStream(new FileInputStream(outputFile))
+
+    try {
+      val kv = deserializedStream.asKeyValueIterator
+      var length = 0
+      kv.foreach {
+        case (_, batch: ColumnarBatch) =>
+          length += 1
+          assert(batch.numRows == 42)
+          assert(batch.numCols == 8)
+          assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(0) == 0)
+          assert(batch.column(0).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[IntVec].get(19) == 19)
+          assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(0) == 0)
+          assert(batch.column(1).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[ShortVec].get(19) == 19)
+          assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0)
+          assert(batch.column(2).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19)
+          assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(0) == 0.0)
+          assert(batch.column(3).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[DoubleVec].get(19) == 19.19)
+          assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(0) == 0L)
+          assert(batch.column(4).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[LongVec].get(19) == 19L)
+          assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(0) sameElements Array(0L, 0L))
+          assert(batch.column(5).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[Decimal128Vec].get(19) sameElements Array(19L, 19L))
+          assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(0) sameElements "")
+          assert(batch.column(6).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[VarcharVec].get(19) sameElements "sssssssssssssssssss")
+          assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(0) == true)
+          assert(batch.column(7).asInstanceOf[OmniColumnVector].getVec.asInstanceOf[BooleanVec].get(19) == false)
+          (0 until batch.numCols).foreach { i =>
+            val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec
+            assert(valueVector.getSize == batch.numRows)
+            assert(valueVector.isNull(10))
+          }
+          batch.close()
+      }
+      assert(length == 1)
+    } finally {
+      deserializedStream.close()
     }
-    assert(length == 1)
-    deserializedStream.close()
+  }
 }
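Each 21-row input batch in the test above carries a null at index 10, so the merged 42-row output is expected to be null at row 10 (and, if rows keep their input order within the partition, presumably also at row 31); the suite only probes row 10 of every column. A stricter variant could check all expected positions along these lines; the helper is hypothetical and not part of the patch, but it only uses getVec and isNull calls that already appear in this diff.

```scala
import org.apache.spark.sql.execution.vectorized.OmniColumnVector
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical helper (not in this patch): assert that a column is null at every
// given row index, using the same getVec/isNull calls as the test above.
def assertNullsAt(batch: ColumnarBatch, col: Int, rows: Seq[Int]): Unit = {
  val vec = batch.column(col).asInstanceOf[OmniColumnVector].getVec
  rows.foreach(r => assert(vec.isNull(r), s"column $col expected null at row $r"))
}

// Example usage, under the row-order assumption stated above:
// assertNullsAt(batch, col = 1, rows = Seq(10, 31))
```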
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
index 6e55a49f737461b3e1172aeff9f2566288cfb8f1..4a21362d624c896ebea5c02408ea61f31267a15f 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
@@ -22,7 +22,7 @@ import java.io.{File, FileInputStream}
 
 import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer
 import com.huawei.boostkit.spark.vectorized.PartitionInfo
-import nova.hetu.omniruntime.`type`.Decimal64DataType
+import nova.hetu.omniruntime.`type`.{DataType, _}
 import nova.hetu.omniruntime.vector._
 import org.apache.spark.{HashPartitioner, SparkConf, TaskContext}
 import org.apache.spark.executor.TaskMetrics
@@ -31,7 +31,7 @@ import org.apache.spark.shuffle.sort.ColumnarShuffleHandle
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.OmniColumnVector
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 import org.apache.spark.util.Utils
 import org.mockito.Answers.RETURNS_SMART_NULLS
@@ -50,6 +50,7 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
     super.sparkConf
       .setAppName("test ColumnarShuffleWriter")
       .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.shuffle.compress", "false")
 
   private var taskMetrics: TaskMetrics = _
   private var tempDir: File = _
@@ -78,10 +79,12 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
     shuffleHandle =
       new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency)
 
-    val inputTypes = "[{\"id\":1}," +
-      "{\"id\":1}," +
-      "{\"id\":6,\"precision\":18,\"scale\":3}," +
-      "{\"id\":7,\"precision\":28,\"scale\":11}]"
+    val types : Array[DataType] = Array[DataType](
+      IntDataType.INTEGER,
+      IntDataType.INTEGER,
+      new Decimal64DataType(18, 3),
+      new Decimal128DataType(28, 11))
+    val inputTypes = DataTypeSerializer.serialize(types)
 
     when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions))
     when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf))
@@ -146,22 +149,22 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
   }
 
   test("write empty column batch") {
-    val vectorPid0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector()
-    val vector0_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector()
-    val vector0_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector()
-    val vector0_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector()
-    val vector0_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector()
-
-    val vectorPid1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector()
-    val vector1_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector()
-    val vector1_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector()
-    val vector1_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector()
-    val vector1_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector()
+    val vectorPid0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array())
+    val vector0_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array())
+    val vector0_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array())
+    val vector0_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(Array())
+    val vector0_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(Array())
+
+    val vectorPid1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array())
+    val vector1_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array())
+    val vector1_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array())
+    val vector1_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(Array())
+    val vector1_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(Array())
 
     val cb0 = ColumnarShuffleWriterSuite.makeColumnarBatch(
-      vectorPid0.getVec.getSize,List(vectorPid0, vector0_1, vector0_2, vector0_3, vector0_4))
+      vectorPid0.getVec.getSize, List(vectorPid0, vector0_1, vector0_2, vector0_3, vector0_4))
     val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch(
-      vectorPid1.getVec.getSize,List(vectorPid1, vector1_1, vector1_2, vector1_3, vector1_4))
+      vectorPid1.getVec.getSize, List(vectorPid1, vector1_1, vector1_2, vector1_3, vector1_4))
 
     def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1))
 
@@ -184,21 +187,21 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
   }
 
   test("write with some empty partitions") {
-    val vectorPid0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(0, 0, 1, 1)
-    val vector0_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(null, null, null, null)
-    val vector0_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(100, 100, null, null)
-    val vector0_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(100L, 100L, 100L, 100L)
-    val vector0_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(Array(100L, 100L), Array(100L, 100L), null, null)
+    val vectorPid0 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array(0, 0, 1, 1))
+    val vector0_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array(null, null, null, null))
+    val vector0_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array(100, 100, null, null))
+    val vector0_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(Array(100L, 100L, 100L, 100L))
+    val vector0_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(Array(Array(100L, 100L), Array(100L, 100L), null, null))
     val cb0 = ColumnarShuffleWriterSuite.makeColumnarBatch(
-      vectorPid0.getVec.getSize,List(vectorPid0, vector0_1, vector0_2, vector0_3, vector0_4))
+      vectorPid0.getVec.getSize, List(vectorPid0, vector0_1, vector0_2, vector0_3, vector0_4))
 
-    val vectorPid1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(0, 0, 1, 1)
-    val vector1_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(null, null, null, null)
-    val vector1_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(100, 100, null, null)
-    val vector1_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(100L, 100L, 100L, 100L)
-    val vector1_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(Array(100L, 100L), Array(100L, 100L), null, null)
+    val vectorPid1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array(0, 0, 1, 1))
+    val vector1_1 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array(null, null, null, null))
+    val vector1_2 = ColumnarShuffleWriterSuite.initOmniColumnIntVector(Array(100, 100, null, null))
+    val vector1_3 = ColumnarShuffleWriterSuite.initOmniColumnDecimal64Vector(Array(100L, 100L, 100L, 100L))
+    val vector1_4 = ColumnarShuffleWriterSuite.initOmniColumnDecimal128Vector(Array(Array(100L, 100L), Array(100L, 100L), null, null))
     val cb1 = ColumnarShuffleWriterSuite.makeColumnarBatch(
-      vectorPid1.getVec.getSize,List(vectorPid1, vector1_1, vector1_2, vector1_3, vector1_4))
+      vectorPid1.getVec.getSize, List(vectorPid1, vector1_1, vector1_2, vector1_3, vector1_4))
 
     def records: Iterator[(Int, ColumnarBatch)] = Iterator((0, cb0), (0, cb1))
 
@@ -234,11 +237,7 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
         assert(batch.numRows == 4)
         assert(batch.numCols == 4)
         (0 until batch.numCols).foreach { i =>
-          val valueVector =
-            batch
-              .column(i)
-              .asInstanceOf[OmniColumnVector]
-              .getVec
+          val valueVector = batch.column(i).asInstanceOf[OmniColumnVector].getVec
           assert(valueVector.getSize == batch.numRows)
         }
         batch.close()
@@ -252,12 +251,29 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
 }
 
 object ColumnarShuffleWriterSuite {
-  def initOmniColumnIntVector(values: Integer*): OmniColumnVector = {
+  def initOmniColumnBooleanVector(values: Array[java.lang.Boolean]): OmniColumnVector = {
+    val length = values.length
+    val vecTmp = new BooleanVec(length)
+    (0 until length).foreach { i =>
+      if (values(i) != null) {
+        vecTmp.set(i, values(i))
+      } else {
+        vecTmp.setNull(i)
+      }
+    }
+    val colVecTmp = new OmniColumnVector(length, BooleanType, false)
+    colVecTmp.setVec(vecTmp)
+    colVecTmp
+  }
+
+  def initOmniColumnIntVector(values: Array[java.lang.Integer]): OmniColumnVector = {
     val length = values.length
     val vecTmp = new IntVec(length)
    (0 until length).foreach { i =>
      if (values(i) != null) {
-        vecTmp.set(i, values(i).asInstanceOf[Int])
+        vecTmp.set(i, values(i))
+      } else {
+        vecTmp.setNull(i)
      }
    }
    val colVecTmp = new OmniColumnVector(length, IntegerType, false)
@@ -265,12 +281,74 @@ object ColumnarShuffleWriterSuite {
     colVecTmp
   }
 
-  def initOmniColumnDecimal64Vector(values: java.lang.Long*): OmniColumnVector = {
+  def initOmniColumnShortVector(values: Array[java.lang.Integer]): OmniColumnVector = {
+    val length = values.length
+    val vecTmp = new ShortVec(length)
+    (0 until length).foreach { i =>
+      if (values(i) != null) {
+        vecTmp.set(i, values(i).shortValue())
+      } else {
+        vecTmp.setNull(i)
+      }
+    }
+    val colVecTmp = new OmniColumnVector(length, ShortType, false)
+    colVecTmp.setVec(vecTmp)
+    colVecTmp
+  }
+
+  def initOmniColumnLongVector(values: Array[java.lang.Long]): OmniColumnVector = {
     val length = values.length
     val vecTmp = new LongVec(length)
    (0 until length).foreach { i =>
      if (values(i) != null) {
-        vecTmp.set(i, values(i).asInstanceOf[Long])
+        vecTmp.set(i, values(i))
+      } else {
+        vecTmp.setNull(i)
+      }
+    }
+    val colVecTmp = new OmniColumnVector(length, LongType, false)
+    colVecTmp.setVec(vecTmp)
+    colVecTmp
+  }
+
+  def initOmniColumnDoubleVector(values: Array[java.lang.Double]): OmniColumnVector = {
+    val length = values.length
+    val vecTmp = new DoubleVec(length)
+    (0 until length).foreach { i =>
+      if (values(i) != null) {
+        vecTmp.set(i, values(i))
+      } else {
+        vecTmp.setNull(i)
+      }
+    }
+    val colVecTmp = new OmniColumnVector(length, DoubleType, false)
+    colVecTmp.setVec(vecTmp)
+    colVecTmp
+  }
+
+  def initOmniColumnVarcharVector(values: Array[java.lang.String]): OmniColumnVector = {
+    val length = values.length
+    val vecTmp = new VarcharVec(1024, length)
+    (0 until length).foreach { i =>
+      if (values(i) != null) {
+        vecTmp.set(i, values(i).getBytes())
+      } else {
+        vecTmp.setNull(i)
+      }
+    }
+    val colVecTmp = new OmniColumnVector(length, StringType, false)
+    colVecTmp.setVec(vecTmp)
+    colVecTmp
+  }
+
+  def initOmniColumnDecimal64Vector(values: Array[java.lang.Long]): OmniColumnVector = {
+    val length = values.length
+    val vecTmp = new LongVec(length)
+    (0 until length).foreach { i =>
+      if (values(i) != null) {
+        vecTmp.set(i, values(i))
+      } else {
+        vecTmp.setNull(i)
      }
    }
    val colVecTmp = new OmniColumnVector(length, DecimalType(18, 3), false)
@@ -278,12 +356,14 @@ object ColumnarShuffleWriterSuite {
     colVecTmp
   }
 
-  def initOmniColumnDecimal128Vector(values: Array[Long]*): OmniColumnVector = {
+  def initOmniColumnDecimal128Vector(values: Array[Array[Long]]): OmniColumnVector = {
    val length = values.length
    val vecTmp = new Decimal128Vec(length)
    (0 until length).foreach { i =>
      if (values(i) != null) {
        vecTmp.set(i, values(i))
+      } else {
+        vecTmp.setNull(i)
      }
    }
    val colVecTmp = new OmniColumnVector(length, DecimalType(28, 11), false)