diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala index 13d204a7d319152e85b13bf2f7527fa2d3a75c43..d1ab4aa7cce7e5c3af4b9f8a594b68928089e2d3 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala @@ -906,7 +906,7 @@ case class ColumnarMultipleOperatorExec( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, buildOpFactory1, - if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, + if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // close operator @@ -940,7 +940,7 @@ case class ColumnarMultipleOperatorExec( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, buildOpFactory2, - if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, + if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() @@ -975,7 +975,7 @@ case class ColumnarMultipleOperatorExec( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, buildOpFactory3, - if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, + if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() @@ -1010,7 +1010,7 @@ case class ColumnarMultipleOperatorExec( buildOp4.getOutput val lookupOpFactory4 = new OmniLookupJoinWithExprOperatorFactory(probeTypes4, probeOutputCols4, probeHashColsExp4, buildOutputCols4, buildOutputTypes4, buildOpFactory4, - if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, + if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp4 = lookupOpFactory4.createOperator() @@ -1271,7 +1271,7 @@ case class ColumnarMultipleOperatorExec1( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, buildOpFactory1, - if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, + if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() @@ -1306,7 +1306,7 @@ case class ColumnarMultipleOperatorExec1( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, buildOpFactory2, - if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, + if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() @@ -1341,7 +1341,7 @@ case class ColumnarMultipleOperatorExec1( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, buildOpFactory3, - if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, + if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index 7da879c9b93671ed557b423a7abf4ebb060b943e..996ca0522ddcd00ee91c49101129bef1657afdd1 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -414,7 +414,7 @@ case class ColumnarBroadcastHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, - probeHashColsExp, buildOutputCols, buildOutputTypes, buildOpFactory, filter, + probeHashColsExp, buildOutputCols, buildOutputTypes, buildOpFactory, filter, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index 71a9bf292dc43ccce019688400ffb7cdd6af51e8..e1bb986cbe2a5d0cde520619f5e747e4de8f96e4 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, Inner, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, ExplainUtils, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.util.SparkMemoryUtils import org.apache.spark.sql.execution.vectorized.OmniColumnVector @@ -241,7 +241,7 @@ case class ColumnarShuffledHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, - buildOutputCols, buildOutputTypes, buildOpFactory, filter, new OperatorConfig(SpillConfig.NONE, + buildOutputCols, buildOutputTypes, buildOpFactory, filter, buildPlan.isInstanceOf[ColumnarShuffleExchangeExec], new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen)