From fcc1d0a67b382d89a08d65f93e5c13e35f621c79 Mon Sep 17 00:00:00 2001 From: Fomist Date: Fri, 29 Jul 2022 01:53:35 +0000 Subject: [PATCH 01/19] test pull --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c47c81a44..564f32cd9 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # boostkit-bigdata -BoostKit Acceleration Packages —— Big Data Component Adaptation Layer +BoostKit Acceleration Packages —— Big Data Component Adaptation Layer 00 ## Notice The boostkit-bigdata repo contains acceleration plugins and patches for multiple pieces of open source software including openLooKeng, Apache Spark, Hive, and HBase. Using these plugins and patches depends on other pieces of open source software (which are available in the central repo). You shall understand and agree that when using the other pieces of open source software, you shall strictly comply with their open source licenses and fulfill the obligations specified in the licenses. Any vulnerabilities and security issues of the other open source software are resolved by the corresponding upstream communities based on their own vulnerability and security response mechanisms. Please pay attention to the notifications and version updates released by the upstream communities. The Kunpeng Compute community does not assume any responsibility for the vulnerabilities and security issues of the preceding open source software. -- Gitee From 3c4fece0286f920f96e3a49f8854b615dae86e15 Mon Sep 17 00:00:00 2001 From: Jmist Date: Sat, 6 Aug 2022 10:10:53 +0800 Subject: [PATCH 02/19] del jit_enabel --- .../com/huawei/boostkit/spark/Constant.scala | 1 - .../ColumnarBasicPhysicalOperators.scala | 8 +-- .../ColumnarFileSourceScanExec.scala | 54 +++++++++---------- .../execution/ColumnarHashAggregateExec.scala | 6 +-- .../sql/execution/ColumnarProjection.scala | 6 +-- .../ColumnarShuffleExchangeExec.scala | 6 +-- .../sql/execution/ColumnarSortExec.scala | 4 +- .../ColumnarTakeOrderedAndProjectExec.scala | 6 +-- .../sql/execution/ColumnarWindowExec.scala | 6 +-- .../joins/ColumnarBroadcastHashJoinExec.scala | 8 +-- .../joins/ColumnarShuffledHashJoinExec.scala | 8 +-- .../joins/ColumnarSortMergeJoinExec.scala | 8 +-- 12 files changed, 60 insertions(+), 61 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala index 1460c618d..4112d669a 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala @@ -32,7 +32,6 @@ object Constant { val OMNI_DOUBLE_TYPE: String = DataTypeId.OMNI_DOUBLE.ordinal().toString val OMNI_BOOLEAN_TYPE: String = DataTypeId.OMNI_BOOLEAN.ordinal().toString val OMNI_DATE_TYPE: String = DataTypeId.OMNI_DATE32.ordinal().toString - val IS_ENABLE_JIT: Boolean = ColumnarPluginConfig.getSessionConf.enableJit val IS_DECIMAL_CHECK: Boolean = ColumnarPluginConfig.getSessionConf.enableDecimalCheck val IS_SKIP_VERIFY_EXP: Boolean = true val OMNI_DECIMAL64_TYPE: String = DataTypeId.OMNI_DECIMAL64.ordinal().toString diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala index 
96d3189b1..73c3d6cb2 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.filter.OmniFilterAndProjectOperatorFactory import nova.hetu.omniruntime.vector.VecBatch @@ -195,7 +195,7 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan) child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val filterOperatorFactory = new OmniFilterAndProjectOperatorFactory( - omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val filterOperator = filterOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) @@ -293,7 +293,7 @@ case class ColumnarConditionProjectExec(projectList: Seq[NamedExpression], child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val operatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val operator = operatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala index cf78751c3..21da4d9ec 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import java.util.Optional import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import scala.collection.mutable.HashMap @@ -32,7 +32,7 @@ import nova.hetu.omniruntime.vector.{Vec, VecBatch} import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil._ import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import 
nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory @@ -833,14 +833,14 @@ case class ColumnarMultipleOperatorExec( omniAggOutputTypes, omniAggInputRaw, omniAggOutputPartial, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val aggOperator = aggFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -849,7 +849,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1, if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp1 = buildOpFactory1.createOperator() buildData1.value.foreach { input => buildOp1.addInput(deserializer.deserialize(input)) @@ -857,7 +857,7 @@ case class ColumnarMultipleOperatorExec( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -867,7 +867,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -876,7 +876,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2, if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp2 = buildOpFactory2.createOperator() buildData2.value.foreach { input => buildOp2.addInput(deserializer.deserialize(input)) @@ -884,7 +884,7 @@ case class ColumnarMultipleOperatorExec( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, 
buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -894,7 +894,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -903,7 +903,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3, if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp3 = buildOpFactory3.createOperator() buildData3.value.foreach { input => buildOp3.addInput(deserializer.deserialize(input)) @@ -911,7 +911,7 @@ case class ColumnarMultipleOperatorExec( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -921,7 +921,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory3.close() }) - val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator4 = projectOperatorFactory4.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -930,7 +930,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory4 = new OmniHashBuilderWithExprOperatorFactory(buildTypes4, buildJoinColsExp4, if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp4 = buildOpFactory4.createOperator() buildData4.value.foreach { input => buildOp4.addInput(deserializer.deserialize(input)) @@ -938,7 +938,7 @@ case class ColumnarMultipleOperatorExec( buildOp4.getOutput val lookupOpFactory4 = new OmniLookupJoinWithExprOperatorFactory(probeTypes4, probeOutputCols4, probeHashColsExp4, buildOutputCols4, buildOutputTypes4, OMNI_JOIN_TYPE_INNER, buildOpFactory4, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp4 = lookupOpFactory4.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -949,7 +949,7 @@ case class ColumnarMultipleOperatorExec( }) val condOperatorFactory = new 
OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator @@ -1168,14 +1168,14 @@ case class ColumnarMultipleOperatorExec1( omniAggOutputTypes, omniAggInputRaw, omniAggOutputPartial, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val aggOperator = aggFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1184,7 +1184,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1, if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp1 = buildOpFactory1.createOperator() buildData1.value.foreach { input => buildOp1.addInput(deserializer.deserialize(input)) @@ -1192,7 +1192,7 @@ case class ColumnarMultipleOperatorExec1( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1202,7 +1202,7 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1211,7 +1211,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2, if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp2 = buildOpFactory2.createOperator() buildData2.value.foreach { input => buildOp2.addInput(deserializer.deserialize(input)) @@ -1219,7 +1219,7 @@ case class ColumnarMultipleOperatorExec1( buildOp2.getOutput val lookupOpFactory2 = new 
OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1229,7 +1229,7 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1238,7 +1238,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3, if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp3 = buildOpFactory3.createOperator() buildData3.value.foreach { input => buildOp3.addInput(deserializer.deserialize(input)) @@ -1246,7 +1246,7 @@ case class ColumnarMultipleOperatorExec1( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1257,7 +1257,7 @@ case class ColumnarMultipleOperatorExec1( }) val condOperatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala index 4389500af..1ecc0923d 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala @@ -18,13 +18,13 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import 
nova.hetu.omniruntime.constants.FunctionType import nova.hetu.omniruntime.operator.aggregator.OmniHashAggregationWithExprOperatorFactory -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -218,7 +218,7 @@ case class ColumnarHashAggregateExec( omniAggOutputTypes, omniInputRaw, omniOutputPartial, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val operator = factory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala index 6c8805589..ca0485384 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.sql.execution.metric.SQLMetric @@ -42,7 +42,7 @@ object ColumnarProjection { omniExpressions: Array[String], iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = { val startCodegen = System.nanoTime() - val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator = projectOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 96cb162a3..5f0231b50 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import com.huawei.boostkit.spark.ColumnarPluginConfig import java.util.Random -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import scala.collection.JavaConverters._ import scala.concurrent.Future @@ -29,7 +29,7 @@ import 
com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import com.huawei.boostkit.spark.vectorized.PartitionInfo import nova.hetu.omniruntime.`type`.{DataType, DataTypeSerializer} -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.{IntVec, VecBatch} import org.apache.spark._ @@ -289,7 +289,7 @@ object ColumnarShuffleExchangeExec extends Logging { // omni project val genHashExpression = genHashExpr() val omniExpr: String = genHashExpression(expressions, numPartitions, defaultMm3HashSeed, outputAttributes) - val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val op = factory.createOperator() cbIter.map { cb => diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala index 3edf533c2..165b6a4c0 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala @@ -22,7 +22,7 @@ import java.util.UUID import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.ColumnarPluginConfig -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam} import nova.hetu.omniruntime.operator.config.{OperatorConfig, SparkSpillConfig} import nova.hetu.omniruntime.operator.sort.OmniSortWithExprOperatorFactory @@ -119,7 +119,7 @@ case class ColumnarSortExec( sortSpillDirDiskReserveSize, sortSpillRowThreshold) val startCodegen = System.nanoTime() val sortOperatorFactory = new OmniSortWithExprOperatorFactory(sourceTypes, outputCols, - sortColsExp, ascendings, nullFirsts, new OperatorConfig(IS_ENABLE_JIT, sparkSpillConf, IS_SKIP_VERIFY_EXP)) + sortColsExp, ascendings, nullFirsts, new OperatorConfig(sparkSpillConf, IS_SKIP_VERIFY_EXP)) val sortOperator = sortOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala index e0d920e01..3ae42ed3d 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala @@ -18,12 +18,12 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import 
com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType} import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam} import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.topn.OmniTopNWithExprOperatorFactory import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -108,7 +108,7 @@ case class ColumnarTakeOrderedAndProjectExec( def computeTopN(iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = { val startCodegen = System.nanoTime() val topNOperatorFactory = new OmniTopNWithExprOperatorFactory(sourceTypes, limit, - sortColsExp, ascendings, nullFirsts, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + sortColsExp, ascendings, nullFirsts, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val topNOperator = topNOperatorFactory.createOperator longMetric("omniCodegenTime") += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala index 851973bd5..cc8491268 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.{FunctionType, OmniWindowFrameBoundType, OmniWindowFrameType} -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.window.OmniWindowWithExprOperatorFactory import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.rdd.RDD @@ -343,7 +343,7 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], windowFunType, omminPartitionChannels, preGroupedChannels, sortCols, ascendings, nullFirsts, 0, 10000, windowArgKeys, windowFunRetType, windowFrameTypes, windowFrameStartTypes, windowFrameStartChannels, windowFrameEndTypes, - windowFrameEndChannels, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + windowFrameEndChannels, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val windowOperator = windowOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala 
b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index b10b9cc36..e632831a0 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -21,7 +21,7 @@ import com.huawei.boostkit.spark.ColumnarPluginConfig import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.Optional -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import scala.collection.mutable import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor @@ -29,7 +29,7 @@ import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonW import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.vector.VecBatch import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory @@ -290,7 +290,7 @@ case class ColumnarBroadcastHashJoinExec( } val startBuildCodegen = System.nanoTime() val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, - buildJoinColsExp, filter, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp = buildOpFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) @@ -307,7 +307,7 @@ case class ColumnarBroadcastHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index ac5d5fdbc..cd81a8e25 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.joins import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.Optional -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType 
import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.TaskContext @@ -189,7 +189,7 @@ case class ColumnarShuffledHashJoinExec( } val startBuildCodegen = System.nanoTime() val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, - buildJoinColsExp, filter, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp = buildOpFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) @@ -214,7 +214,7 @@ case class ColumnarShuffledHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala index 2a16a0dbf..dfe539662 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala @@ -21,13 +21,13 @@ import com.huawei.boostkit.spark.ColumnarPluginConfig import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.Optional -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonWhiteList import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniSmjBufferedTableWithExprOperatorFactory, OmniSmjStreamedTableWithExprOperatorFactory} import nova.hetu.omniruntime.vector.{BooleanVec, Decimal128Vec, DoubleVec, IntVec, LongVec, VarcharVec, Vec, VecBatch} import org.apache.spark.rdd.RDD @@ -173,13 +173,13 @@ class ColumnarSortMergeJoinExec( val filter: Optional[String] = Optional.ofNullable(filterString) val startStreamedCodegen = System.nanoTime() val streamedOpFactory = new OmniSmjStreamedTableWithExprOperatorFactory(streamedTypes, - streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, new OperatorConfig(SpillConfig.NONE, 
IS_SKIP_VERIFY_EXP)) val streamedOp = streamedOpFactory.createOperator streamedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startStreamedCodegen) val startBufferedCodegen = System.nanoTime() val bufferedOpFactory = new OmniSmjBufferedTableWithExprOperatorFactory(bufferedTypes, - bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val bufferedOp = bufferedOpFactory.createOperator bufferedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBufferedCodegen) -- Gitee From d43b62b497d98611ba26062336876761ce8bbefc Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Sat, 6 Aug 2022 06:14:00 +0000 Subject: [PATCH 03/19] bak --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 564f32cd9..c47c81a44 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # boostkit-bigdata -BoostKit Acceleration Packages —— Big Data Component Adaptation Layer 00 +BoostKit Acceleration Packages —— Big Data Component Adaptation Layer ## Notice The boostkit-bigdata repo contains acceleration plugins and patches for multiple pieces of open source software including openLooKeng, Apache Spark, Hive, and HBase. Using these plugins and patches depends on other pieces of open source software (which are available in the central repo). You shall understand and agree that when using the other pieces of open source software, you shall strictly comply with their open source licenses and fulfill the obligations specified in the licenses. Any vulnerabilities and security issues of the other open source software are resolved by the corresponding upstream communities based on their own vulnerability and security response mechanisms. Please pay attention to the notifications and version updates released by the upstream communities. The Kunpeng Compute community does not assume any responsibility for the vulnerabilities and security issues of the preceding open source software. 
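
For orientation before the next patches: PATCH 02 above applies one mechanical change across all twelve files, replacing every new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP) with new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP), and PATCH 04 below deletes the backing spark.omni.sql.columnar.jit setting from ColumnarPluginConfig. A minimal sketch of the resulting construction, assuming only the two-argument OperatorConfig constructor and the SpillConfig type visible in the hunks above:

    import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}

    // Matches Constant.scala after PATCH 02: expression verification is skipped.
    val IS_SKIP_VERIFY_EXP: Boolean = true

    // Non-spilling operators (filter, project, hash aggregate, the hash and
    // merge joins, window, topN) now pass SpillConfig.NONE where the JIT flag
    // used to be the first constructor argument.
    val config = new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)

The one call site that differs is ColumnarSortExec, which already spills: its hunk drops the JIT flag but keeps the SparkSpillConfig as the first argument, new OperatorConfig(sparkSpillConf, IS_SKIP_VERIFY_EXP), so sort spilling behaves as before.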
-- Gitee From 7a3fada96c116519dd61693f46f89470ac77b36d Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Mon, 8 Aug 2022 06:54:28 +0000 Subject: [PATCH 04/19] del jit in ColumnarPluginConfig --- .../scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index 39ac95e32..c2c1b989e 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -152,7 +152,6 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { val maxRowCount = conf.getConfString("spark.sql.columnar.maxRowCount", "20000").toInt - val enableJit: Boolean = conf.getConfString("spark.omni.sql.columnar.jit", "false").toBoolean val enableDecimalCheck : Boolean = conf.getConfString("spark.omni.sql.decimal.constraint.check", "true").toBoolean } -- Gitee From e783a349fdd1f4088b5e72f80211e48a6ee9567a Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Tue, 9 Aug 2022 11:38:43 +0000 Subject: [PATCH 05/19] modify vec datatype --- .../cpp/src/jni/SparkJniWrapper.cpp | 12 ++++++++++-- .../cpp/src/shuffle/splitter.cpp | 6 ++++-- .../spark/shuffle/ColumnarShuffleWriterSuite.scala | 8 ++++---- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp index 0f0351241..e9f7006ee 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp @@ -91,8 +91,16 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake( uint32_t *inputDataPrecisions = new uint32_t[size]; uint32_t *inputDataScales = new uint32_t[size]; for (int i = 0; i < size; ++i) { - inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision(); - inputDataScales[i] = inputDataTpyes[i].GetScale(); + switch (inputDataTpyes[i]->GetId()){ + case OMNI_DECIMAL64: + case OMNI_DECIMAL128: { + inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision(); + inputDataScales[i] = inputDataTpyes[i].GetScale(); + } + default: { + + } + } } inputDataTpyes.clear(); diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 5fdff5344..b28cced84 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -765,8 +765,10 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptrset_typeid_(CastShuffleTypeIdToVecType(vector_batch_col_types_[indexSchema])); LogsDebug("precision[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataPrecisions[indexSchema]); LogsDebug("scale[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataScales[indexSchema]); - vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]); - vt->set_scale(input_col_types.inputDataScales[indexSchema]); + if(vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64){ + vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]); + 
vt->set_scale(input_col_types.inputDataScales[indexSchema]); + } } curBatch++; diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala index 6dddff494..c1794eae4 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala @@ -78,10 +78,10 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession { shuffleHandle = new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) - val inputTypes = "[{\"id\":\"OMNI_INT\",\"width\":0,\"precision\":0,\"scale\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + - "{\"id\":\"OMNI_INT\",\"width\":0,\"precision\":0,\"scale\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + - "{\"id\":\"OMNI_DECIMAL64\",\"width\":0,\"precision\":18,\"scale\":3,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + - "{\"id\":\"OMNI_DECIMAL128\",\"width\":0,\"precision\":28,\"scale\":11,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}]" + val inputTypes = "[{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + + "{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + + "{\"id\":6,\"width\":0,\"precision\":18,\"scale\":3,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + + "{\"id\":7,\"width\":0,\"precision\":28,\"scale\":11,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}]" when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) -- Gitee From b74ab605824b38dd3428c88111527ce12fddd730 Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Tue, 9 Aug 2022 12:02:04 +0000 Subject: [PATCH 06/19] vecId modify --- .../omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp index e9f7006ee..0c3572264 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp @@ -91,7 +91,7 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake( uint32_t *inputDataPrecisions = new uint32_t[size]; uint32_t *inputDataScales = new uint32_t[size]; for (int i = 0; i < size; ++i) { - switch (inputDataTpyes[i]->GetId()){ + switch (inputDataTpyes[i].GetId()){ case OMNI_DECIMAL64: case OMNI_DECIMAL128: { inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision(); -- Gitee From 46834c27301237cb4540f041e352aa5537049f66 Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Tue, 9 Aug 2022 12:11:55 +0000 Subject: [PATCH 07/19] MofiyType --- .../omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp index 0c3572264..223ad9ac4 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp @@ -86,7 +86,7 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake( DataTypes inputVecTypes = Deserialize(inputTypeCharPtr); const int32_t 
*inputVecTypeIds = inputVecTypes.GetIds(); // - std::vector inputDataTpyes = inputVecTypes.Get(); + std::vector inputDataTpyes = inputVecTypes.Get(); int32_t size = inputDataTpyes.size(); uint32_t *inputDataPrecisions = new uint32_t[size]; uint32_t *inputDataScales = new uint32_t[size]; -- Gitee From 68e09484f96b651dbbe20fa67f38e269305946be Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Tue, 9 Aug 2022 14:36:53 +0000 Subject: [PATCH 08/19] sync jit code --- .../cpp/src/jni/SparkJniWrapper.cpp | 14 ++++----- .../cpp/test/utils/test_utils.cpp | 29 ++++++++++--------- .../shuffle/ColumnarShuffleWriterSuite.scala | 8 ++--- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp index 223ad9ac4..706da8450 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp @@ -46,10 +46,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { return JNI_ERR; } - illegal_access_exception_class = + illegal_access_exception_class = CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); - split_result_class = + split_result_class = CreateGlobalClassReference(env, "Lcom/huawei/boostkit/spark/vectorized/SplitResult;"); split_result_constructor = GetMethodID(env, split_result_class, "", "(JJJJJ[J)V"); @@ -91,11 +91,11 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake( uint32_t *inputDataPrecisions = new uint32_t[size]; uint32_t *inputDataScales = new uint32_t[size]; for (int i = 0; i < size; ++i) { - switch (inputDataTpyes[i].GetId()){ + switch (inputDataTpyes[i]->GetId()){ case OMNI_DECIMAL64: case OMNI_DECIMAL128: { - inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision(); - inputDataScales[i] = inputDataTpyes[i].GetScale(); + inputDataScales[i] = std::dynamic_pointer_cast(inputDataTpyes[i])->GetScale(); + inputDataPrecisions[i] = std::dynamic_pointer_cast(inputDataTpyes[i])->GetPrecision(); } default: { @@ -215,10 +215,10 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_stop( split_result_class, split_result_constructor, splitter->TotalComputePidTime(), splitter->TotalWriteTime(), splitter->TotalSpillTime(), splitter->TotalBytesWritten(), splitter->TotalBytesSpilled(), partition_length_arr); - + return split_result; } - + JNIEXPORT void JNICALL Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_close( JNIEnv* env, jobject, jlong splitter_id) { diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp index 5f7458566..70fe5d85d 100644 --- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp +++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp @@ -17,21 +17,22 @@ * limitations under the License. 
*/ +#include #include "test_utils.h" using namespace omniruntime::vec; -void ToVectorTypes(const int32_t *dataTypeIds, int32_t dataTypeCount, std::vector &dataTypes) +void ToVectorTypes(const int32_t *dataTypeIds, int32_t dataTypeCount, std::vector &dataTypes) { for (int i = 0; i < dataTypeCount; ++i) { if (dataTypeIds[i] == OMNI_VARCHAR) { - dataTypes.push_back(VarcharDataType(50)); + dataTypes.push_back(std::make_shared(50)); continue; } else if (dataTypeIds[i] == OMNI_CHAR) { - dataTypes.push_back(CharDataType(50)); + dataTypes.push_back(std::make_shared(50)); continue; } - dataTypes.push_back(DataType(dataTypeIds[i])); + dataTypes.push_back(std::make_shared(dataTypeIds[i])); } } @@ -41,7 +42,7 @@ VectorBatch* CreateInputData(const int32_t numRows, int64_t* allData) { auto *vecBatch = new VectorBatch(numCols, numRows); - vector inputTypes; + vector inputTypes; ToVectorTypes(inputTypeIds, numCols, inputTypes); vecBatch->NewVectors(omniruntime::vec::GetProcessGlobalVecAllocator(), inputTypes); for (int i = 0; i < numCols; ++i) { @@ -123,13 +124,13 @@ Vector *CreateVector(DataType &vecType, int32_t rowCount, va_list &args) } } -DictionaryVector *CreateDictionaryVector(DataType &vecType, int32_t rowCount, int32_t *ids, int32_t idsCount, ...) +DictionaryVector *CreateDictionaryVector(DataType &dataType, int32_t rowCount, int32_t *ids, int32_t idsCount, ...) { va_list args; va_start(args, idsCount); - Vector *dictionary = CreateVector(vecType, rowCount, args); + Vector *dictionary = CreateVector(dataType, rowCount, args); va_end(args); - auto vec = std::make_unique(dictionary, ids, idsCount).release(); + auto vec = new DictionaryVector(dictionary, ids, idsCount); delete dictionary; return vec; } @@ -199,15 +200,15 @@ Vector *buildVector(const DataType &aggType, int32_t rowNumber) } } -VectorBatch *CreateVectorBatch(DataTypes &types, int32_t rowCount, ...) +VectorBatch *CreateVectorBatch(const DataTypes &types, int32_t rowCount, ...) 
{ int32_t typesCount = types.GetSize(); - VectorBatch *vectorBatch = std::make_unique(typesCount).release(); + auto *vectorBatch = new VectorBatch(typesCount, rowCount); va_list args; va_start(args, rowCount); for (int32_t i = 0; i < typesCount; i++) { - DataType type = types.Get()[i]; - vectorBatch->SetVector(i, CreateVector(type, rowCount, args)); + DataTypePtr type = types.GetType(i); + vectorBatch->SetVector(i, CreateVector(*type, rowCount, args)); } va_end(args); return vectorBatch; @@ -502,7 +503,7 @@ VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum) { int32_t data0[dataSize] = {111, 112, 113, 114, 115, 116}; int64_t data1[dataSize] = {221, 222, 223, 224, 225, 226}; void *datas[2] = {data0, data1}; - DataTypes sourceTypes(std::vector({ IntDataType(), LongDataType()})); + DataTypes sourceTypes(std::vector({ std::make_unique(), std::make_unique()})); int32_t ids[] = {0, 1, 2, 3, 4, 5}; VectorBatch *vectorBatch = new VectorBatch(3, dataSize); VectorAllocator *allocator = omniruntime::vec::GetProcessGlobalVecAllocator(); @@ -514,7 +515,7 @@ VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum) { if (i == 0) { vectorBatch->SetVector(i, intVectorTmp); } else { - omniruntime::vec::DataType dataType = sourceTypes.Get()[i - 1]; + omniruntime::vec::DataType dataType = *(sourceTypes.Get()[i - 1]); vectorBatch->SetVector(i, CreateDictionaryVector(dataType, dataSize, ids, dataSize, datas[i - 1])); } } diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala index c1794eae4..6e55a49f7 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala @@ -78,10 +78,10 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession { shuffleHandle = new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency) - val inputTypes = "[{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + - "{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + - "{\"id\":6,\"width\":0,\"precision\":18,\"scale\":3,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," + - "{\"id\":7,\"width\":0,\"precision\":28,\"scale\":11,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}]" + val inputTypes = "[{\"id\":1}," + + "{\"id\":1}," + + "{\"id\":6,\"precision\":18,\"scale\":3}," + + "{\"id\":7,\"precision\":28,\"scale\":11}]" when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions)) when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf)) -- Gitee From 7679ee3f7138c09727cbde5d6ac3648ef6ef5af8 Mon Sep 17 00:00:00 2001 From: Jmist Date: Sat, 6 Aug 2022 10:10:53 +0800 Subject: [PATCH 09/19] del jit_enabel --- .../com/huawei/boostkit/spark/Constant.scala | 1 - .../ColumnarBasicPhysicalOperators.scala | 8 +-- .../ColumnarFileSourceScanExec.scala | 54 +++++++++---------- .../execution/ColumnarHashAggregateExec.scala | 6 +-- .../sql/execution/ColumnarProjection.scala | 6 +-- .../ColumnarShuffleExchangeExec.scala | 6 +-- .../sql/execution/ColumnarSortExec.scala | 4 +- .../ColumnarTakeOrderedAndProjectExec.scala | 6 +-- .../sql/execution/ColumnarWindowExec.scala | 6 +-- .../joins/ColumnarBroadcastHashJoinExec.scala | 8 +-- .../joins/ColumnarShuffledHashJoinExec.scala | 8 
+-- .../joins/ColumnarSortMergeJoinExec.scala | 8 +-- 12 files changed, 60 insertions(+), 61 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala index 1460c618d..4112d669a 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/Constant.scala @@ -32,7 +32,6 @@ object Constant { val OMNI_DOUBLE_TYPE: String = DataTypeId.OMNI_DOUBLE.ordinal().toString val OMNI_BOOLEAN_TYPE: String = DataTypeId.OMNI_BOOLEAN.ordinal().toString val OMNI_DATE_TYPE: String = DataTypeId.OMNI_DATE32.ordinal().toString - val IS_ENABLE_JIT: Boolean = ColumnarPluginConfig.getSessionConf.enableJit val IS_DECIMAL_CHECK: Boolean = ColumnarPluginConfig.getSessionConf.enableDecimalCheck val IS_SKIP_VERIFY_EXP: Boolean = true val OMNI_DECIMAL64_TYPE: String = DataTypeId.OMNI_DECIMAL64.ordinal().toString diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala index 96d3189b1..73c3d6cb2 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.filter.OmniFilterAndProjectOperatorFactory import nova.hetu.omniruntime.vector.VecBatch @@ -195,7 +195,7 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan) child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val filterOperatorFactory = new OmniFilterAndProjectOperatorFactory( - omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val filterOperator = filterOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) @@ -293,7 +293,7 @@ case class ColumnarConditionProjectExec(projectList: Seq[NamedExpression], child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val operatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val operator 
= operatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala index cf78751c3..21da4d9ec 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import java.util.Optional import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import scala.collection.mutable.HashMap @@ -32,7 +32,7 @@ import nova.hetu.omniruntime.vector.{Vec, VecBatch} import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil._ import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory @@ -833,14 +833,14 @@ case class ColumnarMultipleOperatorExec( omniAggOutputTypes, omniAggInputRaw, omniAggOutputPartial, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val aggOperator = aggFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -849,7 +849,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1, if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp1 = buildOpFactory1.createOperator() buildData1.value.foreach { input => buildOp1.addInput(deserializer.deserialize(input)) @@ -857,7 +857,7 @@ case class ColumnarMultipleOperatorExec( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // 
close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -867,7 +867,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -876,7 +876,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2, if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp2 = buildOpFactory2.createOperator() buildData2.value.foreach { input => buildOp2.addInput(deserializer.deserialize(input)) @@ -884,7 +884,7 @@ case class ColumnarMultipleOperatorExec( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -894,7 +894,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -903,7 +903,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3, if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp3 = buildOpFactory3.createOperator() buildData3.value.foreach { input => buildOp3.addInput(deserializer.deserialize(input)) @@ -911,7 +911,7 @@ case class ColumnarMultipleOperatorExec( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -921,7 +921,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory3.close() }) - val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, 
proj4OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator4 = projectOperatorFactory4.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -930,7 +930,7 @@ case class ColumnarMultipleOperatorExec( val buildOpFactory4 = new OmniHashBuilderWithExprOperatorFactory(buildTypes4, buildJoinColsExp4, if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp4 = buildOpFactory4.createOperator() buildData4.value.foreach { input => buildOp4.addInput(deserializer.deserialize(input)) @@ -938,7 +938,7 @@ case class ColumnarMultipleOperatorExec( buildOp4.getOutput val lookupOpFactory4 = new OmniLookupJoinWithExprOperatorFactory(probeTypes4, probeOutputCols4, probeHashColsExp4, buildOutputCols4, buildOutputTypes4, OMNI_JOIN_TYPE_INNER, buildOpFactory4, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp4 = lookupOpFactory4.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -949,7 +949,7 @@ case class ColumnarMultipleOperatorExec( }) val condOperatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator @@ -1168,14 +1168,14 @@ case class ColumnarMultipleOperatorExec1( omniAggOutputTypes, omniAggInputRaw, omniAggOutputPartial, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val aggOperator = aggFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1184,7 +1184,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory1 = new OmniHashBuilderWithExprOperatorFactory(buildTypes1, buildJoinColsExp1, if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp1 = buildOpFactory1.createOperator() buildData1.value.foreach { input => buildOp1.addInput(deserializer.deserialize(input)) @@ -1192,7 +1192,7 @@ case class ColumnarMultipleOperatorExec1( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, OMNI_JOIN_TYPE_INNER, buildOpFactory1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, 
IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1202,7 +1202,7 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1211,7 +1211,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory2 = new OmniHashBuilderWithExprOperatorFactory(buildTypes2, buildJoinColsExp2, if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp2 = buildOpFactory2.createOperator() buildData2.value.foreach { input => buildOp2.addInput(deserializer.deserialize(input)) @@ -1219,7 +1219,7 @@ case class ColumnarMultipleOperatorExec1( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, OMNI_JOIN_TYPE_INNER, buildOpFactory2, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1229,7 +1229,7 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator addLeakSafeTaskCompletionListener[Unit](_ => { @@ -1238,7 +1238,7 @@ case class ColumnarMultipleOperatorExec1( val buildOpFactory3 = new OmniHashBuilderWithExprOperatorFactory(buildTypes3, buildJoinColsExp3, if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, 1, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp3 = buildOpFactory3.createOperator() buildData3.value.foreach { input => buildOp3.addInput(deserializer.deserialize(input)) @@ -1246,7 +1246,7 @@ case class ColumnarMultipleOperatorExec1( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, OMNI_JOIN_TYPE_INNER, buildOpFactory3, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() // close operator SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { @@ -1257,7 +1257,7 @@ case class ColumnarMultipleOperatorExec1( }) val condOperatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(IS_ENABLE_JIT, 
IS_SKIP_VERIFY_EXP)) + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala index 4389500af..1ecc0923d 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala @@ -18,13 +18,13 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.FunctionType import nova.hetu.omniruntime.operator.aggregator.OmniHashAggregationWithExprOperatorFactory -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -218,7 +218,7 @@ case class ColumnarHashAggregateExec( omniAggOutputTypes, omniInputRaw, omniOutputPartial, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val operator = factory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala index 6c8805589..ca0485384 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.sql.execution.metric.SQLMetric @@ -42,7 +42,7 @@ object ColumnarProjection { omniExpressions: Array[String], iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = { val startCodegen = System.nanoTime() - val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, 
IS_SKIP_VERIFY_EXP)) + val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val projectOperator = projectOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) // close operator diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index 96cb162a3..5f0231b50 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import com.huawei.boostkit.spark.ColumnarPluginConfig import java.util.Random -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import scala.collection.JavaConverters._ import scala.concurrent.Future @@ -29,7 +29,7 @@ import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import com.huawei.boostkit.spark.vectorized.PartitionInfo import nova.hetu.omniruntime.`type`.{DataType, DataTypeSerializer} -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory import nova.hetu.omniruntime.vector.{IntVec, VecBatch} import org.apache.spark._ @@ -289,7 +289,7 @@ object ColumnarShuffleExchangeExec extends Logging { // omni project val genHashExpression = genHashExpr() val omniExpr: String = genHashExpression(expressions, numPartitions, defaultMm3HashSeed, outputAttributes) - val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val op = factory.createOperator() cbIter.map { cb => diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala index 3edf533c2..165b6a4c0 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala @@ -22,7 +22,7 @@ import java.util.UUID import java.util.concurrent.TimeUnit.NANOSECONDS import com.huawei.boostkit.spark.ColumnarPluginConfig -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam} import nova.hetu.omniruntime.operator.config.{OperatorConfig, SparkSpillConfig} import nova.hetu.omniruntime.operator.sort.OmniSortWithExprOperatorFactory @@ -119,7 +119,7 @@ case class ColumnarSortExec( sortSpillDirDiskReserveSize, sortSpillRowThreshold) val startCodegen = System.nanoTime() val sortOperatorFactory = 
new OmniSortWithExprOperatorFactory(sourceTypes, outputCols, - sortColsExp, ascendings, nullFirsts, new OperatorConfig(IS_ENABLE_JIT, sparkSpillConf, IS_SKIP_VERIFY_EXP)) + sortColsExp, ascendings, nullFirsts, new OperatorConfig(sparkSpillConf, IS_SKIP_VERIFY_EXP)) val sortOperator = sortOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala index e0d920e01..3ae42ed3d 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala @@ -18,12 +18,12 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType} import com.huawei.boostkit.spark.serialize.ColumnarBatchSerializer import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{addAllAndGetIterator, genSortParam} import nova.hetu.omniruntime.`type`.DataType -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.topn.OmniTopNWithExprOperatorFactory import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -108,7 +108,7 @@ case class ColumnarTakeOrderedAndProjectExec( def computeTopN(iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = { val startCodegen = System.nanoTime() val topNOperatorFactory = new OmniTopNWithExprOperatorFactory(sourceTypes, limit, - sortColsExp, ascendings, nullFirsts, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + sortColsExp, ascendings, nullFirsts, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val topNOperator = topNOperatorFactory.createOperator longMetric("omniCodegenTime") += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit]( _ => { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala index 851973bd5..cc8491268 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import 
nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.{FunctionType, OmniWindowFrameBoundType, OmniWindowFrameType} -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.window.OmniWindowWithExprOperatorFactory import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.rdd.RDD @@ -343,7 +343,7 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], windowFunType, omminPartitionChannels, preGroupedChannels, sortCols, ascendings, nullFirsts, 0, 10000, windowArgKeys, windowFunRetType, windowFrameTypes, windowFrameStartTypes, windowFrameStartChannels, windowFrameEndTypes, - windowFrameEndChannels, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + windowFrameEndChannels, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val windowOperator = windowOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index b10b9cc36..e632831a0 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -21,7 +21,7 @@ import com.huawei.boostkit.spark.ColumnarPluginConfig import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.Optional -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import scala.collection.mutable import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor @@ -29,7 +29,7 @@ import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonW import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.vector.VecBatch import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory @@ -290,7 +290,7 @@ case class ColumnarBroadcastHashJoinExec( } val startBuildCodegen = System.nanoTime() val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, - buildJoinColsExp, filter, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp = buildOpFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) @@ -307,7 +307,7 @@ case class ColumnarBroadcastHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, 
IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index ac5d5fdbc..cd81a8e25 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -19,12 +19,12 @@ package org.apache.spark.sql.execution.joins import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.Optional -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniHashBuilderWithExprOperatorFactory, OmniLookupJoinWithExprOperatorFactory} import nova.hetu.omniruntime.vector.VecBatch import org.apache.spark.TaskContext @@ -189,7 +189,7 @@ case class ColumnarShuffledHashJoinExec( } val startBuildCodegen = System.nanoTime() val buildOpFactory = new OmniHashBuilderWithExprOperatorFactory(buildTypes, - buildJoinColsExp, filter, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + buildJoinColsExp, filter, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val buildOp = buildOpFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) @@ -214,7 +214,7 @@ case class ColumnarShuffledHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, buildOutputCols, buildOutputTypes, OMNI_JOIN_TYPE_INNER, buildOpFactory, - new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala index 2a16a0dbf..dfe539662 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala @@ -21,13 +21,13 @@ import com.huawei.boostkit.spark.ColumnarPluginConfig import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.Optional -import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP} +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor import 
com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.checkOmniJsonWhiteList import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.JoinType.OMNI_JOIN_TYPE_INNER -import nova.hetu.omniruntime.operator.config.OperatorConfig +import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig} import nova.hetu.omniruntime.operator.join.{OmniSmjBufferedTableWithExprOperatorFactory, OmniSmjStreamedTableWithExprOperatorFactory} import nova.hetu.omniruntime.vector.{BooleanVec, Decimal128Vec, DoubleVec, IntVec, LongVec, VarcharVec, Vec, VecBatch} import org.apache.spark.rdd.RDD @@ -173,13 +173,13 @@ class ColumnarSortMergeJoinExec( val filter: Optional[String] = Optional.ofNullable(filterString) val startStreamedCodegen = System.nanoTime() val streamedOpFactory = new OmniSmjStreamedTableWithExprOperatorFactory(streamedTypes, - streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + streamedKeyColsExp, streamedOutputChannel, OMNI_JOIN_TYPE_INNER, filter, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val streamedOp = streamedOpFactory.createOperator streamedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startStreamedCodegen) val startBufferedCodegen = System.nanoTime() val bufferedOpFactory = new OmniSmjBufferedTableWithExprOperatorFactory(bufferedTypes, - bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP)) + bufferedKeyColsExp, bufferedOutputChannel, streamedOpFactory, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP)) val bufferedOp = bufferedOpFactory.createOperator bufferedCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBufferedCodegen) -- Gitee From fbcbdcdeef66d26e53a6333cf88d9d233283b99f Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Sat, 6 Aug 2022 06:14:00 +0000 Subject: [PATCH 10/19] bak --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 564f32cd9..c47c81a44 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # boostkit-bigdata -BoostKit Acceleration Packages —— Big Data Component Adaptation Layer 00 +BoostKit Acceleration Packages —— Big Data Component Adaptation Layer ## Notice The boostkit-bigdata repo contains acceleration plugins and patches for multiple pieces of open source software including openLooKeng, Apache Spark, Hive, and HBase. Using these plugins and patches depends on other pieces of open source software (which are available in the central repo). You shall understand and agree that when using the other pieces of open source software, you shall strictly comply with their open source licenses and fulfill the obligations specified in the licenses. Any vulnerabilities and security issues of the other open source software are resolved by the corresponding upstream communities based on their own vulnerability and security response mechanisms. Please pay attention to the notifications and version updates released by the upstream communities. The Kunpeng Compute community does not assume any responsibility for the vulnerabilities and security issues of the preceding open source software. 
-- Gitee From 0495db318b87b7b4edcd115e06df81fac32fb0b0 Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Mon, 8 Aug 2022 06:54:28 +0000 Subject: [PATCH 11/19] del jit in ColumnarPluginConfig --- .../scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index ecffee407..f780e6e5f 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -155,7 +155,6 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { val maxRowCount = conf.getConfString("spark.sql.columnar.maxRowCount", "20000").toInt - val enableJit: Boolean = conf.getConfString("spark.omni.sql.columnar.jit", "false").toBoolean val enableDecimalCheck : Boolean = conf.getConfString("spark.omni.sql.decimal.constraint.check", "true").toBoolean } -- Gitee From 4490f952bcd68b9345c19b2406e170c1a6f08f8e Mon Sep 17 00:00:00 2001 From: fengyaojie Date: Tue, 9 Aug 2022 11:38:43 +0000 Subject: [PATCH 12/19] modify vec datatype --- .../cpp/src/jni/SparkJniWrapper.cpp | 12 ++++++++++-- .../cpp/src/shuffle/splitter.cpp | 6 ++++-- .../spark/shuffle/ColumnarShuffleWriterSuite.scala | 8 ++++---- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp index 0f0351241..e9f7006ee 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp @@ -91,8 +91,16 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake( uint32_t *inputDataPrecisions = new uint32_t[size]; uint32_t *inputDataScales = new uint32_t[size]; for (int i = 0; i < size; ++i) { - inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision(); - inputDataScales[i] = inputDataTpyes[i].GetScale(); + switch (inputDataTpyes[i]->GetId()){ + case OMNI_DECIMAL64: + case OMNI_DECIMAL128: { + inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision(); + inputDataScales[i] = inputDataTpyes[i].GetScale(); + } + default: { + + } + } } inputDataTpyes.clear(); diff --git a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp index 5fdff5344..b28cced84 100644 --- a/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp +++ b/omnioperator/omniop-spark-extension/cpp/src/shuffle/splitter.cpp @@ -765,8 +765,10 @@ int Splitter::protoSpillPartition(int32_t partition_id, std::unique_ptrset_typeid_(CastShuffleTypeIdToVecType(vector_batch_col_types_[indexSchema])); LogsDebug("precision[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataPrecisions[indexSchema]); LogsDebug("scale[indexSchema %d]: %d ", indexSchema, input_col_types.inputDataScales[indexSchema]); - vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]); - vt->set_scale(input_col_types.inputDataScales[indexSchema]); + if(vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL128 || vt->typeid_() == spark::VecType::VEC_TYPE_DECIMAL64){ + vt->set_precision(input_col_types.inputDataPrecisions[indexSchema]); + 
vt->set_scale(input_col_types.inputDataScales[indexSchema]);
+            }
         }
         curBatch++;
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
index 6dddff494..c1794eae4 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
@@ -78,10 +78,10 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
     shuffleHandle = new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency)
 
-    val inputTypes = "[{\"id\":\"OMNI_INT\",\"width\":0,\"precision\":0,\"scale\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":\"OMNI_INT\",\"width\":0,\"precision\":0,\"scale\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":\"OMNI_DECIMAL64\",\"width\":0,\"precision\":18,\"scale\":3,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":\"OMNI_DECIMAL128\",\"width\":0,\"precision\":28,\"scale\":11,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}]"
+    val inputTypes = "[{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
+      "{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
+      "{\"id\":6,\"width\":0,\"precision\":18,\"scale\":3,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
+      "{\"id\":7,\"width\":0,\"precision\":28,\"scale\":11,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}]"
 
     when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions))
     when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf))
-- Gitee


From d0a1d54a5ea4a3fa2933b77e1f785ef0ba10c05a Mon Sep 17 00:00:00 2001
From: fengyaojie
Date: Tue, 9 Aug 2022 12:02:04 +0000
Subject: [PATCH 13/19] vecId modify

---
 .../omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
index e9f7006ee..0c3572264 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
@@ -91,7 +91,7 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake(
     uint32_t *inputDataPrecisions = new uint32_t[size];
     uint32_t *inputDataScales = new uint32_t[size];
     for (int i = 0; i < size; ++i) {
-        switch (inputDataTpyes[i]->GetId()){
+        switch (inputDataTpyes[i].GetId()){
         case OMNI_DECIMAL64:
         case OMNI_DECIMAL128: {
             inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision();
-- Gitee


From 35c3ce2f1293476882ba20f33df437d2bb163b4b Mon Sep 17 00:00:00 2001
From: fengyaojie
Date: Tue, 9 Aug 2022 12:11:55 +0000
Subject: [PATCH 14/19] Modify type

---
 .../omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
index 0c3572264..223ad9ac4 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
@@ -86,7 +86,7 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake(
     DataTypes inputVecTypes = Deserialize(inputTypeCharPtr);
     const int32_t *inputVecTypeIds = inputVecTypes.GetIds();
     //
-    std::vector<DataType> inputDataTpyes = inputVecTypes.Get();
+    std::vector<DataTypePtr> inputDataTpyes = inputVecTypes.Get();
     int32_t size = inputDataTpyes.size();
     uint32_t *inputDataPrecisions = new uint32_t[size];
     uint32_t *inputDataScales = new uint32_t[size];
-- Gitee


From 8c35483423c0685e54a5dfade50a9cf48acccfa9 Mon Sep 17 00:00:00 2001
From: fengyaojie
Date: Tue, 9 Aug 2022 14:36:53 +0000
Subject: [PATCH 15/19] sync jit code

---
 .../cpp/src/jni/SparkJniWrapper.cpp          | 14 ++++-----
 .../cpp/test/utils/test_utils.cpp            | 29 ++++++++++---------
 .../shuffle/ColumnarShuffleWriterSuite.scala |  8 ++---
 3 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
index 223ad9ac4..706da8450 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
@@ -46,10 +46,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
         return JNI_ERR;
     }
 
-    illegal_access_exception_class =
+    illegal_access_exception_class =
         CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;");
 
-    split_result_class =
+    split_result_class =
         CreateGlobalClassReference(env, "Lcom/huawei/boostkit/spark/vectorized/SplitResult;");
     split_result_constructor = GetMethodID(env, split_result_class, "<init>", "(JJJJJ[J)V");
 
@@ -91,11 +91,11 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake(
     uint32_t *inputDataPrecisions = new uint32_t[size];
     uint32_t *inputDataScales = new uint32_t[size];
     for (int i = 0; i < size; ++i) {
-        switch (inputDataTpyes[i].GetId()){
+        switch (inputDataTpyes[i]->GetId()){
         case OMNI_DECIMAL64:
         case OMNI_DECIMAL128: {
-            inputDataPrecisions[i] = inputDataTpyes[i].GetPrecision();
-            inputDataScales[i] = inputDataTpyes[i].GetScale();
+            inputDataScales[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetScale();
+            inputDataPrecisions[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetPrecision();
         }
         default: {
 
@@ -215,10 +215,10 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_stop(
         split_result_class, split_result_constructor,
         splitter->TotalComputePidTime(),
         splitter->TotalWriteTime(), splitter->TotalSpillTime(),
         splitter->TotalBytesWritten(), splitter->TotalBytesSpilled(), partition_length_arr);
-    
+
     return split_result;
 }
-    
+
 JNIEXPORT void JNICALL
 Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_close(
     JNIEnv* env, jobject, jlong splitter_id) {
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
index 5f7458566..70fe5d85d 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
@@ -17,21 +17,22 @@
  * limitations under the License.
 */
 
+#include <memory>
 #include "test_utils.h"
 
 using namespace omniruntime::vec;
 
-void ToVectorTypes(const int32_t *dataTypeIds, int32_t dataTypeCount, std::vector<DataType> &dataTypes)
+void ToVectorTypes(const int32_t *dataTypeIds, int32_t dataTypeCount, std::vector<DataTypePtr> &dataTypes)
 {
     for (int i = 0; i < dataTypeCount; ++i) {
         if (dataTypeIds[i] == OMNI_VARCHAR) {
-            dataTypes.push_back(VarcharDataType(50));
+            dataTypes.push_back(std::make_shared<VarcharDataType>(50));
             continue;
         } else if (dataTypeIds[i] == OMNI_CHAR) {
-            dataTypes.push_back(CharDataType(50));
+            dataTypes.push_back(std::make_shared<CharDataType>(50));
             continue;
         }
-        dataTypes.push_back(DataType(dataTypeIds[i]));
+        dataTypes.push_back(std::make_shared<DataType>(dataTypeIds[i]));
     }
 }
 
@@ -41,7 +42,7 @@ VectorBatch* CreateInputData(const int32_t numRows,
     int64_t* allData)
 {
     auto *vecBatch = new VectorBatch(numCols, numRows);
-    vector<DataType> inputTypes;
+    vector<DataTypePtr> inputTypes;
     ToVectorTypes(inputTypeIds, numCols, inputTypes);
     vecBatch->NewVectors(omniruntime::vec::GetProcessGlobalVecAllocator(), inputTypes);
     for (int i = 0; i < numCols; ++i) {
@@ -123,13 +124,13 @@ Vector *CreateVector(DataType &vecType, int32_t rowCount, va_list &args)
     }
 }
 
-DictionaryVector *CreateDictionaryVector(DataType &vecType, int32_t rowCount, int32_t *ids, int32_t idsCount, ...)
+DictionaryVector *CreateDictionaryVector(DataType &dataType, int32_t rowCount, int32_t *ids, int32_t idsCount, ...)
 {
     va_list args;
     va_start(args, idsCount);
-    Vector *dictionary = CreateVector(vecType, rowCount, args);
+    Vector *dictionary = CreateVector(dataType, rowCount, args);
     va_end(args);
-    auto vec = std::make_unique<DictionaryVector>(dictionary, ids, idsCount).release();
+    auto vec = new DictionaryVector(dictionary, ids, idsCount);
     delete dictionary;
     return vec;
 }
 
@@ -199,15 +200,15 @@ Vector *buildVector(const DataType &aggType, int32_t rowNumber)
     }
 }
 
-VectorBatch *CreateVectorBatch(DataTypes &types, int32_t rowCount, ...)
+VectorBatch *CreateVectorBatch(const DataTypes &types, int32_t rowCount, ...)
 {
     int32_t typesCount = types.GetSize();
-    VectorBatch *vectorBatch = std::make_unique<VectorBatch>(typesCount).release();
+    auto *vectorBatch = new VectorBatch(typesCount, rowCount);
     va_list args;
     va_start(args, rowCount);
     for (int32_t i = 0; i < typesCount; i++) {
-        DataType type = types.Get()[i];
-        vectorBatch->SetVector(i, CreateVector(type, rowCount, args));
+        DataTypePtr type = types.GetType(i);
+        vectorBatch->SetVector(i, CreateVector(*type, rowCount, args));
     }
     va_end(args);
     return vectorBatch;
 }
 
@@ -502,7 +503,7 @@ VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum) {
     int32_t data0[dataSize] = {111, 112, 113, 114, 115, 116};
     int64_t data1[dataSize] = {221, 222, 223, 224, 225, 226};
     void *datas[2] = {data0, data1};
-    DataTypes sourceTypes(std::vector<DataType>({ IntDataType(), LongDataType()}));
+    DataTypes sourceTypes(std::vector<DataTypePtr>({ std::make_unique<IntDataType>(), std::make_unique<LongDataType>()}));
     int32_t ids[] = {0, 1, 2, 3, 4, 5};
     VectorBatch *vectorBatch = new VectorBatch(3, dataSize);
     VectorAllocator *allocator = omniruntime::vec::GetProcessGlobalVecAllocator();
@@ -514,7 +515,7 @@ VectorBatch* CreateVectorBatch_2dictionaryCols_withPid(int partitionNum) {
         if (i == 0) {
             vectorBatch->SetVector(i, intVectorTmp);
         } else {
-            omniruntime::vec::DataType dataType = sourceTypes.Get()[i - 1];
+            omniruntime::vec::DataType dataType = *(sourceTypes.Get()[i - 1]);
             vectorBatch->SetVector(i, CreateDictionaryVector(dataType, dataSize, ids, dataSize, datas[i - 1]));
         }
     }
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
index c1794eae4..6e55a49f7 100644
--- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/shuffle/ColumnarShuffleWriterSuite.scala
@@ -78,10 +78,10 @@ class ColumnarShuffleWriterSuite extends SharedSparkSession {
     shuffleHandle = new ColumnarShuffleHandle[Int, ColumnarBatch](shuffleId = 0, dependency = dependency)
 
-    val inputTypes = "[{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":1,\"width\":0,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":6,\"width\":0,\"precision\":18,\"scale\":3,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}," +
-      "{\"id\":7,\"width\":0,\"precision\":28,\"scale\":11,\"dateUnit\":\"DAY\",\"timeUnit\":\"SEC\"}]"
+    val inputTypes = "[{\"id\":1}," +
+      "{\"id\":1}," +
+      "{\"id\":6,\"precision\":18,\"scale\":3}," +
+      "{\"id\":7,\"precision\":28,\"scale\":11}]"
 
     when(dependency.partitioner).thenReturn(new HashPartitioner(numPartitions))
     when(dependency.serializer).thenReturn(new JavaSerializer(sparkConf))
-- Gitee


From 2959e6679fd2d07f1e4640526d797f6f4120c8d9 Mon Sep 17 00:00:00 2001
From: Jmist
Date: Wed, 10 Aug 2022 09:48:46 +0800
Subject: [PATCH 16/19] modify ColumnarExpandExec jit enable

---
 .../org/apache/spark/sql/execution/ColumnarExpandExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
index e3039b3aa..9e2130c4b 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
@@ -1,6 +1,6 @@
 package org.apache.spark.sql.execution
 
-import com.huawei.boostkit.spark.Constant.{IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP}
+import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType}
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.operator.config.OperatorConfig
@@ -80,7 +80,7 @@ case class ColumnarExpandExec(
     child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) =>
       val startCodegen = System.nanoTime()
       var projectOperators = omniExpressions.map(exps => {
-        val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, 1, new OperatorConfig(IS_ENABLE_JIT, IS_SKIP_VERIFY_EXP))
+        val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, 1, new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
         factory.createOperator
       })
       omniCodegenTimeMetric += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
-- Gitee


From e53b05b6c4e1fad7769b95da3e6fb79d56dce5e6 Mon Sep 17 00:00:00 2001
From: Jmist
Date: Wed, 10 Aug 2022 10:00:47 +0800
Subject: [PATCH 17/19] import pkg

---
 .../org/apache/spark/sql/execution/ColumnarExpandExec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
index 9e2130c4b..bac1f1408 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
@@ -3,7 +3,7 @@ package org.apache.spark.sql.execution
 import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, getExprIdMap, rewriteToOmniJsonExpressionLiteral, sparkTypeToOmniType}
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
-import nova.hetu.omniruntime.operator.config.OperatorConfig
+import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
 import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory
 import nova.hetu.omniruntime.vector.{LongVec, Vec, VecBatch}
 import org.apache.spark.rdd.RDD
-- Gitee


From 11967d8d1df03463f33749dd3a604a74778b76d4 Mon Sep 17 00:00:00 2001
From: fengyaojie
Date: Wed, 10 Aug 2022 02:49:13 +0000
Subject: [PATCH 18/19] change switch to if

---
 .../cpp/src/jni/SparkJniWrapper.cpp | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
index 706da8450..49b39abb7 100644
--- a/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/src/jni/SparkJniWrapper.cpp
@@ -91,15 +91,9 @@ Java_com_huawei_boostkit_spark_jni_SparkJniWrapper_nativeMake(
     uint32_t *inputDataPrecisions = new uint32_t[size];
     uint32_t *inputDataScales = new uint32_t[size];
     for (int i = 0; i < size; ++i) {
-        switch (inputDataTpyes[i]->GetId()){
-        case OMNI_DECIMAL64:
-        case OMNI_DECIMAL128: {
-            inputDataScales[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetScale();
-            inputDataPrecisions[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetPrecision();
-        }
-        default: {
-
-        }
+        if(inputDataTpyes[i]->GetId() == OMNI_DECIMAL64 || inputDataTpyes[i]->GetId() == OMNI_DECIMAL128) {
+            inputDataScales[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetScale();
+            inputDataPrecisions[i] = std::dynamic_pointer_cast<DecimalDataType>(inputDataTpyes[i])->GetPrecision();
         }
     }
     inputDataTpyes.clear();
-- Gitee


From 4c599d2af0da96c93510b5ec8b93bafa7672a87e Mon Sep 17 00:00:00 2001
From: fengyaojie
Date: Wed, 10 Aug 2022 03:03:06 +0000
Subject: [PATCH 19/19] modify test_utils header

---
 .../omniop-spark-extension/cpp/test/utils/test_utils.cpp       | 1 -
 omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
index 70fe5d85d..875484a23 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.cpp
@@ -17,7 +17,6 @@
  * limitations under the License.
  */
 
-#include <memory>
 #include "test_utils.h"
 
 using namespace omniruntime::vec;
diff --git a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
index 042dc4a14..428d81c46 100644
--- a/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
+++ b/omnioperator/omniop-spark-extension/cpp/test/utils/test_utils.h
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include <memory>
 #include "../../src/shuffle/splitter.h"
 #include "../../src/jni/concurrent_map.h"
-- Gitee
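
Taken together, the series above changes the first argument of OperatorConfig from a JIT flag to a spill configuration: operators that never spill pass SpillConfig.NONE, while spill-capable operators such as the sort in ColumnarSortExec pass a SparkSpillConfig. The sketch below shows the resulting call pattern on the Scala side. It is a minimal illustration only, assuming the omniruntime signatures exactly as they appear in the hunks above; buildProjectOperator and its omniExprs/omniInputTypes parameters are hypothetical placeholders for values normally produced by OmniExpressionAdaptor.

    import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
    import nova.hetu.omniruntime.`type`.DataType
    import nova.hetu.omniruntime.operator.config.{OperatorConfig, SpillConfig}
    import nova.hetu.omniruntime.operator.project.OmniProjectOperatorFactory

    // Non-spilling operator: SpillConfig.NONE now fills the slot where the
    // removed IS_ENABLE_JIT flag used to be passed.
    def buildProjectOperator(omniExprs: Array[String], omniInputTypes: Array[DataType]) = {
      val factory = new OmniProjectOperatorFactory(omniExprs, omniInputTypes, 1,
        new OperatorConfig(SpillConfig.NONE, IS_SKIP_VERIFY_EXP))
      factory.createOperator
    }

Spill-capable operators substitute a concrete spill configuration instead, as the ColumnarSortExec hunk in PATCH 02 does with new OperatorConfig(sparkSpillConf, IS_SKIP_VERIFY_EXP).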