From 59c42618382dbce7082fa633df71a872ea30636a Mon Sep 17 00:00:00 2001 From: ruanrunxue Date: Fri, 12 Aug 2022 15:31:08 +0800 Subject: [PATCH 1/7] adapt omni decimal refector --- .../expression/OmniExpressionAdaptor.scala | 52 ++++--------------- 1 file changed, 11 insertions(+), 41 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index c760d6ee5..853462e3c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -328,22 +328,18 @@ object OmniExpressionAdaptor extends Logging { expr match { case unscaledValue: UnscaledValue => ("{\"exprType\":\"FUNCTION\",\"returnType\":%s," + - "\"function_name\":\"UnscaledValue\", \"arguments\":[%s, %s, %s]}") + "\"function_name\":\"UnscaledValue\", \"arguments\":[%s]}") .format(sparkTypeToOmniExpJsonType(unscaledValue.dataType), - rewriteToOmniJsonExpressionLiteral(unscaledValue.child, exprsIndexMap), - toOmniJsonLiteral( - Literal(unscaledValue.child.dataType.asInstanceOf[DecimalType].precision, IntegerType)), - toOmniJsonLiteral( - Literal(unscaledValue.child.dataType.asInstanceOf[DecimalType].scale, IntegerType))) + rewriteToOmniJsonExpressionLiteral(unscaledValue.child, exprsIndexMap)) // omni not support return null, now rewrite to if(IsOverflowDecimal())? NULL:MakeDecimal() case checkOverflow: CheckOverflow => ("{\"exprType\":\"IF\",\"returnType\":%s," + "\"condition\":{\"exprType\":\"FUNCTION\",\"returnType\":%s," + - "\"function_name\":\"IsOverflowDecimal\",\"arguments\":[%s,%s,%s,%s,%s]}," + + "\"function_name\":\"IsOverflowDecimal\",\"arguments\":[%s,%s,%s]}," + "\"if_true\":%s," + "\"if_false\":{\"exprType\":\"FUNCTION\",\"returnType\":%s," + - "\"function_name\":\"MakeDecimal\", \"arguments\":[%s,%s,%s,%s,%s]}" + + "\"function_name\":\"MakeDecimal\", \"arguments\":[%s]}" + "}") .format(sparkTypeToOmniExpJsonType(checkOverflow.dataType), // IsOverflowDecimal returnType @@ -355,10 +351,6 @@ object OmniExpressionAdaptor extends Logging { Literal(checkOverflow.dataType.precision, IntegerType)), toOmniJsonLiteral( Literal(checkOverflow.dataType.scale, IntegerType)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.precision, IntegerType)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.scale, IntegerType)), // if_true toOmniJsonLiteral( Literal(null, checkOverflow.dataType)), @@ -367,41 +359,21 @@ object OmniExpressionAdaptor extends Logging { DecimalType(checkOverflow.dataType.precision, checkOverflow.dataType.scale)), rewriteToOmniJsonExpressionLiteral(checkOverflow.child, exprsIndexMap, - DecimalType(checkOverflow.dataType.precision, checkOverflow.dataType.scale)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.precision, IntegerType)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.scale, IntegerType)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.precision, IntegerType)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.scale, IntegerType))) + DecimalType(checkOverflow.dataType.precision, checkOverflow.dataType.scale))) case makeDecimal: MakeDecimal => makeDecimal.child.dataType match { case decimalChild: DecimalType => ("{\"exprType\": \"FUNCTION\", \"returnType\":%s," + - "\"function_name\": \"MakeDecimal\", \"arguments\": [%s,%s,%s,%s,%s]}") + "\"function_name\": \"MakeDecimal\", \"arguments\": [%s]}") .format(sparkTypeToOmniExpJsonType(makeDecimal.dataType), - rewriteToOmniJsonExpressionLiteral(makeDecimal.child, exprsIndexMap), - toOmniJsonLiteral( - Literal(decimalChild.precision, IntegerType)), - toOmniJsonLiteral( - Literal(decimalChild.scale, IntegerType)), - toOmniJsonLiteral( - Literal(makeDecimal.precision, IntegerType)), - toOmniJsonLiteral( - Literal(makeDecimal.scale, IntegerType))) + rewriteToOmniJsonExpressionLiteral(makeDecimal.child, exprsIndexMap)) case longChild: LongType => ("{\"exprType\": \"FUNCTION\", \"returnType\":%s," + - "\"function_name\": \"MakeDecimal\", \"arguments\": [%s,%s,%s]}") + "\"function_name\": \"MakeDecimal\", \"arguments\": [%s]}") .format(sparkTypeToOmniExpJsonType(makeDecimal.dataType), - rewriteToOmniJsonExpressionLiteral(makeDecimal.child, exprsIndexMap), - toOmniJsonLiteral( - Literal(makeDecimal.precision, IntegerType)), - toOmniJsonLiteral( - Literal(makeDecimal.scale, IntegerType))) + rewriteToOmniJsonExpressionLiteral(makeDecimal.child, exprsIndexMap)) case _ => throw new UnsupportedOperationException(s"Unsupported datatype for MakeDecimal: ${makeDecimal.child.dataType}") } @@ -530,10 +502,8 @@ object OmniExpressionAdaptor extends Logging { case dt: DecimalType => if (cast.child.dataType.isInstanceOf[DoubleType]) { ("{\"exprType\":\"FUNCTION\",\"returnType\":%s," + - "\"function_name\":\"CAST\", \"arguments\":[%s,%s,%s]}") - .format(returnType, rewriteToOmniJsonExpressionLiteral(cast.child, exprsIndexMap), - toOmniJsonLiteral(Literal(dt.precision, IntegerType)), - toOmniJsonLiteral(Literal(dt.scale, IntegerType))) + "\"function_name\":\"CAST\", \"arguments\":[%s]}") + .format(returnType, rewriteToOmniJsonExpressionLiteral(cast.child, exprsIndexMap)) } else { rewriteToOmniJsonExpressionLiteral( MakeDecimal(cast.child, dt.precision, dt.scale), exprsIndexMap) -- Gitee From a61a5a3999444c05228dc566cb3990ccca358e77 Mon Sep 17 00:00:00 2001 From: ruanrunxue Date: Tue, 23 Aug 2022 19:33:59 +0800 Subject: [PATCH 2/7] feat: adapt OmniOperator support spark overflow config --- .../boostkit/spark/util/OmniAdaptorUtil.scala | 9 +++ .../ColumnarBasicPhysicalOperators.scala | 9 ++- .../sql/execution/ColumnarExpandExec.scala | 6 +- .../ColumnarFileSourceScanExec.scala | 62 +++++++++++-------- .../execution/ColumnarHashAggregateExec.scala | 5 +- .../sql/execution/ColumnarProjection.scala | 6 +- .../ColumnarShuffleExchangeExec.scala | 6 +- .../sql/execution/ColumnarSortExec.scala | 7 ++- .../ColumnarTakeOrderedAndProjectExec.scala | 7 ++- .../sql/execution/ColumnarWindowExec.scala | 9 +-- .../joins/ColumnarBroadcastHashJoinExec.scala | 9 +-- .../joins/ColumnarShuffledHashJoinExec.scala | 9 +-- .../joins/ColumnarSortMergeJoinExec.scala | 9 ++- 13 files changed, 95 insertions(+), 58 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala index 1661874db..027432ce7 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/util/OmniAdaptorUtil.scala @@ -21,12 +21,14 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import nova.hetu.omniruntime.operator.OmniOperator +import nova.hetu.omniruntime.operator.config.OverflowConfig import nova.hetu.omniruntime.vector._ import org.apache.spark.sql.catalyst.expressions.{Attribute, ExprId, SortOrder} import org.apache.spark.sql.execution.datasources.orc.OrcColumnVector import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.vectorized.{OmniColumnVector, OnHeapColumnVector} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} @@ -261,4 +263,11 @@ object OmniAdaptorUtil { } } } + + def overflowConf(): OverflowConfig.OverflowConfigId = { + if (SQLConf.get.ansiEnabled) + OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_EXCEPTION + else + OverflowConfig.OverflowConfigId.OVERFLOW_CONFIG_NULL + } } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala index 73c3d6cb2..cb23b68f0 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.filter.OmniFilterAndProjectOperatorFactory import nova.hetu.omniruntime.vector.VecBatch @@ -195,7 +196,8 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan) child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val filterOperatorFactory = new OmniFilterAndProjectOperatorFactory( - omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val filterOperator = filterOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) @@ -293,7 +295,8 @@ case class ColumnarConditionProjectExec(projectList: Seq[NamedExpression], child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val operatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val operator = operatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala index bac1f1408..27b05b16c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala @@ -2,8 +2,9 @@ package org.apache.spark.sql.execution import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType} +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.{LongVec, Vec, VecBatch} import org.apache.spark.rdd.RDD @@ -80,7 +81,8 @@ case class ColumnarExpandExec( child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() var projectOperators = omniExpressions.map(exps => { - val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) factory.createOperator }) omniCodegenTimeMetric += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala index 21da4d9ec..91b615385 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala @@ -30,9 +30,10 @@ import nova.hetu.omniruntime.operator.aggregator.OmniHashAggregationWithExprOper import nova.hetu.omniruntime.operator.filter.OmniFilterAndProjectOperatorFactory import nova.hetu.omniruntime.vector.{Vec, VecBatch} import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil._ import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory @@ -833,14 +834,15 @@ case class ColumnarMultipleOperatorExec( omniAggOutputTypes, omniAggInputRaw, omniAggOutputPartial, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val aggOperator = aggFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -849,7 +851,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1, if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp1 = buildOpFactory1.createOperator() buildData1.value.foreach { input => buildOp1.addInput(deserializer.deserialize(input)) @@ -857,7 +859,7 @@ case class ColumnarMultipleOperatorExec( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -867,7 +869,8 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -876,7 +879,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2, if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp2 = buildOpFactory2.createOperator() buildData2.value.foreach { input => buildOp2.addInput(deserializer.deserialize(input)) @@ -884,7 +887,7 @@ case class ColumnarMultipleOperatorExec( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -894,7 +897,8 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -903,7 +907,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3, if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp3 = buildOpFactory3.createOperator() buildData3.value.foreach { input => buildOp3.addInput(deserializer.deserialize(input)) @@ -911,7 +915,7 @@ case class ColumnarMultipleOperatorExec( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -921,7 +925,8 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory3.close() }) - val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator4 = projectOperatorFactory4.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -930,7 +935,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory4 = new OmniHashBuilderWithExprOperatorFactory(buildTypes4, buildJoinColsExp4, if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, 1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp4 = buildOpFactory4.createOperator() buildData4.value.foreach { input => buildOp4.addInput(deserializer.deserialize(input)) @@ -938,7 +943,7 @@ case class ColumnarMultipleOperatorExec( buildOp4.getOutput val lookupOpFactory4 = new OmniLookupJoinWithExprOperatorFactory(probeTypes4, probeOutputCols4, probeHashColsExp4, buildOutputCols4, buildOutputTypes4, OMNI_JOIN_TYPE_INNER, buildOpFactory4, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp4 = lookupOpFactory4.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -949,7 +954,8 @@ case class ColumnarMultipleOperatorExec( }) val condOperatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator @@ -1168,14 +1174,15 @@ case class ColumnarMultipleOperatorExec1( omniAggOutputTypes, omniAggInputRaw, omniAggOutputPartial, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val aggOperator = aggFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1184,7 +1191,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1, if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp1 = buildOpFactory1.createOperator() buildData1.value.foreach { input => buildOp1.addInput(deserializer.deserialize(input)) @@ -1192,7 +1199,7 @@ case class ColumnarMultipleOperatorExec1( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1202,7 +1209,8 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1211,7 +1219,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2, if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp2 = buildOpFactory2.createOperator() buildData2.value.foreach { input => buildOp2.addInput(deserializer.deserialize(input)) @@ -1219,7 +1227,7 @@ case class ColumnarMultipleOperatorExec1( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1229,7 +1237,8 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1238,7 +1247,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3, if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp3 = buildOpFactory3.createOperator() buildData3.value.foreach { input => buildOp3.addInput(deserializer.deserialize(input)) @@ -1246,7 +1255,7 @@ case class ColumnarMultipleOperatorExec1( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1257,7 +1266,8 @@ case class ColumnarMultipleOperatorExec1( }) val condOperatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala index d93e1dc54..4ea8298dd 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala @@ -20,12 +20,13 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.FunctionType import nova.hetu.omniruntime.constants.FunctionType.OMNI_AGGREGATION_TYPE_COUNT_ALL import nova.hetu.omniruntime.operator.aggregator.OmniHashAggregationWithExprOperatorFactory -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -226,7 +227,7 @@ case class ColumnarHashAggregateExec( omniAggOutputTypes, omniInputRaw, omniOutputPartial, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val operator = factory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala index ca0485384..0ccdbd6de 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.sql.execution.metric.SQLMetric @@ -42,7 +43,8 @@ object ColumnarProjection { omniExpressions: Array[String], iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = { val startCodegen = System.nanoTime() - val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator = projectOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 5f0231b50..9a52f2c72 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -26,10 +26,11 @@ import scala.collection.JavaConverters._ import scala.concurrent.Future import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import com.huawei.boostkit.spark.vectorized.PartitionInfo import nova.hetu.omniruntime.`type`.{DataType, DataTypeSerializer} -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.{IntVec, VecBatch} import org.apache.spark._ @@ -289,7 +290,8 @@ object ColumnarShuffleExchangeExec extends Logging { // omni project val genHashExpression = genHashExpr() val omniExpr: String = genHashExpression(expressions, numPartitions, defaultMm3HashSeed, outputAttributes) - val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val op = factory.createOperator() cbIter.map { cb => diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala index 165b6a4c0..7c7001dbc 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala @@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.ColumnarPluginConfig import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam} -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SparkSpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SparkSpillConfig} import nova.hetu.omniruntime.operator.sort.OmniSortWithExprOperatorFactory import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.rdd.RDD @@ -118,8 +119,8 @@ case class ColumnarSortExec( val sparkSpillConf = new SparkSpillConfig(sortSpillEnable, spillPathDir, sortSpillDirDiskReserveSize, sortSpillRowThreshold) val startCodegen = System.nanoTime() - val sortOperatorFactory = new OmniSortWithExprOperatorFactory(sourceTypes, outputCols, - sortColsExp, ascendings, nullFirsts, new OperatorConfig(sparkSpillConf, IS_SKIP_VERIFY_EXP)) + val sortOperatorFactory = new OmniSortWithExprOperatorFactory(sourceTypes, outputCols, sortColsExp, ascendings, nullFirsts, + new OperatorConfig(sparkSpillConf, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val sortOperator = sortOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala index 3ae42ed3d..e4ca7eb9c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala @@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType} import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam} import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.topn.OmniTopNWithExprOperatorFactory import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -107,8 +108,8 @@ case class ColumnarTakeOrderedAndProjectExec( def computeTopN(iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = { val startCodegen = System.nanoTime() - val topNOperatorFactory = new OmniTopNWithExprOperatorFactory(sourceTypes, limit, - sortColsExp, ascendings, nullFirsts, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val topNOperatorFactory = new OmniTopNWithExprOperatorFactory(sourceTypes, limit, sortColsExp, ascendings, nullFirsts, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val topNOperator = topNOperatorFactory.createOperator longMetric("omniCodegenTime") += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala index cc8491268..ee0a4918f 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala @@ -21,10 +21,11 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.{FunctionType, OmniWindowFrameBoundType, OmniWindowFrameType} -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.window.OmniWindowWithExprOperatorFactory import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.rdd.RDD @@ -341,9 +342,9 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], val startCodegen = System.nanoTime() val windowOperatorFactory = new OmniWindowWithExprOperatorFactory(sourceTypes, outputCols, windowFunType, omminPartitionChannels, preGroupedChannels, sortCols, ascendings, - nullFirsts, 0, 10000, windowArgKeys, windowFunRetType, - windowFrameTypes, windowFrameStartTypes, windowFrameStartChannels, windowFrameEndTypes, - windowFrameEndChannels, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + nullFirsts, 0, 10000, windowArgKeys, windowFunRetType, windowFrameTypes, windowFrameStartTypes, + windowFrameStartChannels, windowFrameEndTypes, windowFrameEndChannels, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val windowOperator = windowOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index e632831a0..47da03670 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -26,10 +26,11 @@ import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import scala.collection.mutable import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonWhiteList +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.vector.VecBatch import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory @@ -289,8 +290,8 @@ case class ColumnarBroadcastHashJoinExec( case _ => Optional.empty() } val startBuildCodegen = System.nanoTime() - val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, - buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, buildJoinColsExp, filter, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp = buildOpFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) @@ -307,7 +308,7 @@ case class ColumnarBroadcastHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index cd81a8e25..4199035ec 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -21,10 +21,11 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.Optional import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.TaskContext @@ -188,8 +189,8 @@ case class ColumnarShuffledHashJoinExec( case _ => Optional.empty() } val startBuildCodegen = System.nanoTime() - val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, - buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, buildJoinColsExp, filter, 1, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val buildOp = buildOpFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) @@ -214,7 +215,7 @@ case class ColumnarShuffledHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory, - new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala index dfe539662..73abadf50 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala @@ -24,10 +24,11 @@ import java.util.Optional import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonWhiteList +import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniSmjBufferedTableWithExprOperatorFactory, OmniSmjStreamedTableWithExprOperatorFactory} import nova.hetu.omniruntime.vector.{BooleanVec, Decimal128Vec, DoubleVec, IntVec, LongVec, VarcharVec, Vec, VecBatch} import org.apache.spark.rdd.RDD @@ -173,13 +174,15 @@ class ColumnarSortMergeJoinExec( val filter: Optional[String] = Optional.ofNullable(filterString) val startStreamedCodegen = System.nanoTime() val streamedOpFactory = new OmniSmjStreamedTableWithExprOperatorFactory(streamedTypes, - streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val streamedOp = streamedOpFactory.createOperator streamedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startStreamedCodegen) val startBufferedCodegen = System.nanoTime() val bufferedOpFactory = new OmniSmjBufferedTableWithExprOperatorFactory(bufferedTypes, - bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) + bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, + new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val bufferedOp = bufferedOpFactory.createOperator bufferedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBufferedCodegen) -- Gitee From 739032ca626163fbb3cc731512d4fb08289841b3 Mon Sep 17 00:00:00 2001 From: ruanrunxue Date: Tue, 23 Aug 2022 19:42:01 +0800 Subject: [PATCH 3/7] feat: adapt json expression for CAST, CheckOverflow --- .../expression/OmniExpressionAdaptor.scala | 131 +++++++----------- 1 file changed, 49 insertions(+), 82 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index 853462e3c..464a5a5f8 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -43,6 +43,7 @@ object OmniExpressionAdaptor extends Logging { throw new UnsupportedOperationException(s"Unsupported expression: $expr") } } + def getExprIdMap(inputAttrs: Seq[Attribute]): Map[ExprId, Int] = { var attrMap: Map[ExprId, Int] = Map() inputAttrs.zipWithIndex.foreach { case (inputAttr, i) => @@ -51,7 +52,7 @@ object OmniExpressionAdaptor extends Logging { attrMap } - private def DECIMAL_ALLOWEDTYPES: Seq[DecimalType] = Seq(DecimalType(7,2), DecimalType(17,2), DecimalType(21,6), DecimalType(22,6), DecimalType(38,16)) + private def DECIMAL_ALLOWEDTYPES: Seq[DecimalType] = Seq(DecimalType(7, 2), DecimalType(17, 2), DecimalType(21, 6), DecimalType(22, 6), DecimalType(38, 16)) def checkDecimalTypeWhiteList(dt: DecimalType): Unit = { if (!IS_DECIMAL_CHECK) { @@ -109,13 +110,13 @@ object OmniExpressionAdaptor extends Logging { ("MakeDecimal:%s(%s,%s,%s,%s,%s)") .format(sparkTypeToOmniExpJsonType(makeDecimal.dataType), rewriteToOmniExpressionLiteral(makeDecimal.child, exprsIndexMap), - decimalChild.precision,decimalChild.scale, + decimalChild.precision, decimalChild.scale, makeDecimal.precision, makeDecimal.scale) case longChild: LongType => ("MakeDecimal:%s(%s,%s,%s)") .format(sparkTypeToOmniExpJsonType(makeDecimal.dataType), - rewriteToOmniExpressionLiteral(makeDecimal.child, exprsIndexMap), - makeDecimal.precision, makeDecimal.scale) + rewriteToOmniExpressionLiteral(makeDecimal.child, exprsIndexMap), + makeDecimal.precision, makeDecimal.scale) case _ => throw new UnsupportedOperationException(s"Unsupported datatype for MakeDecimal: ${makeDecimal.child.dataType}") } @@ -318,7 +319,7 @@ object OmniExpressionAdaptor extends Logging { } def rewriteToOmniJsonExpressionLiteral(expr: Expression, - exprsIndexMap: Map[ExprId, Int]): String = { + exprsIndexMap: Map[ExprId, Int]): String = { rewriteToOmniJsonExpressionLiteral(expr, exprsIndexMap, expr.dataType) } @@ -334,32 +335,9 @@ object OmniExpressionAdaptor extends Logging { // omni not support return null, now rewrite to if(IsOverflowDecimal())? NULL:MakeDecimal() case checkOverflow: CheckOverflow => - ("{\"exprType\":\"IF\",\"returnType\":%s," + - "\"condition\":{\"exprType\":\"FUNCTION\",\"returnType\":%s," + - "\"function_name\":\"IsOverflowDecimal\",\"arguments\":[%s,%s,%s]}," + - "\"if_true\":%s," + - "\"if_false\":{\"exprType\":\"FUNCTION\",\"returnType\":%s," + - "\"function_name\":\"MakeDecimal\", \"arguments\":[%s]}" + - "}") + ("{\"exprType\":\"IF\",\"returnType\":%s,\"function_name\":\"CheckOverflow\",\"arguments\":[%s]}") .format(sparkTypeToOmniExpJsonType(checkOverflow.dataType), - // IsOverflowDecimal returnType - sparkTypeToOmniExpJsonType(BooleanType), - // IsOverflowDecimal arguments - rewriteToOmniJsonExpressionLiteral(checkOverflow.child, exprsIndexMap, - DecimalType(checkOverflow.dataType.precision, checkOverflow.dataType.scale)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.precision, IntegerType)), - toOmniJsonLiteral( - Literal(checkOverflow.dataType.scale, IntegerType)), - // if_true - toOmniJsonLiteral( - Literal(null, checkOverflow.dataType)), - // if_false - sparkTypeToOmniExpJsonType( - DecimalType(checkOverflow.dataType.precision, checkOverflow.dataType.scale)), - rewriteToOmniJsonExpressionLiteral(checkOverflow.child, - exprsIndexMap, - DecimalType(checkOverflow.dataType.precision, checkOverflow.dataType.scale))) + rewriteToOmniJsonExpressionLiteral(checkOverflow.child, exprsIndexMap)) case makeDecimal: MakeDecimal => makeDecimal.child.dataType match { @@ -384,86 +362,86 @@ object OmniExpressionAdaptor extends Logging { case sub: Subtract => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"SUBTRACT\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(returnDatatype), - rewriteToOmniJsonExpressionLiteral(sub.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(sub.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(returnDatatype), + rewriteToOmniJsonExpressionLiteral(sub.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(sub.right, exprsIndexMap)) case add: Add => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"ADD\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(returnDatatype), - rewriteToOmniJsonExpressionLiteral(add.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(add.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(returnDatatype), + rewriteToOmniJsonExpressionLiteral(add.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(add.right, exprsIndexMap)) case mult: Multiply => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"MULTIPLY\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(returnDatatype), - rewriteToOmniJsonExpressionLiteral(mult.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(mult.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(returnDatatype), + rewriteToOmniJsonExpressionLiteral(mult.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(mult.right, exprsIndexMap)) case divide: Divide => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"DIVIDE\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(returnDatatype), - rewriteToOmniJsonExpressionLiteral(divide.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(divide.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(returnDatatype), + rewriteToOmniJsonExpressionLiteral(divide.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(divide.right, exprsIndexMap)) case mod: Remainder => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"MODULUS\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(returnDatatype), - rewriteToOmniJsonExpressionLiteral(mod.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(mod.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(returnDatatype), + rewriteToOmniJsonExpressionLiteral(mod.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(mod.right, exprsIndexMap)) case greaterThan: GreaterThan => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"GREATER_THAN\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(greaterThan.dataType), - rewriteToOmniJsonExpressionLiteral(greaterThan.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(greaterThan.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(greaterThan.dataType), + rewriteToOmniJsonExpressionLiteral(greaterThan.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(greaterThan.right, exprsIndexMap)) case greaterThanOrEq: GreaterThanOrEqual => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"GREATER_THAN_OR_EQUAL\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(greaterThanOrEq.dataType), - rewriteToOmniJsonExpressionLiteral(greaterThanOrEq.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(greaterThanOrEq.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(greaterThanOrEq.dataType), + rewriteToOmniJsonExpressionLiteral(greaterThanOrEq.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(greaterThanOrEq.right, exprsIndexMap)) case lessThan: LessThan => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"LESS_THAN\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(lessThan.dataType), - rewriteToOmniJsonExpressionLiteral(lessThan.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(lessThan.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(lessThan.dataType), + rewriteToOmniJsonExpressionLiteral(lessThan.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(lessThan.right, exprsIndexMap)) case lessThanOrEq: LessThanOrEqual => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"LESS_THAN_OR_EQUAL\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(lessThanOrEq.dataType), - rewriteToOmniJsonExpressionLiteral(lessThanOrEq.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(lessThanOrEq.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(lessThanOrEq.dataType), + rewriteToOmniJsonExpressionLiteral(lessThanOrEq.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(lessThanOrEq.right, exprsIndexMap)) case equal: EqualTo => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"EQUAL\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(equal.dataType), - rewriteToOmniJsonExpressionLiteral(equal.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(equal.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(equal.dataType), + rewriteToOmniJsonExpressionLiteral(equal.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(equal.right, exprsIndexMap)) case or: Or => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"OR\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(or.dataType), - rewriteToOmniJsonExpressionLiteral(or.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(or.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(or.dataType), + rewriteToOmniJsonExpressionLiteral(or.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(or.right, exprsIndexMap)) case and: And => ("{\"exprType\":\"BINARY\",\"returnType\":%s," + "\"operator\":\"AND\",\"left\":%s,\"right\":%s}").format( - sparkTypeToOmniExpJsonType(and.dataType), - rewriteToOmniJsonExpressionLiteral(and.left, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(and.right, exprsIndexMap)) + sparkTypeToOmniExpJsonType(and.dataType), + rewriteToOmniJsonExpressionLiteral(and.left, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(and.right, exprsIndexMap)) case alias: Alias => rewriteToOmniJsonExpressionLiteral(alias.child, exprsIndexMap) case literal: Literal => toOmniJsonLiteral(literal) @@ -484,9 +462,9 @@ object OmniExpressionAdaptor extends Logging { ("{\"exprType\":\"FUNCTION\",\"returnType\":%s," + "\"function_name\":\"substr\", \"arguments\":[%s,%s,%s]}") .format(sparkTypeToOmniExpJsonType(subString.dataType), - rewriteToOmniJsonExpressionLiteral(subString.str, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(subString.pos, exprsIndexMap), - rewriteToOmniJsonExpressionLiteral(subString.len, exprsIndexMap)) + rewriteToOmniJsonExpressionLiteral(subString.str, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(subString.pos, exprsIndexMap), + rewriteToOmniJsonExpressionLiteral(subString.len, exprsIndexMap)) // Cast case cast: Cast => @@ -497,17 +475,6 @@ object OmniExpressionAdaptor extends Logging { ("{\"exprType\":\"FUNCTION\",\"returnType\":%s," + "\"width\":50,\"function_name\":\"CAST\", \"arguments\":[%s]}") .format(returnType, rewriteToOmniJsonExpressionLiteral(cast.child, exprsIndexMap)) - // for to decimal omni default cast no precision and scale handle - // use MakeDecimal to take it - case dt: DecimalType => - if (cast.child.dataType.isInstanceOf[DoubleType]) { - ("{\"exprType\":\"FUNCTION\",\"returnType\":%s," + - "\"function_name\":\"CAST\", \"arguments\":[%s]}") - .format(returnType, rewriteToOmniJsonExpressionLiteral(cast.child, exprsIndexMap)) - } else { - rewriteToOmniJsonExpressionLiteral( - MakeDecimal(cast.child, dt.precision, dt.scale), exprsIndexMap) - } case _ => ("{\"exprType\":\"FUNCTION\",\"returnType\":%s," + "\"function_name\":\"CAST\",\"arguments\":[%s]}") @@ -593,7 +560,7 @@ object OmniExpressionAdaptor extends Logging { def toOmniJsonAttribute(attr: Attribute, colVal: Int): String = { - val omniDataType = sparkTypeToOmniExpType(attr.dataType) + val omniDataType = sparkTypeToOmniExpType(attr.dataType) attr.dataType match { case StringType => ("{\"exprType\":\"FIELD_REFERENCE\",\"dataType\":%s," + @@ -604,7 +571,7 @@ object OmniExpressionAdaptor extends Logging { "\"colVal\":%d,\"precision\":%s, \"scale\":%s}").format(omniDataType, colVal, dt.precision, dt.scale) case _ => ("{\"exprType\":\"FIELD_REFERENCE\",\"dataType\":%s," + - "\"colVal\":%d}").format(omniDataType, colVal) + "\"colVal\":%d}").format(omniDataType, colVal) } } -- Gitee From 79521131f277d0fd559cbeef2d7e1f878193623e Mon Sep 17 00:00:00 2001 From: ruanrunxue Date: Tue, 23 Aug 2022 19:43:53 +0800 Subject: [PATCH 4/7] feat: remove white of decimal --- .../spark/expression/OmniExpressionAdaptor.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index 464a5a5f8..74d4947d1 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -54,15 +54,6 @@ object OmniExpressionAdaptor extends Logging { private def DECIMAL_ALLOWEDTYPES: Seq[DecimalType] = Seq(DecimalType(7, 2), DecimalType(17, 2), DecimalType(21, 6), DecimalType(22, 6), DecimalType(38, 16)) - def checkDecimalTypeWhiteList(dt: DecimalType): Unit = { - if (!IS_DECIMAL_CHECK) { - return - } - if (!DECIMAL_ALLOWEDTYPES.contains(dt)) { - throw new UnsupportedOperationException(s"decimal precision and scale not in support scope, ${dt}") - } - } - def checkOmniJsonWhiteList(filterExpr: String, projections: Array[AnyRef]): Unit = { if (!IS_DECIMAL_CHECK) { return @@ -646,7 +637,6 @@ object OmniExpressionAdaptor extends Logging { case StringType => OMNI_VARCHAR_TYPE case DateType => OMNI_DATE_TYPE case dt: DecimalType => - checkDecimalTypeWhiteList(dt) if (DecimalType.is64BitDecimalType(dt)) { OMNI_DECIMAL64_TYPE } else { @@ -687,7 +677,6 @@ object OmniExpressionAdaptor extends Logging { case DateType => Date32DataType.DATE32 case dt: DecimalType => - checkDecimalTypeWhiteList(dt) if (DecimalType.is64BitDecimalType(dt)) { new Decimal64DataType(dt.precision, dt.scale) } else { @@ -710,7 +699,6 @@ object OmniExpressionAdaptor extends Logging { "{\"exprType\":\"FIELD_REFERENCE\",\"dataType\":%s,\"colVal\":%d,\"width\":%d}" .format(omniDataType, colVal, getStringLength(metadata)) case dt: DecimalType => - checkDecimalTypeWhiteList(dt) var omniDataType = OMNI_DECIMAL128_TYPE if (DecimalType.is64BitDecimalType(dt)) { omniDataType = OMNI_DECIMAL64_TYPE -- Gitee From 5a886495d27bc94bebc858ae5cada2990cdb6029 Mon Sep 17 00:00:00 2001 From: ruanrunxue Date: Mon, 5 Sep 2022 11:13:00 +0800 Subject: [PATCH 5/7] fix: remove CheckOverflow from omni json expression --- .../boostkit/spark/expression/OmniExpressionAdaptor.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index 74d4947d1..38356d11e 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -324,11 +324,8 @@ object OmniExpressionAdaptor extends Logging { .format(sparkTypeToOmniExpJsonType(unscaledValue.dataType), rewriteToOmniJsonExpressionLiteral(unscaledValue.child, exprsIndexMap)) - // omni not support return null, now rewrite to if(IsOverflowDecimal())? NULL:MakeDecimal() case checkOverflow: CheckOverflow => - ("{\"exprType\":\"IF\",\"returnType\":%s,\"function_name\":\"CheckOverflow\",\"arguments\":[%s]}") - .format(sparkTypeToOmniExpJsonType(checkOverflow.dataType), - rewriteToOmniJsonExpressionLiteral(checkOverflow.child, exprsIndexMap)) + rewriteToOmniJsonExpressionLiteral(checkOverflow.child, exprsIndexMap, returnDatatype) case makeDecimal: MakeDecimal => makeDecimal.child.dataType match { -- Gitee From 59425722e4088c842224680d7ba498b72849982b Mon Sep 17 00:00:00 2001 From: ruanrunxue Date: Thu, 8 Sep 2022 09:45:45 +0800 Subject: [PATCH 6/7] add DecimalOperationSuite --- .../execution/ColumnarExpandExecSuite.scala | 18 ++++ .../sql/execution/DecimalOperationSuite.scala | 87 +++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala index 3af1849f8..85000b62c 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala @@ -1,3 +1,21 @@ +/* + * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.execution import org.apache.spark.sql.{DataFrame, Row} diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala new file mode 100644 index 000000000..747b53bbb --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{Column, DataFrame} +import org.apache.spark.sql.types.Decimal + +class DecimalOperationSuite extends ColumnarSparkPlanTest { + + import testImplicits.{localSeqToDatasetHolder, newProductEncoder} + + private var deci_overflow: DataFrame = _ + + def newRow(id: Int, c_deci5_0: String, c_deci7_2: String, c_deci17_2: String, c_deci18_6: String, + c_deci21_6: String, c_deci22_6: String, c_deci38_0: String, c_deci38_16: String): + (Int, Decimal, Decimal, Decimal, Decimal, Decimal, Decimal, Decimal, Decimal) = { + def newDecimal(deci: String, precision: Int, scale: Int): Decimal = { + if (deci == null) { + null + } else { + new Decimal().set(BigDecimal(deci, BigDecimal.defaultMathContext), precision, scale) + } + } + + (id, + newDecimal(c_deci5_0, 5, 0), + newDecimal(c_deci7_2, 7, 2), + newDecimal(c_deci17_2, 17, 2), + newDecimal(c_deci18_6, 18, 6), + newDecimal(c_deci21_6, 21, 6), + newDecimal(c_deci22_6, 22, 6), + newDecimal(c_deci38_0, 38, 0), + newDecimal(c_deci38_16, 38, 16)) + } + + override def beforeAll(): Unit = { + super.beforeAll() + + deci_overflow = Seq[(Int, Decimal, Decimal, Decimal, Decimal, + Decimal, Decimal, Decimal, Decimal)]( + newRow(1, "12345", "12345.12", "123456789123456.23", "123456789123.34", + "123456789123456.456789", "1234567891234567.567891", + "123456789123456789123456789", "1234567891234567891234.6789123456"), + newRow(2, "99999", "99999.99", "999999999999999.99", "999999999999.999999", + "999999999999999.999999", "9999999999999999.999999", + "99999999999999999999999999999999999999", "9999999999999999999999.9999999999999999"), + newRow(3, "99999", "0.99", "0.99", "0.999999", + "0.999999", "0.999999", + "99999999999999999999999999999999999999", "-9999999999999999999999.9999999999999999"), + newRow(4, "99999", "0", "0.99", "0.999999", + "0.999999", "0.999999", + "99999999999999999999999999999999999999", "0"), + newRow(5, "99999", null, "0.99", "0.999999", + "0.999999", "0.999999", + "99999999999999999999999999999999999999", null), + newRow(6, "-12345", "12345.12", "-123456789123456.23", "123456789123.34", + "-123456789123456.456789", "1234567891234567.567891", + "123456789123456789123456789", "-1234567891234567891234.6789123456"), + ).toDF("id", "c_deci5_0", "c_deci7_2", "c_deci17_2", "c_deci18_6", "c_deci21_6", + "c_deci22_6", "c_deci38_0", "c_deci38_16") + + deci_overflow = deci_overflow.withColumn("c_deci5_0", Column("c_deci5_0").cast("decimal(5,0)")) + .withColumn("c_deci7_2", Column("c_deci7_2").cast("decimal(7,2)")) + .withColumn("c_deci17_2", Column("c_deci17_2").cast("decimal(17,2)")) + .withColumn("c_deci18_6", Column("c_deci18_6").cast("decimal(18,6)")) + .withColumn("c_deci21_6", Column("c_deci21_6").cast("decimal(21,6)")) + .withColumn("c_deci22_6", Column("c_deci22_6").cast("decimal(22,6)")) + .withColumn("c_deci38_0", Column("c_deci38_0").cast("decimal(38,0)")) + .withColumn("c_deci38_16", Column("c_deci38_16").cast("decimal(38,16)")) + } +} -- Gitee From 144eb45a4db15e13ce4fd07f0dfba35bb43fe59a Mon Sep 17 00:00:00 2001 From: ruanrunxue Date: Tue, 13 Sep 2022 09:37:00 +0800 Subject: [PATCH 7/7] Revert "add DecimalOperationSuite" This reverts commit 59425722e4088c842224680d7ba498b72849982b. --- .../execution/ColumnarExpandExecSuite.scala | 18 ---- .../sql/execution/DecimalOperationSuite.scala | 87 ------------------- 2 files changed, 105 deletions(-) delete mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala index 85000b62c..3af1849f8 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarExpandExecSuite.scala @@ -1,21 +1,3 @@ -/* - * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.spark.sql.execution import org.apache.spark.sql.{DataFrame, Row} diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala deleted file mode 100644 index 747b53bbb..000000000 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/DecimalOperationSuite.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.sql.{Column, DataFrame} -import org.apache.spark.sql.types.Decimal - -class DecimalOperationSuite extends ColumnarSparkPlanTest { - - import testImplicits.{localSeqToDatasetHolder, newProductEncoder} - - private var deci_overflow: DataFrame = _ - - def newRow(id: Int, c_deci5_0: String, c_deci7_2: String, c_deci17_2: String, c_deci18_6: String, - c_deci21_6: String, c_deci22_6: String, c_deci38_0: String, c_deci38_16: String): - (Int, Decimal, Decimal, Decimal, Decimal, Decimal, Decimal, Decimal, Decimal) = { - def newDecimal(deci: String, precision: Int, scale: Int): Decimal = { - if (deci == null) { - null - } else { - new Decimal().set(BigDecimal(deci, BigDecimal.defaultMathContext), precision, scale) - } - } - - (id, - newDecimal(c_deci5_0, 5, 0), - newDecimal(c_deci7_2, 7, 2), - newDecimal(c_deci17_2, 17, 2), - newDecimal(c_deci18_6, 18, 6), - newDecimal(c_deci21_6, 21, 6), - newDecimal(c_deci22_6, 22, 6), - newDecimal(c_deci38_0, 38, 0), - newDecimal(c_deci38_16, 38, 16)) - } - - override def beforeAll(): Unit = { - super.beforeAll() - - deci_overflow = Seq[(Int, Decimal, Decimal, Decimal, Decimal, - Decimal, Decimal, Decimal, Decimal)]( - newRow(1, "12345", "12345.12", "123456789123456.23", "123456789123.34", - "123456789123456.456789", "1234567891234567.567891", - "123456789123456789123456789", "1234567891234567891234.6789123456"), - newRow(2, "99999", "99999.99", "999999999999999.99", "999999999999.999999", - "999999999999999.999999", "9999999999999999.999999", - "99999999999999999999999999999999999999", "9999999999999999999999.9999999999999999"), - newRow(3, "99999", "0.99", "0.99", "0.999999", - "0.999999", "0.999999", - "99999999999999999999999999999999999999", "-9999999999999999999999.9999999999999999"), - newRow(4, "99999", "0", "0.99", "0.999999", - "0.999999", "0.999999", - "99999999999999999999999999999999999999", "0"), - newRow(5, "99999", null, "0.99", "0.999999", - "0.999999", "0.999999", - "99999999999999999999999999999999999999", null), - newRow(6, "-12345", "12345.12", "-123456789123456.23", "123456789123.34", - "-123456789123456.456789", "1234567891234567.567891", - "123456789123456789123456789", "-1234567891234567891234.6789123456"), - ).toDF("id", "c_deci5_0", "c_deci7_2", "c_deci17_2", "c_deci18_6", "c_deci21_6", - "c_deci22_6", "c_deci38_0", "c_deci38_16") - - deci_overflow = deci_overflow.withColumn("c_deci5_0", Column("c_deci5_0").cast("decimal(5,0)")) - .withColumn("c_deci7_2", Column("c_deci7_2").cast("decimal(7,2)")) - .withColumn("c_deci17_2", Column("c_deci17_2").cast("decimal(17,2)")) - .withColumn("c_deci18_6", Column("c_deci18_6").cast("decimal(18,6)")) - .withColumn("c_deci21_6", Column("c_deci21_6").cast("decimal(21,6)")) - .withColumn("c_deci22_6", Column("c_deci22_6").cast("decimal(22,6)")) - .withColumn("c_deci38_0", Column("c_deci38_0").cast("decimal(38,0)")) - .withColumn("c_deci38_16", Column("c_deci38_16").cast("decimal(38,16)")) - } -} -- Gitee