From bf6402ce8a1eccc8c32cbfcdcb2eca484a9a44bc Mon Sep 17 00:00:00 2001 From: chen-guang-wang <18767185082@163.com> Date: Wed, 1 Mar 2023 14:40:45 +0800 Subject: [PATCH] shj support leftouter join --- .../joins/ColumnarShuffledHashJoinExec.scala | 6 ++-- .../sql/execution/ColumnarJoinExecSuite.scala | 29 +++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index 9eb666fcc..b147bafe5 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, SortOrder} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildSide} -import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, InnerLike, JoinType, LeftExistence, LeftSemi} +import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, InnerLike, JoinType, LeftExistence, LeftOuter, LeftSemi} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.metric.SQLMetrics @@ -92,7 +92,7 @@ case class ColumnarShuffledHashJoinExec( def buildCheck(): Unit = { joinType match { - case FullOuter | Inner | LeftSemi => + case FullOuter | Inner | LeftOuter | LeftSemi => case _ => throw new UnsupportedOperationException(s"Join-type[${joinType}] is not supported " + s"in ${this.nodeName}") @@ -158,7 +158,7 @@ case class ColumnarShuffledHashJoinExec( } val buildOutputCols: Array[Int] = joinType match { - case _: InnerLike | FullOuter => + case _: InnerLike | FullOuter | LeftOuter => buildOutput.indices.toArray case LeftExistence(_) => Array[Int]() diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarJoinExecSuite.scala index 136b28115..f4c32ae62 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarJoinExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarJoinExecSuite.scala @@ -302,6 +302,35 @@ class ColumnarJoinExecSuite extends ColumnarSparkPlanTest { ), false) } + test("validate columnar shuffledHashJoin left outer join happened") { + val res = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "leftouter") + assert( + res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffledHashJoinExec]).isDefined, + s"ColumnarShuffledHashJoinExec not happened," + + s" executedPlan as follows: \n${res.queryExecution.executedPlan}") + } + + test("columnar shuffledHashJoin left outer join is equal to native") { + val df = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "leftouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(" add", "World", 8, 3.0, null, null, null, null) + ), false) + } + + test("columnar shuffledHashJoin left outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), + col("q") === col("c"), "leftouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("", "Hello", null, 1.0, null, null, null, null), + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), + Row(" add", "World", 8, 3.0, null, null, null, null) + ), false) + } + test("ColumnarBroadcastHashJoin is not rolled back with not_equal filter expr") { val res = left.join(right.hint("broadcast"), left("a") <=> right("a")) assert( -- Gitee