From 5d2bf409bf0ee4458e386fefc90c97be9b9c1150 Mon Sep 17 00:00:00 2001 From: maguoxiong Date: Thu, 17 Apr 2025 00:18:40 +0800 Subject: [PATCH] join opt --- .../sql/execution/ColumnarFileSourceScanExec.scala | 14 +++++++------- .../joins/ColumnarBroadcastHashJoinExec.scala | 2 +- .../joins/ColumnarShuffledHashJoinExec.scala | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala index 13d204a7d..d1ab4aa7c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala @@ -906,7 +906,7 @@ case class ColumnarMultipleOperatorExec( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, buildOpFactory1, - if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, + if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() // close operator @@ -940,7 +940,7 @@ case class ColumnarMultipleOperatorExec( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, buildOpFactory2, - if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, + if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() @@ -975,7 +975,7 @@ case class ColumnarMultipleOperatorExec( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, buildOpFactory3, - if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, + if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() @@ -1010,7 +1010,7 @@ case class ColumnarMultipleOperatorExec( buildOp4.getOutput val lookupOpFactory4 = new OmniLookupJoinWithExprOperatorFactory(probeTypes4, probeOutputCols4, probeHashColsExp4, buildOutputCols4, buildOutputTypes4, buildOpFactory4, - if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, + if (joinFilter4.nonEmpty) {Optional.of(joinFilter4.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp4 = lookupOpFactory4.createOperator() @@ -1271,7 +1271,7 @@ case class ColumnarMultipleOperatorExec1( buildOp1.getOutput val lookupOpFactory1 = new OmniLookupJoinWithExprOperatorFactory(probeTypes1, probeOutputCols1, probeHashColsExp1, buildOutputCols1, buildOutputTypes1, buildOpFactory1, - if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, + if (joinFilter1.nonEmpty) {Optional.of(joinFilter1.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp1 = lookupOpFactory1.createOperator() @@ -1306,7 +1306,7 @@ case class ColumnarMultipleOperatorExec1( buildOp2.getOutput val lookupOpFactory2 = new OmniLookupJoinWithExprOperatorFactory(probeTypes2, probeOutputCols2, probeHashColsExp2, buildOutputCols2, buildOutputTypes2, buildOpFactory2, - if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, + if (joinFilter2.nonEmpty) {Optional.of(joinFilter2.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp2 = lookupOpFactory2.createOperator() @@ -1341,7 +1341,7 @@ case class ColumnarMultipleOperatorExec1( buildOp3.getOutput val lookupOpFactory3 = new OmniLookupJoinWithExprOperatorFactory(probeTypes3, probeOutputCols3, probeHashColsExp3, buildOutputCols3, buildOutputTypes3, buildOpFactory3, - if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, + if (joinFilter3.nonEmpty) {Optional.of(joinFilter3.get)} else {Optional.empty()}, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp3 = lookupOpFactory3.createOperator() diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index 7da879c9b..996ca0522 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -414,7 +414,7 @@ case class ColumnarBroadcastHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, - probeHashColsExp, buildOutputCols, buildOutputTypes, buildOpFactory, filter, + probeHashColsExp, buildOutputCols, buildOutputTypes, buildOpFactory, filter, false, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index 71a9bf292..e1bb986cb 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.{ExistenceJoin, FullOuter, Inner, JoinType, LeftAnti, LeftExistence, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, ExplainUtils, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.util.SparkMemoryUtils import org.apache.spark.sql.execution.vectorized.OmniColumnVector @@ -241,7 +241,7 @@ case class ColumnarShuffledHashJoinExec( val startLookupCodegen = System.nanoTime() val lookupOpFactory = new OmniLookupJoinWithExprOperatorFactory(probeTypes, probeOutputCols, probeHashColsExp, - buildOutputCols, buildOutputTypes, buildOpFactory, filter, new OperatorConfig(SpillConfig.NONE, + buildOutputCols, buildOutputTypes, buildOpFactory, filter, buildPlan.isInstanceOf[ColumnarShuffleExchangeExec], new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) -- Gitee