diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
index 0f03512410d0c403945f2cec3993c6638b17a8af..49b39abb764a12f599c16ef5240c796f88871161 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
@@ -46,10 +46,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved)
         return JNI_ERR;
     }
 
-    illegal_access_exception_class = 
+    illegal_access_exception_class =
         CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;");
 
-    split_result_class = 
+    split_result_class =
        CreateGlobalClassReference(env, "Lcom/huawei/boostkit/spark/vectorized/SplitResult;");
     split_result_constructor = GetMethodID(env, split_result_class, "<init>", "(JJJJJ[J)V");
@@ -86,13 +86,15 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake(
     DataTypes inputVecTypes = Deserialize(inputTypeCharPtr);
     const int32_t *inputVecTypeIds = inputVecTypes.GetIds();
     //
-    std::vector<DataType> inputDataTpyes = inputVecTypes.Get();
+    std::vector<DataTypePtr> inputDataTpyes = inputVecTypes.Get();
     int32_t size = inputDataTpyes.size();
     uint32_t *inputDataPrecisions = new uint32_t[size];
     uint32_t *inputDataScales = new uint32_t[size];
     for (int i = 0; i < size; ++i) {
-        inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision();
-        inputDataScales[i] = inputDataTpyes[i].GetScale();
+        if (inputDataTpyes[i]->GetId() == OMNI_DECIMAL64 || inputDataTpyes[i]->GetId() == OMNI_DECIMAL128) {
+            inputDataScales[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetScale();
+            inputDataPrecisions[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetPrecision();
+        }
     }
     inputDataTpyes.clear();
@@ -207,10 +209,10 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_stop(
         split_result_class, split_result_constructor,
         splitter->TotalComputePidTime(),
         splitter->TotalWriteTime(),
         splitter->TotalSpillTime(),
         splitter->TotalBytesWritten(),
         splitter->TotalBytesSpilled(),
         partition_length_arr);
- 
+
     return split_result;
 }
- 
+
 JNIEXPORT void JNICALL
 Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_close(
     JNIEnv* env, jobject, jlong splitter_id)
 {
diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
index 5fdff534440c394fb8aaddd2311badfca57bb00c..b28cced84fc5cab9823038c7b852e681fa24bcca 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp
@@ -765,8 +765,10 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptr<BufferedOutputStream> &bufferStream)
         vt->set_typeid_(CastShuffleTypeIdToVecType(vector_batch_col_types_[indexSchema]));
         LogsDebug("precision[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataPrecisions[indexSchema]);
         LogsDebug("scale[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataScales[indexSchema]);
-        vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]);
-        vt->set_scale(input_col_types.inputDataScales[indexSchema]);
+        if (vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64) {
+            vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]);
+            vt->set_scale(input_col_types.inputDataScales[indexSchema]);
+        }
     }
     curBatch++;
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
index 5f7458566a44e7b4f829b4da6163788d060c0c9d..875484a23a0a734c71a6b62c0361fdad3387cacf 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
@@ -21,17 +21,17 @@
 
 using namespace omniruntime::vec;
 
-void ToVectorTypes(const int32_t *dataTypeIds, int32_t dataTypeCount, std::vector<DataType> &dataTypes)
+void ToVectorTypes(const int32_t *dataTypeIds, int32_t dataTypeCount, std::vector<DataTypePtr> &dataTypes)
 {
     for (int i = 0; i < dataTypeCount; ++i) {
         if (dataTypeIds[i] == OMNI_VARCHAR) {
-            dataTypes.push_back(VarcharDataType(50));
+            dataTypes.push_back(std::make_shared<VarcharDataType>(50));
             continue;
         } else if (dataTypeIds[i] == OMNI_CHAR) {
-            dataTypes.push_back(CharDataType(50));
+            dataTypes.push_back(std::make_shared<CharDataType>(50));
             continue;
         }
-        dataTypes.push_back(DataType(dataTypeIds[i]));
+        dataTypes.push_back(std::make_shared<DataType>(dataTypeIds[i]));
     }
 }
 
@@ -41,7 +41,7 @@ VectorBatch* CreateInputData(const int32_t numRows,
     int64_t* allData)
 {
     auto *vecBatch = new VectorBatch(numCols, numRows);
-    vector<DataType> inputTypes;
+    vector<DataTypePtr> inputTypes;
     ToVectorTypes(inputTypeIds, numCols, inputTypes);
     vecBatch->NewVectors(omniruntime::vec::GetProcessGlobalVecAllocator(), inputTypes);
     for (int i = 0; i < numCols; ++i) {
@@ -123,13 +123,13 @@ Vector *CreateVector(DataType &vecType, int32_t rowCount, va_list &args)
     }
 }
 
-DictionaryVector *CreateDictionaryVector(DataType &vecType, int32_t rowCount, int32_t *ids, int32_t idsCount, ...)
+DictionaryVector *CreateDictionaryVector(DataType &dataType, int32_t rowCount, int32_t *ids, int32_t idsCount, ...)
 {
     va_list args;
     va_start(args, idsCount);
-    Vector *dictionary = CreateVector(vecType, rowCount, args);
+    Vector *dictionary = CreateVector(dataType, rowCount, args);
     va_end(args);
-    auto vec = std::make_unique<DictionaryVector>(dictionary, ids, idsCount).release();
+    auto vec = new DictionaryVector(dictionary, ids, idsCount);
     delete dictionary;
     return vec;
 }
 
@@ -199,15 +199,15 @@ Vector *buildVector(const DataType &aggType, int32_t rowNumber)
     }
 }
 
-VectorBatch *CreateVectorBatch(DataTypes &types, int32_t rowCount, ...)
+VectorBatch *CreateVectorBatch(const DataTypes &types, int32_t rowCount, ...)
 {
     int32_t typesCount = types.GetSize();
-    VectorBatch *vectorBatch = std::make_unique<VectorBatch>(typesCount).release();
+    auto *vectorBatch = new VectorBatch(typesCount, rowCount);
     va_list args;
     va_start(args, rowCount);
     for (int32_t i = 0; i < typesCount; i++) {
-        DataType type = types.Get()[i];
-        vectorBatch->SetVector(i, CreateVector(type, rowCount, args));
+        DataTypePtr type = types.GetType(i);
+        vectorBatch->SetVector(i, CreateVector(*type, rowCount, args));
     }
     va_end(args);
     return vectorBatch;
@@ -502,7 +502,7 @@ VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum) {
     int32_t data0[dataSize] = {111, 112, 113, 114, 115, 116};
     int64_t data1[dataSize] = {221, 222, 223, 224, 225, 226};
     void *datas[2] = {data0, data1};
-    DataTypes sourceTypes(std::vector<DataType>({ IntDataType(), LongDataType()}));
+    DataTypes sourceTypes(std::vector<DataTypePtr>({ std::make_unique<IntDataType>(), std::make_unique<LongDataType>()}));
     int32_t ids[] = {0, 1, 2, 3, 4, 5};
     VectorBatch *vectorBatch = new VectorBatch(3, dataSize);
     VectorAllocator *allocator = omniruntime::vec::GetProcessGlobalVecAllocator();
@@ -514,7 +514,7 @@ VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum) {
         if (i == 0) {
             vectorBatch->SetVector(i, intVectorTmp);
         } else {
-            omniruntime::vec::DataType dataType = sourceTypes.Get()[i - 1];
+            omniruntime::vec::DataType dataType = *(sourceTypes.Get()[i - 1]);
             vectorBatch->SetVector(i, CreateDictionaryVector(dataType, dataSize, ids, dataSize, datas[i - 1]));
         }
     }
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
index 042dc4a142ebaa20cabba735894f4c0b221b1ab7..428d81c46a9ccbd72a7def9a5c0b3dab574a40b7 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include <memory>
 
 #include "../../src/shuffle/splitter.h"
 #include "../../src/jni/concurrent_map.h"
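Note on the C++ hunks above in SparkJniWrapper.cpp and splitter.cpp: precision and scale are only defined for decimal columns, so both sides now guard on the type id before touching those fields, and the JNI side downcasts the shared DataTypePtr to the decimal subtype instead of reading precision/scale off every DataType. A minimal, self-contained sketch of that guard-then-downcast pattern follows; the stand-in DataType, DecimalDataType, and DataTypePtr definitions are assumptions for illustration, not the omniruntime headers (vectors are used in place of the raw new[] buffers to keep the sketch leak-free).

#include <cstdint>
#include <memory>
#include <vector>

enum TypeId { OMNI_INT, OMNI_DECIMAL64, OMNI_DECIMAL128 };

// Stand-in base type: only an id, no precision/scale.
struct DataType {
    explicit DataType(TypeId typeId) : id(typeId) {}
    virtual ~DataType() = default;
    TypeId GetId() const { return id; }
    TypeId id;
};

// Stand-in decimal subtype: the only place precision/scale live.
struct DecimalDataType : DataType {
    DecimalDataType(TypeId typeId, uint32_t p, uint32_t s) : DataType(typeId), precision(p), scale(s) {}
    uint32_t GetPrecision() const { return precision; }
    uint32_t GetScale() const { return scale; }
    uint32_t precision;
    uint32_t scale;
};

using DataTypePtr = std::shared_ptr<DataType>;

// Mirrors the nativeMake loop: only decimal slots are filled in.
void ExtractDecimalParams(const std::vector<DataTypePtr> &types,
                          std::vector<uint32_t> &precisions, std::vector<uint32_t> &scales)
{
    for (size_t i = 0; i < types.size(); ++i) {
        if (types[i]->GetId() == OMNI_DECIMAL64 || types[i]->GetId() == OMNI_DECIMAL128) {
            auto decimal = std::dynamic_pointer_cast<DecimalDataType>(types[i]);
            precisions[i] = decimal->GetPrecision();
            scales[i] = decimal->GetScale();
        }
        // Non-decimal columns keep whatever default the buffers were
        // initialized with, matching the behavior of the hunk above.
    }
}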
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index ecffee407196897db1ee503a474d9a73e54b2030..f780e6e5fe48aefdf1ea93acfef793169cc884b0 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -155,7 +155,6 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
 
   val maxRowCount = conf.getConfString("spark.sql.columnar.maxRowCount", "20000").toInt
 
-  val enableJit: Boolean = conf.getConfString("spark.omni.sql.columnar.jit", "false").toBoolean
   val enableDecimalCheck : Boolean = conf.getConfString("spark.omni.sql.decimal.constraint.check", "true").toBoolean
 }
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala
index 1460c618d401de3bdd13d18eb97445a08642e030..4112d669a0884bde80afad99144a3916ebb19f3d 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala
@@ -32,7 +32,6 @@ object Constant {
   val OMNI_DOUBLE_TYPE: String = DataTypeId.OMNI_DOUBLE.ordinal().toString
   val OMNI_BOOLEAN_TYPE: String = DataTypeId.OMNI_BOOLEAN.ordinal().toString
   val OMNI_DATE_TYPE: String = DataTypeId.OMNI_DATE32.ordinal().toString
-  val IS_ENABLE_JIT: Boolean = ColumnarPluginConfig.getSessionConf.enableJit
   val IS_DECIMAL_CHECK: Boolean = ColumnarPluginConfig.getSessionConf.enableDecimalCheck
   val IS_SKIP_VERIFY_EXP: Boolean = true
   val OMNI_DECIMAL64_TYPE: String = DataTypeId.OMNI_DECIMAL64.ordinal().toString
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala
index 96d3189b17f218bd4d2f4ca4467d88f28f799498..73c3d6cb243363ed8b1489e0f6a08a5abab82761 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala
@@ -18,11 +18,11 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.`type`.DataType
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.filter.OmniFilterAndProjectOperatorFactory
 import nova.hetu.omniruntime.vector.VecBatch
 
@@ -195,7 +195,7 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan)
     child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) =>
       val startCodegen = System.nanoTime()
       val filterOperatorFactory = new OmniFilterAndProjectOperatorFactory(
-        omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val filterOperator = filterOperatorFactory.createOperator
 
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
@@ -293,7 +293,7 @@ case class ColumnarConditionProjectExec(projectList: Seq[NamedExpression],
     child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) =>
       val startCodegen = System.nanoTime()
       val operatorFactory = new OmniFilterAndProjectOperatorFactory(
-        conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val operator = operatorFactory.createOperator
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
       // close operator
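The Scala hunks from here on are one mechanical substitution: OperatorConfig no longer takes the IS_ENABLE_JIT flag, and its first argument is now a spill configuration. Operators that never spill pass SpillConfig.NONE; ColumnarSortExec (further down) keeps its existing SparkSpillConfig and simply drops the JIT flag. A sketch of the before/after, using stand-in config classes whose shapes are assumptions for illustration, not the omniruntime API:

object OperatorConfigMigration {
  // Stand-ins for nova.hetu.omniruntime.operator.config.* (illustrative only).
  sealed trait SpillConfig
  object SpillConfig { case object NONE extends SpillConfig }
  final case class SparkSpillConfig(spillPath: String, diskReserveSize: Long, rowThreshold: Int)
    extends SpillConfig
  final class OperatorConfig(spill: SpillConfig, skipVerify: Boolean)

  val IS_SKIP_VERIFY_EXP: Boolean = true

  // Before: new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)
  // After, for non-spilling operators (filter, project, join, window, topN, hash agg):
  val nonSpilling = new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)
  // After, for sort, which can spill to disk (hypothetical values):
  val forSort = new OperatorConfig(SparkSpillConfig("/tmp/omni-spill", 10L << 30, 200000), IS_SKIP_VERIFY_EXP)
}

Sort is the only operator in this patch with a real spill path, which is why its call site merely drops the JIT flag while every other call site swaps it for SpillConfig.NONE.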
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
index e3039b3aa62b47c1a11649794e617a6d93a48a40..bac1f1408b76f04e6789a98ec53839e46878bad8 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
@@ -1,9 +1,9 @@
 package org.apache.spark.sql.execution
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType}
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory
 import nova.hetu.omniruntime.vector.{LongVec, Vec, VecBatch}
 import org.apache.spark.rdd.RDD
@@ -80,7 +80,7 @@ case class ColumnarExpandExec(
     child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) =>
       val startCodegen = System.nanoTime()
       var projectOperators = omniExpressions.map(exps => {
-        val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
         factory.createOperator
       })
       omniCodegenTimeMetric += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
index cf78751c3db301fdc2fdd060aaa717a12d617ff1..21da4d9ec4c078b9fc8e5816841b4adea14af76e 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 import java.util.Optional
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor
 
 import scala.collection.mutable.HashMap
@@ -32,7 +32,7 @@ import nova.hetu.omniruntime.vector.{Vec, VecBatch}
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil._
 import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory}
 import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory
 import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory
@@ -833,14 +833,14 @@ case class ColumnarMultipleOperatorExec(
         omniAggOutputTypes,
         omniAggInputRaw,
         omniAggOutputPartial,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val aggOperator = aggFactory.createOperator
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
         aggOperator.close()
       })
-      val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val projectOperator1 = projectOperatorFactory1.createOperator
       // close operator
       addLeakSafeTaskCompletionListener[Unit](_ => {
@@ -849,7 +849,7 @@ case class ColumnarMultipleOperatorExec(
       val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1,
         if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp1 = buildOpFactory1.createOperator()
       buildData1.value.foreach { input =>
         buildOp1.addInput(deserializer.deserialize(input))
@@ -857,7 +857,7 @@
       buildOp1.getOutput
       val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1,
         probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp1 = lookupOpFactory1.createOperator()
       // close operator
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
@@ -867,7 +867,7 @@
         lookupOpFactory1.close()
       })
 
-      val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val projectOperator2 = projectOperatorFactory2.createOperator
       // close operator
       addLeakSafeTaskCompletionListener[Unit](_ => {
@@ -876,7 +876,7 @@ case class ColumnarMultipleOperatorExec(
       val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2,
         if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp2 = buildOpFactory2.createOperator()
       buildData2.value.foreach { input =>
         buildOp2.addInput(deserializer.deserialize(input))
@@ -884,7 +884,7 @@
       buildOp2.getOutput
       val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2,
         probeHashColsExp2, buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp2 = lookupOpFactory2.createOperator()
       // close operator
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
@@ -894,7 +894,7 @@
         lookupOpFactory2.close()
       })
 
-      val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val projectOperator3 = projectOperatorFactory3.createOperator
       // close operator
       addLeakSafeTaskCompletionListener[Unit](_ => {
@@ -903,7 +903,7 @@ case class ColumnarMultipleOperatorExec(
       val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3,
         if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp3 = buildOpFactory3.createOperator()
       buildData3.value.foreach { input =>
         buildOp3.addInput(deserializer.deserialize(input))
@@ -911,7 +911,7 @@
       buildOp3.getOutput
       val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3,
         probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp3 = lookupOpFactory3.createOperator()
       // close operator
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
@@ -921,7 +921,7 @@
         lookupOpFactory3.close()
       })
 
-      val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val projectOperator4 = projectOperatorFactory4.createOperator
       // close operator
       addLeakSafeTaskCompletionListener[Unit](_ => {
@@ -930,7 +930,7 @@ case class ColumnarMultipleOperatorExec(
       val buildOpFactory4 = new OmniHashBuilderWithExprOperatorFactory(buildTypes4, buildJoinColsExp4,
         if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, 1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp4 = buildOpFactory4.createOperator()
       buildData4.value.foreach { input =>
         buildOp4.addInput(deserializer.deserialize(input))
@@ -938,7 +938,7 @@
       buildOp4.getOutput
       val lookupOpFactory4 = new OmniLookupJoinWithExprOperatorFactory(probeTypes4, probeOutputCols4,
         probeHashColsExp4, buildOutputCols4, buildOutputTypes4, OMNI_JOIN_TYPE_INNER, buildOpFactory4,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp4 = lookupOpFactory4.createOperator()
       // close operator
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
@@ -949,7 +949,7 @@
       })
 
       val condOperatorFactory = new OmniFilterAndProjectOperatorFactory(
-        conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val condOperator = condOperatorFactory.createOperator
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
       // close operator
@@ -1168,14 +1168,14 @@ case class ColumnarMultipleOperatorExec1(
         omniAggOutputTypes,
         omniAggInputRaw,
         omniAggOutputPartial,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val aggOperator = aggFactory.createOperator
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
         aggOperator.close()
       })
 
-      val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val projectOperator1 = projectOperatorFactory1.createOperator
       // close operator
       addLeakSafeTaskCompletionListener[Unit](_ => {
@@ -1184,7 +1184,7 @@ case class ColumnarMultipleOperatorExec1(
       val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1,
         if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp1 = buildOpFactory1.createOperator()
       buildData1.value.foreach { input =>
         buildOp1.addInput(deserializer.deserialize(input))
@@ -1192,7 +1192,7 @@
       buildOp1.getOutput
       val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1,
         probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp1 = lookupOpFactory1.createOperator()
       // close operator
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
@@ -1202,7 +1202,7 @@
         lookupOpFactory1.close()
       })
 
-      val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val projectOperator2 = projectOperatorFactory2.createOperator
       // close operator
       addLeakSafeTaskCompletionListener[Unit](_ => {
@@ -1211,7 +1211,7 @@ case class ColumnarMultipleOperatorExec1(
       val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2,
         if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp2 = buildOpFactory2.createOperator()
       buildData2.value.foreach { input =>
         buildOp2.addInput(deserializer.deserialize(input))
@@ -1219,7 +1219,7 @@
       buildOp2.getOutput
       val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2,
         probeHashColsExp2, buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp2 = lookupOpFactory2.createOperator()
       // close operator
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
@@ -1229,7 +1229,7 @@
         lookupOpFactory2.close()
       })
 
-      val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val projectOperator3 = projectOperatorFactory3.createOperator
       // close operator
       addLeakSafeTaskCompletionListener[Unit](_ => {
@@ -1238,7 +1238,7 @@ case class ColumnarMultipleOperatorExec1(
       val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3,
         if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp3 = buildOpFactory3.createOperator()
       buildData3.value.foreach { input =>
         buildOp3.addInput(deserializer.deserialize(input))
@@ -1246,7 +1246,7 @@
       buildOp3.getOutput
       val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3,
         probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp3 = lookupOpFactory3.createOperator()
       // close operator
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
@@ -1257,7 +1257,7 @@
       })
 
       val condOperatorFactory = new OmniFilterAndProjectOperatorFactory(
-        conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val condOperator = condOperatorFactory.createOperator
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
       // close operator
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala
index 4389500af84ebc878148e7ff4a8fde74de3d1322..1ecc0923d524c5c75291667c7a02f49c4adc9df0 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala
@@ -18,13 +18,13 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.`type`.DataType
 import nova.hetu.omniruntime.constants.FunctionType
 import nova.hetu.omniruntime.operator.aggregator.OmniHashAggregationWithExprOperatorFactory
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.vector.VecBatch
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -218,7 +218,7 @@ case class ColumnarHashAggregateExec(
         omniAggOutputTypes,
         omniInputRaw,
         omniOutputPartial,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val operator = factory.createOperator
 
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala
index 6c8805589104e4bda04fd1d1776565df36ebcc58..ca04853844f27dfd1e65e488159788d51dddb036 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.`type`.DataType
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory
 import nova.hetu.omniruntime.vector.VecBatch
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -42,7 +42,7 @@ object ColumnarProjection {
       omniExpressions: Array[String], iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = {
     val startCodegen = System.nanoTime()
-    val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+    val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
     val projectOperator = projectOperatorFactory.createOperator
     omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
     // close operator
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 96cb162a34b8db2f3d03a2c45024b27dd6ab81f3..5f0231b50279bd1784e4ef892adc60b8bde6cb47 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import com.huawei.boostkit.spark.ColumnarPluginConfig
 
 import java.util.Random
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 
 import scala.collection.JavaConverters._
 import scala.concurrent.Future
@@ -29,7 +29,7 @@ import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import com.huawei.boostkit.spark.vectorized.PartitionInfo
 import nova.hetu.omniruntime.`type`.{DataType, DataTypeSerializer}
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory
 import nova.hetu.omniruntime.vector.{IntVec, VecBatch}
 import org.apache.spark._
@@ -289,7 +289,7 @@ object ColumnarShuffleExchangeExec extends Logging {
       // omni project
       val genHashExpression = genHashExpr()
       val omniExpr: String = genHashExpression(expressions, numPartitions, defaultMm3HashSeed, outputAttributes)
-      val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val op = factory.createOperator()
       cbIter.map { cb =>
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala
index 3edf533c2b933544c2822a540de99343154d8bba..165b6a4c02ca4319398b3b7ec74ed78f5c802c75 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala
@@ -22,7 +22,7 @@ import java.util.UUID
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
 import com.huawei.boostkit.spark.ColumnarPluginConfig
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam}
 import nova.hetu.omniruntime.operator.config.{OperatorConfig, SparkSpillConfig}
 import nova.hetu.omniruntime.operator.sort.OmniSortWithExprOperatorFactory
@@ -119,7 +119,7 @@ case class ColumnarSortExec(
         sortSpillDirDiskReserveSize, sortSpillRowThreshold)
       val startCodegen = System.nanoTime()
       val sortOperatorFactory = new OmniSortWithExprOperatorFactory(sourceTypes, outputCols,
-        sortColsExp, ascendings, nullFirsts, new OperatorConfig(IS_ENABLE_JIT, sparkSpillConf, IS_SKIP_VERIFY_EXP))
+        sortColsExp, ascendings, nullFirsts, new OperatorConfig(sparkSpillConf, IS_SKIP_VERIFY_EXP))
       val sortOperator = sortOperatorFactory.createOperator
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala
index e0d920e014d77c2271a0e7a68f897a975c488dbb..3ae42ed3d2f2670d313f513f9282905eb53241d5 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala
@@ -18,12 +18,12 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType}
 import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam}
 import nova.hetu.omniruntime.`type`.DataType
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.topn.OmniTopNWithExprOperatorFactory
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
@@ -108,7 +108,7 @@ case class ColumnarTakeOrderedAndProjectExec(
     def computeTopN(iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = {
       val startCodegen = System.nanoTime()
       val topNOperatorFactory = new OmniTopNWithExprOperatorFactory(sourceTypes, limit,
-        sortColsExp, ascendings, nullFirsts, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        sortColsExp, ascendings, nullFirsts, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val topNOperator = topNOperatorFactory.createOperator
       longMetric("omniCodegenTime") += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
       SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => {
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala
index 851973bd5c04d42e694465395abe563e3f176ea8..cc8491268b4c7448bf0771258eb90f5e7e68257d 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.`type`.DataType
 import nova.hetu.omniruntime.constants.{FunctionType, OmniWindowFrameBoundType, OmniWindowFrameType}
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.window.OmniWindowWithExprOperatorFactory
 import nova.hetu.omniruntime.vector.VecBatch
 import org.apache.spark.rdd.RDD
@@ -343,7 +343,7 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
         windowFunType, omminPartitionChannels, preGroupedChannels, sortCols, ascendings,
         nullFirsts, 0, 10000, windowArgKeys, windowFunRetType, windowFrameTypes,
         windowFrameStartTypes, windowFrameStartChannels, windowFrameEndTypes,
-        windowFrameEndChannels, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        windowFrameEndChannels, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val windowOperator = windowOperatorFactory.createOperator
 
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
index b10b9cc3685630f56025281302d912b222d585cc..e632831a0cea3b640eace6f942030e4966c5b8ca 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
@@ -21,7 +21,7 @@ import com.huawei.boostkit.spark.ColumnarPluginConfig
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.Optional
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 
 import scala.collection.mutable
 
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor
@@ -29,7 +29,7 @@ import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonWhiteList
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.`type`.DataType
 import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory}
 import nova.hetu.omniruntime.vector.VecBatch
 import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory
@@ -290,7 +290,7 @@ case class ColumnarBroadcastHashJoinExec(
       }
       val startBuildCodegen = System.nanoTime()
       val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes,
-        buildJoinColsExp, filter, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp = buildOpFactory.createOperator()
       buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen)
 
@@ -307,7 +307,7 @@
       val startLookupCodegen = System.nanoTime()
       val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols,
         probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp = lookupOpFactory.createOperator()
       lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen)
 
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
index ac5d5fdbc1bc3f6e503ebcd84ff2f88194f162dc..cd81a8e25d6549fbcfd86c989e98379b32663dd0 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.joins
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.Optional
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.`type`.DataType
 import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory}
 import nova.hetu.omniruntime.vector.VecBatch
 import org.apache.spark.TaskContext
@@ -189,7 +189,7 @@ case class ColumnarShuffledHashJoinExec(
       }
       val startBuildCodegen = System.nanoTime()
       val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes,
-        buildJoinColsExp, filter, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val buildOp = buildOpFactory.createOperator()
       buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen)
 
@@ -214,7 +214,7 @@ case class ColumnarShuffledHashJoinExec(
       val startLookupCodegen = System.nanoTime()
       val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols,
         probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory,
-        new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
       val lookupOp = lookupOpFactory.createOperator()
       lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen)
 
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala
index 2a16a0dbf2bd1ddab91e410f6def2c8630ff2e8c..dfe539662b0616781e96039d31b8e86147718df8 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala
@@ -21,13 +21,13 @@ import com.huawei.boostkit.spark.ColumnarPluginConfig
 
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.Optional
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonWhiteList
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.`type`.DataType
 import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.join.{OmniSmjBufferedTableWithExprOperatorFactory, OmniSmjStreamedTableWithExprOperatorFactory}
 import nova.hetu.omniruntime.vector.{BooleanVec, Decimal128Vec, DoubleVec, IntVec, LongVec, VarcharVec, Vec, VecBatch}
 import org.apache.spark.rdd.RDD
@@ -173,13 +173,13 @@ class ColumnarSortMergeJoinExec(
     val filter: Optional[String] = Optional.ofNullable(filterString)
     val startStreamedCodegen = System.nanoTime()
     val streamedOpFactory = new OmniSmjStreamedTableWithExprOperatorFactory(streamedTypes,
-      streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
     val streamedOp = streamedOpFactory.createOperator
     streamedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startStreamedCodegen)
 
     val startBufferedCodegen = System.nanoTime()
     val bufferedOpFactory = new OmniSmjBufferedTableWithExprOperatorFactory(bufferedTypes,
-      bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+      bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
     val bufferedOp = bufferedOpFactory.createOperator
     bufferedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBufferedCodegen)
 
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
index 6dddff494d23f9869398fe19cb49767d1d31967d..6e55a49f737461b3e1172aeff9f2566288cfb8f1 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
@@ -78,10 +78,10 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
     shuffleHandle =
       new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency)
 
-    val inputTypes = "[{\"id\":\"OMNI_INT\",\"width\":0,\"precision\":0,\"scale\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":\"OMNI_INT\",\"width\":0,\"precision\":0,\"scale\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":\"OMNI_DECIMAL64\",\"width\":0,\"precision\":18,\"scale\":3,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":\"OMNI_DECIMAL128\",\"width\":0,\"precision\":28,\"scale\":11,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}]"
+    val inputTypes = "[{\"id\":1}," +
+      "{\"id\":1}," +
+      "{\"id\":6,\"precision\":18,\"scale\":3}," +
+      "{\"id\":7,\"precision\":28,\"scale\":11}]"
 
     when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions))
     when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf))
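The suite's serialized type string shrinks to the essentials: a numeric type id per column, with precision and scale present only where they mean something (the two decimal columns). The same literal, reflowed with stripMargin for readability (equivalent up to insignificant JSON whitespace; ids as used in the test, where 1 corresponds to OMNI_INT, 6 to OMNI_DECIMAL64, and 7 to OMNI_DECIMAL128):

val inputTypes =
  """[{"id":1},
    |{"id":1},
    |{"id":6,"precision":18,"scale":3},
    |{"id":7,"precision":28,"scale":11}]""".stripMargin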