diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index 49f60368853931c52052a4e556ef060a61cc6061..454bcb4ed6e3ad5af529a35975f14db01ff16814 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -66,7 +66,7 @@ object OmniExpressionAdaptor extends Logging { // only if its parseFormat is String (== 0) val returnCode: Long = new OmniExprVerify().exprVerifyNative( DataTypeSerializer.serialize(new Array[nova.hetu.omniruntime.`type`.DataType](0)), - 0, filterExpr, projections, projections.length, 1) + 0, filterExpr, projections, projections.length) if (returnCode == 0) { throw new UnsupportedOperationException(s"Unsupported OmniJson Expression \nfilter:${filterExpr} \nproejcts:${projections.mkString("=")}") } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala index cb23b68f09bb085d86e133d3ef40628e8c5ca4c2..c92b4ea3439753f501713c5138d50b26b2ae2729 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala @@ -196,7 +196,7 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan) child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val filterOperatorFactory = new OmniFilterAndProjectOperatorFactory( - omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), 1, + omniExpression, omniInputTypes, seqAsJavaList(omniProjectIndices), new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val filterOperator = filterOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) @@ -295,7 +295,7 @@ case class ColumnarConditionProjectExec(projectList: Seq[NamedExpression], child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() val operatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), 1, + conditionExpression, omniInputTypes, seqAsJavaList(omniExpressions), new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val operator = operatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala index 27b05b16c017c43e73a0c3b6d4f05ea02d11f951..76ca5396523aac3b44c76c97a28745f13678d435 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala @@ -81,7 +81,7 @@ case class ColumnarExpandExec( child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val startCodegen = System.nanoTime() var projectOperators = omniExpressions.map(exps => { - val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, 1, + val factory = new OmniProjectOperatorFactory(exps, omniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) factory.createOperator }) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala index 73091d069cb311f129fef45078d936ad365e14e0..c8d8418f52469677f9a8e8e14353a47ce4df7fc7 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala @@ -867,7 +867,7 @@ case class ColumnarMultipleOperatorExec( aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator @@ -900,7 +900,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator @@ -934,7 +934,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator @@ -968,7 +968,7 @@ case class ColumnarMultipleOperatorExec( lookupOpFactory3.close() }) - val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, 1, + val projectOperatorFactory4 = new OmniProjectOperatorFactory(proj4OmniExpressions, proj4OmniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator4 = projectOperatorFactory4.createOperator // close operator @@ -1003,7 +1003,7 @@ case class ColumnarMultipleOperatorExec( }) val condOperatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) @@ -1227,7 +1227,7 @@ case class ColumnarMultipleOperatorExec1( aggOperator.close() }) - val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, 1, + val projectOperatorFactory1 = new OmniProjectOperatorFactory(proj1OmniExpressions, proj1OmniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator1 = projectOperatorFactory1.createOperator // close operator @@ -1261,7 +1261,7 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory1.close() }) - val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, 1, + val projectOperatorFactory2 = new OmniProjectOperatorFactory(proj2OmniExpressions, proj2OmniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator2 = projectOperatorFactory2.createOperator // close operator @@ -1295,7 +1295,7 @@ case class ColumnarMultipleOperatorExec1( lookupOpFactory2.close() }) - val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, 1, + val projectOperatorFactory3 = new OmniProjectOperatorFactory(proj3OmniExpressions, proj3OmniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator3 = projectOperatorFactory3.createOperator // close operator @@ -1330,7 +1330,7 @@ case class ColumnarMultipleOperatorExec1( }) val condOperatorFactory = new OmniFilterAndProjectOperatorFactory( - conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), 1, + conditionExpression, omniCondInputTypes, seqAsJavaList(omniCondExpressions), new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val condOperator = condOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala index 0ccdbd6de43c3cbd62b455ef7adf6e487cc69c45..94c810f4908607414b124882fa54e815b1586b22 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarProjection.scala @@ -43,7 +43,7 @@ object ColumnarProjection { omniExpressions: Array[String], iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = { val startCodegen = System.nanoTime() - val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, 1, + val projectOperatorFactory = new OmniProjectOperatorFactory(omniExpressions, omniInputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val projectOperator = projectOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index cea0a1438b1c64a0d1372e2a272742aa9be08502..587d651abff1ccce8a6d8e78fe32b65b6ae6c1b5 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -297,7 +297,7 @@ object ColumnarShuffleExchangeExec extends Logging { // omni project val genHashExpression = genHashExpr() val omniExpr: String = genHashExpression(expressions, numPartitions, defaultMm3HashSeed, outputAttributes) - val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, 1, + val factory = new OmniProjectOperatorFactory(Array(omniExpr), inputTypes, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val op = factory.createOperator() // close operator