From 643da4c1e095d44a149146788eb13198327092b9 Mon Sep 17 00:00:00 2001 From: maxiaoqi2020 Date: Thu, 21 Jul 2022 16:32:10 +0800 Subject: [PATCH 1/2] add UT test for AQE feature --- .../adaptive/AdaptiveQueryExecSuite.scala | 787 ++++++++++++++++++ 1 file changed, 787 insertions(+) create mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala new file mode 100644 index 000000000..4b05f850e --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -0,0 +1,787 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.Partition +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.catalyst.optimizer.BuildRight +import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarSparkPlanTest, PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffleColumnarRDD, SparkPlan} +import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, ShuffleExchangeExec} +import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ColumnarBroadcastHashJoinExec, ColumnarSortMergeJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate +import org.apache.spark.sql.functions.when +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +import java.io.File +import java.net.URI + +class AdaptiveQueryExecSuite extends ColumnarSparkPlanTest + with AdaptiveSparkPlanHelper { + + import testImplicits._ + + setupTestData() + + private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, SparkPlan) = { + var finalPlanCnt = 0 + val listener = new SparkListener { + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) => + if (sparkPlanInfo.simpleString.startsWith( + "AdaptiveSparkPlan isFinalPlan=true")) { + finalPlanCnt += 1 + } + case _ => // ignore other events + } + } + } + spark.sparkContext.addSparkListener(listener) + + val dfAdaptive = sql(query) + val planBefore = dfAdaptive.queryExecution.executedPlan + assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false")) + val result = dfAdaptive.collect() + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val df = sql(query) + 
checkAnswer(df, result) + } + val planAfter = dfAdaptive.queryExecution.executedPlan + assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) + val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan + + spark.sparkContext.listenerBus.waitUntilEmpty() + // AQE will post `SparkListenerSQLAdaptiveExecutionUpdate` twice in case of subqueries that + // exist out of query stages. + val expectedFinalPlanCnt = adaptivePlan.find(_.subqueries.nonEmpty).map(_ => 2).getOrElse(1) + assert(finalPlanCnt == expectedFinalPlanCnt) + spark.sparkContext.removeSparkListener(listener) + + val exchanges = adaptivePlan.collect { + case e: Exchange => e + } + assert(exchanges.isEmpty, "The final plan should not contain any Exchange node.") + (dfAdaptive.queryExecution.sparkPlan, adaptivePlan) + } + + private def findTopLevelBroadcastHashJoin(plan: SparkPlan): Seq[BroadcastHashJoinExec] = { + collect(plan) { + case j: BroadcastHashJoinExec => j + } + } + + private def findTopLevelColumnarBroadcastHashJoin(plan: SparkPlan) + : Seq[ColumnarBroadcastHashJoinExec] = { + collect(plan) { + case j: ColumnarBroadcastHashJoinExec => j + } + } + + private def findTopLevelSortMergeJoin(plan: SparkPlan): Seq[SortMergeJoinExec] = { + collect(plan) { + case j: SortMergeJoinExec => j + } + } + + private def findTopLevelColumnarSortMergeJoin(plan: SparkPlan): Seq[ColumnarSortMergeJoin] = { + collect(plan) { + case j: ColumnarSortMergeJoin => j + } + } + + private def findTopLevelBaseJoin(plan: SparkPlan): Seq[BaseJoinExec] = { + collect(plan) { + case j: BaseJoinExec => j + } + } + + private def findReusedExchange(plan: SparkPlan): Seq[ReusedExchangeExec] = { + collectWithSubqueries(plan) { + case ShuffleQueryStageExec(_, e: ReusedExchangeExec) => e + case BroadcastQueryStageExec(_, e: ReusedExchangeExec) => e + } + } + + private def findReusedSubquery(plan: SparkPlan): Seq[ReusedSubqueryExec] = { + collectWithSubqueries(plan) { + case e: ReusedSubqueryExec => e 
+ } + } + + private def checkNumLocalShuffleReaders( + plan: SparkPlan, numShufflesWithoutLocalReader: Int = 0): Unit = { + val numShuffles = collect(plan) { + case s: ShuffleQueryStageExec => s + }.length + + val numLocalReaders = collect(plan) { + case reader: ColumnarCustomShuffleReaderExec if reader.isLocalReader => reader + } + numLocalReaders.foreach { r => + val rdd: RDD[ColumnarBatch] = r.executeColumnar() + val parts: Array[Partition] = rdd.partitions + assert(parts.forall(rdd.preferredLocations(_).nonEmpty)) + } + assert(numShuffles === (numLocalReaders.length + numShufflesWithoutLocalReader)) + } + + private def checkInitialPartitionNum(df: Dataset[_], numPartition: Int): Unit = { + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == numPartition) + } + + test("Change merge join to broadcast join") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData join testData2 ON key = a where value = '1'") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan) + } + } + + test("Reuse the parallelism of CoalescedShuffleReaderExec in LocalShuffleReaderExec") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "10") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData join testData2 ON key = a where value = '1'") + 
val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + val localReaders = collect(adaptivePlan) { + case reader: ColumnarCustomShuffleReaderExec if reader.isLocalReader => reader + } + assert(localReaders.length == 2) + val localShuffleRDD0 = localReaders(0).executeColumnar().asInstanceOf[ShuffledColumnarRDD] + val localShuffleRDD1 = localReaders(1).executeColumnar().asInstanceOf[ShuffledColumnarRDD] + // The pre-shuffle partition size is [0, 0, 0, 72, 0] + // We exclude the 0-size partitions, so only one partition, advisoryParallelism = 1 + // the final parallelism is + // math.max(1, advisoryParallelism / numMappers): math.max(1, 1/2) = 1 + // and the partitions length is 1 * numMappers = 2 + assert(localShuffleRDD0.getPartitions.length == 2) + // The pre-shuffle partition size is [0, 72, 0, 72, 126] + // We exclude the 0-size partitions, so only 3 partition, advisoryParallelism = 3 + // the final parallelism is + // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1 + // and the partitions length is 1 * numMappers = 2 + assert(localShuffleRDD1.getPartitions.length == 2) + } + } + + test("Reuse the default parallelism in LocalShuffleReaderExec") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "false") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData join testData2 ON key = a where value = '1'") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + val localReaders = collect(adaptivePlan) { + case reader: ColumnarCustomShuffleReaderExec if reader.isLocalReader => reader + } + assert(localReaders.length == 2) + val localShuffleRDD0 = 
localReaders(0).executeColumnar().asInstanceOf[ShuffledColumnarRDD] + val localShuffleRDD1 = localReaders(1).executeColumnar().asInstanceOf[ShuffledColumnarRDD] + // the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2 + // and the partitions length is 2 * numMappers = 4 + assert(localShuffleRDD0.getPartitions.length == 4) + // the final parallelism is math.max(1, numReduces / numMappers): math.max(1, 5/2) = 2 + // and the partitions length is 2 * numMappers = 4 + assert(localShuffleRDD1.getPartitions.length == 4) + } + } + + test("Empty stage coalesced to 1-partition RDD") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") { + val df1 = spark.range(10).withColumn("a", 'id) + val df2 = spark.range(10).withColumn("b", 'id) + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val testDf = df1.where('a > 10).join(df2.where('b > 10), Seq("id"), "left_outer") + .groupBy('a).count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan + assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined) + val coalescedReaders = collect(plan) { + case r: ColumnarCustomShuffleReaderExec => r + } + assert(coalescedReaders.length == 3) + coalescedReaders.foreach(r => assert(r.partitionSpecs.length == 1)) + } + + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { + val testDf = df1.where('a > 10).join(df2.where('b > 10), Seq("id"), "left_outer") + .groupBy('a).count() + checkAnswer(testDf, Seq()) + val plan = testDf.queryExecution.executedPlan + assert(find(plan)(_.isInstanceOf[ColumnarBroadcastHashJoinExec]).isDefined) + val coalescedReaders = collect(plan) { + case r: ColumnarCustomShuffleReaderExec => r + } + assert(coalescedReaders.length == 3, s"$plan") + coalescedReaders.foreach(r => assert(r.isLocalReader || r.partitionSpecs.length == 1)) + } + } + } + + test("Scalar subquery") { + withSQLConf( + 
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData join testData2 ON key = a " + + "where value = (SELECT max(a) from testData3)") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan) + } + } + + // Currently, OmniFilterExec will fall back to Filter; if AQE is enabled, this will cause an error + ignore("Scalar subquery in later stages") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData join testData2 ON key = a " + + "where (value + a) = (SELECT max(a) from testData3)") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + + checkNumLocalShuffleReaders(adaptivePlan) + } + } + + test("multiple joins") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + """ + |WITH t4 AS ( + | SELECT * FROM lowercaseData t2 JOIN testData3 t3 ON t2.n = t3.a where t2.n = '1' + |) + |SELECT * FROM testData + |JOIN testData2 t2 ON key = t2.a + |JOIN t4 ON t2.b = t4.a + |WHERE value = 1 + """.stripMargin) + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 3) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 3) + + // A possible resulting query plan: + // BroadcastHashJoin + // +- BroadcastExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- BroadcastHashJoin + // +- BroadcastExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- LocalShuffleReader* + // +- 
ShuffleExchange + // +- BroadcastHashJoin + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- BroadcastExchange + // +-LocalShuffleReader* + // +- ShuffleExchange + + // After applying the 'OptimizeLocalShuffleReader' rule, we can convert all four + // shuffle readers to local shuffle readers in the bottom two 'BroadcastHashJoin' nodes. + // For the top-level 'BroadcastHashJoin', the probe side is not a shuffle query stage + // and the build-side shuffle query stage is also converted to a local shuffle reader. + checkNumLocalShuffleReaders(adaptivePlan) + } + } + + test("multiple joins with aggregate") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + """ + |WITH t4 AS ( + | SELECT * FROM lowercaseData t2 JOIN ( + | select a, sum(b) from testData3 group by a + | ) t3 ON t2.n = t3.a where t2.n = '1' + |) + |SELECT * FROM testData + |JOIN testData2 t2 ON key = t2.a + |JOIN t4 ON t2.b = t4.a + |WHERE value = 1 + """.stripMargin) + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 3) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 3) + + // A possible resulting query plan: + // BroadcastHashJoin + // +- BroadcastExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- BroadcastHashJoin + // +- BroadcastExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- BroadcastHashJoin + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- BroadcastExchange + // +-HashAggregate + // +- CoalescedShuffleReader + // +- ShuffleExchange + + // The shuffle added by Aggregate can't apply local reader. 
+ checkNumLocalShuffleReaders(adaptivePlan, 1) + } + } + + // TODO: issue: ColumnarToRow does not implement doExecuteBroadcast + ignore("multiple joins with aggregate 2") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + """ + |WITH t4 AS ( + | SELECT * FROM lowercaseData t2 JOIN ( + | select a, max(b) b from testData2 group by a + | ) t3 ON t2.n = t3.b + |) + |SELECT * FROM testData + |JOIN testData2 t2 ON key = t2.a + |JOIN t4 ON value = t4.a + |WHERE value = 1 + """.stripMargin) + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 3) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 3) + + // A possible resulting query plan: + // BroadcastHashJoin + // +- BroadcastExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- BroadcastHashJoin + // +- BroadcastExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- LocalShuffleReader* + // +- ShuffleExchange + // +- BroadcastHashJoin + // +- Filter + // +- HashAggregate + // +- CoalescedShuffleReader + // +- ShuffleExchange + // +- BroadcastExchange + // +-LocalShuffleReader* + // +- ShuffleExchange + + // The shuffle added by Aggregate can't apply local reader. + checkNumLocalShuffleReaders(adaptivePlan, 1) + } + } + + test("Exchange reuse") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT value FROM testData join testData2 ON key = a " + + "join (SELECT value v from testData join testData3 ON key = a) on value = v") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 3) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 3) + // TODO: check num of localReader: all 3 joins are asserted to become broadcast joins, + // so the old note that one SMJ remains (with two non-local shuffles) no longer holds. 
+ checkNumLocalShuffleReaders(adaptivePlan, 0) + // Even with local shuffle reader, the query stage reuse can also work. + val ex = findReusedExchange(adaptivePlan) + assert(ex.size == 1) + } + } + + test("Exchange reuse with subqueries") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT a FROM testData join testData2 ON key = a " + + "where value = (SELECT max(a) from testData join testData2 ON key = a)") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan) + // Even with local shuffle reader, the query stage reuse can also work. + val ex = findReusedExchange(adaptivePlan) + assert(ex.size == 1) + } + } + + test("Exchange reuse across subqueries") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT a FROM testData join testData2 ON key = a " + + "where value >= (SELECT max(a) from testData join testData2 ON key = a) " + + "and a <= (SELECT max(a) from testData join testData2 ON key = a)") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan) + // Even with local shuffle reader, the query stage reuse can also work. 
+ val ex = findReusedExchange(adaptivePlan) + assert(ex.nonEmpty) + val sub = findReusedSubquery(adaptivePlan) + assert(sub.isEmpty) + } + } + + test("Subquery reuse") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT a FROM testData join testData2 ON key = a " + + "where value >= (SELECT max(a) from testData join testData2 ON key = a) " + + "and a <= (SELECT max(a) from testData join testData2 ON key = a)") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan) + // Even with local shuffle reader, the query stage reuse can also work. + val ex = findReusedExchange(adaptivePlan) + assert(ex.isEmpty) + val sub = findReusedSubquery(adaptivePlan) + assert(sub.nonEmpty) + } + } + + test("Broadcast exchange reuse across subqueries") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000", + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT a FROM testData join testData2 ON key = a " + + "where value >= (" + + "SELECT /*+ broadcast(testData2) */ max(key) from testData join testData2 ON key = a) " + + "and a <= (" + + "SELECT /*+ broadcast(testData2) */ max(value) from testData join testData2 ON key = a)") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + checkNumLocalShuffleReaders(adaptivePlan) + // Even with local shuffle reader, the query stage reuse can also work. 
+ val ex = findReusedExchange(adaptivePlan) + assert(ex.nonEmpty) + assert(ex.head.child.isInstanceOf[ColumnarBroadcastExchangeExec]) + val sub = findReusedSubquery(adaptivePlan) + assert(sub.isEmpty) + } + } + + // TODO: OmniProjectExec does not support Cast Expression + // TODO: OmniBroadCastHashJoin does not support Left-anti join + ignore("Union/Except/Intersect queries") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + runAdaptiveAndVerifyResult( + """ + |SELECT * FROM testData + |EXCEPT + |SELECT * FROM testData2 + |UNION ALL + |SELECT * FROM testData + |INTERSECT ALL + |SELECT * FROM testData2 + """.stripMargin) + } + } + + // TODO: OmniBroadCastHashJoin does not support Left-semi join + ignore("Subquery de-correlation in Union queries") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + withTempView("a", "b") { + Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a") + Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b") + + runAdaptiveAndVerifyResult( + """ + |SELECT id,num,source FROM ( + | SELECT id, num, 'a' as source FROM a + | UNION ALL + | SELECT id, num, 'b' as source FROM b + |) AS c WHERE c.id IN (SELECT id FROM b WHERE num = 2) + """.stripMargin) + } + } + } + + test("Avoid plan change if cost is greater") { + val origPlan = sql("SELECT * FROM testData " + + "join testData2 t2 ON key = t2.a " + + "join testData2 t3 on t2.a = t3.a where t2.b = 1").queryExecution.executedPlan + + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80", + SQLConf.BROADCAST_HASH_JOIN_OUTPUT_PARTITIONING_EXPAND_LIMIT.key -> "0") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData " + + "join testData2 t2 ON key = t2.a " + + "join testData2 t3 on t2.a = t3.a where t2.b = 1") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 2) + val smj2 = findTopLevelSortMergeJoin(adaptivePlan) + assert(smj2.size == 2, 
+ origPlan.toString) + } + } + + test("Change merge join to broadcast join without local shuffle reader") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.LOCAL_SHUFFLE_READER_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + """ + |SELECT * FROM testData t1 join testData2 t2 + |ON t1.key = t2.a join testData3 t3 on t2.a = t3.a + |where t1.value = 1 + """.stripMargin + ) + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 2) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 2) + // TODO: check why no SMJ remains here: both joins are asserted to become broadcast joins, + // so 0 shuffles are expected to be left without a local shuffle reader. + checkNumLocalShuffleReaders(adaptivePlan, 0) + } + } + + test("Avoid changing merge join to broadcast join if too many empty partitions on build plan") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") { + // `testData` is small enough to be broadcast but has empty partition ratio over the config. + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData join testData2 ON key = a where value = '1'") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelBroadcastHashJoin(adaptivePlan) + assert(bhj.isEmpty) + } + // It is still possible to broadcast `testData2`. 
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData join testData2 ON key = a where value = '1'") + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val bhj = findTopLevelColumnarBroadcastHashJoin(adaptivePlan) + assert(bhj.size == 1) + assert(bhj.head.buildSide == BuildRight) + } + } + } + + test("SPARK-29906: AQE should not introduce extra shuffle for outermost limit") { + var numStages = 0 + val listener = new SparkListener { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + numStages = jobStart.stageInfos.length + } + } + try { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + spark.sparkContext.addSparkListener(listener) + spark.range(0, 100, 1, numPartitions = 10).take(1) + spark.sparkContext.listenerBus.waitUntilEmpty() + // Should be only one stage since there is no shuffle. + assert(numStages == 1) + } + } finally { + spark.sparkContext.removeSparkListener(listener) + } + } + + test("SPARK-30524: Do not optimize skew join if introduce additional shuffle") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100") { + withTempView("skewData1", "skewData2") { + spark + .range(0, 1000, 1, 10) + .selectExpr("id % 3 as key1", "id as value1") + .createOrReplaceTempView("skewData1") + spark + .range(0, 1000, 1, 10) + .selectExpr("id % 1 as key2", "id as value2") + .createOrReplaceTempView("skewData2") + + def checkSkewJoin(query: String, optimizeSkewJoin: Boolean): Unit = { + val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(query) + val innerSmj = findTopLevelColumnarSortMergeJoin(innerAdaptivePlan) + assert(innerSmj.size == 1 && innerSmj.head.isSkewJoin == optimizeSkewJoin) + } + + checkSkewJoin( + "SELECT key1 FROM 
skewData1 JOIN skewData2 ON key1 = key2", true) + // Additional shuffle introduced, so disable the "OptimizeSkewedJoin" optimization + checkSkewJoin( + "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 GROUP BY key1", false) + } + } + } + + test("SPARK-29544: adaptive skew join with different join types") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.SHUFFLE_PARTITIONS.key -> "100", + SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") { + withTempView("skewData1", "skewData2") { + spark + .range(0, 1000, 1, 10) + .select( + when('id < 250, 249) + .when('id >= 750, 1000) + .otherwise('id).as("key1"), + 'id as "value1") + .createOrReplaceTempView("skewData1") + spark + .range(0, 1000, 1, 10) + .select( + when('id < 250, 249) + .otherwise('id).as("key2"), + 'id as "value2") + .createOrReplaceTempView("skewData2") + + def checkSkewJoin( + joins: Seq[SortMergeJoinExec], + leftSkewNum: Int, + rightSkewNum: Int): Unit = { + assert(joins.size == 1 && joins.head.isSkewJoin) + assert(joins.head.left.collect { + case r: ColumnarCustomShuffleReaderExec => r + }.head.partitionSpecs.collect { + case p: PartialReducerPartitionSpec => p.reducerIndex + }.distinct.length == leftSkewNum) + assert(joins.head.right.collect { + case r: ColumnarCustomShuffleReaderExec => r + }.head.partitionSpecs.collect { + case p: PartialReducerPartitionSpec => p.reducerIndex + }.distinct.length == rightSkewNum) + } + + // skewed inner join optimization + val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM skewData1 join skewData2 ON key1 = key2") + val innerSmj = findTopLevelColumnarSortMergeJoin(innerAdaptivePlan) + checkSkewJoin(innerSmj, 1, 1) + + // TODO: Currently, we don't support leftSmj and rightSmj + // skewed left outer join optimization +// val (_, 
leftAdaptivePlan) = runAdaptiveAndVerifyResult( +// "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2") +// val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan) +// checkSkewJoin(leftSmj, 2, 0) + + // skewed right outer join optimization +// val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult( +// "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2") +// val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan) +// checkSkewJoin(rightSmj, 0, 1) + } + } + } + + test("SPARK-30291: AQE should catch the exceptions when doing materialize") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + withTable("bucketed_table") { + val df1 = + (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k").as("df1") + df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") + val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath + val tableDir = new File(warehouseFilePath, "bucketed_table") + Utils.deleteRecursively(tableDir) + df1.write.parquet(tableDir.getAbsolutePath) + + val aggregated = spark.table("bucketed_table").groupBy("i").count() + val error = intercept[Exception] { + aggregated.count() + } + assert(error.getMessage contains "Invalid bucket file") + assert(error.getSuppressed.size === 0) + } + } + } + + // TODO: OmniBroadCastHashJoin does not support Left-anti join + ignore("SPARK-30403: AQE should handle InSubquery") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + runAdaptiveAndVerifyResult("SELECT * FROM testData LEFT OUTER join testData2" + + " ON key = a AND key NOT IN (select a from testData3) where value = '1'" + ) + } + } \ No newline at end of file -- Gitee From de77b4a8ff5eb2c688823d9c8698aebdb8a0b2c3 Mon Sep 17 00:00:00 2001 From: maxiaoqi2020 Date: Thu, 21 Jul 2022 19:07:26 +0800 Subject: [PATCH 2/2] add UT testcase CoalesceShufflePartitionsSuite --- .../CoalesceShufflePartitionsSuite.scala | 397 ++++++++++++++++++ 
.../adaptive/AdaptiveQueryExecSuite.scala | 13 +- 2 files changed, 404 insertions(+), 6 deletions(-) create mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala new file mode 100644 index 000000000..84cb078c6 --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.UI.UI_ENABLED
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+
+/**
+ * Tests for the coalescing of shuffle partitions under adaptive query execution, run with the
+ * `ColumnarPlugin` session extension and the `ColumnarShuffleManager`.
+ *
+ * Each test creates its own session through `withSparkSession` and stops it when done, so the
+ * active and default sessions found before the suite are cleared in `beforeAll` and put back in
+ * `afterAll`.
+ */
+class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  // Start as None (not null) so that afterAll() stays safe even if beforeAll() aborted early.
+  private var originalActiveSparkSession: Option[SparkSession] = None
+  private var originalInstantiatedSparkSession: Option[SparkSession] = None
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    originalActiveSparkSession = SparkSession.getActiveSession
+    originalInstantiatedSparkSession = SparkSession.getDefaultSession
+
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      // Set these states back.
+      originalActiveSparkSession.foreach(ctx => SparkSession.setActiveSession(ctx))
+      originalInstantiatedSparkSession.foreach(ctx => SparkSession.setDefaultSession(ctx))
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  val numInputPartitions: Int = 10
+
+  /**
+   * Builds a local-mode [[SparkSession]] with adaptive execution enabled, runs `f` against it
+   * and stops the session afterwards, even when `f` throws.
+   *
+   * @param f test body that receives the session
+   * @param targetPostShuffleInputSize value set for `SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES`
+   * @param minNumPostShufflePartitions value set for
+   *                                    `SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM`;
+   *                                    `None` is configured as 1
+   */
+  def withSparkSession(
+      f: SparkSession => Unit,
+      targetPostShuffleInputSize: Int,
+      minNumPostShufflePartitions: Option[Int]): Unit = {
+    val minPartitionNum = minNumPostShufflePartitions.getOrElse(1)
+    val sparkConf =
+      new SparkConf(false)
+        .setMaster("local[*]")
+        .setAppName("test")
+        .set(UI_ENABLED, false)
+        .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
+        .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
+        .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+        .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
+        .set(
+          SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
+          targetPostShuffleInputSize.toString)
+        .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, "com.huawei.boostkit.spark.ColumnarPlugin")
+        .set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
+        .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+        .set(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key, minPartitionNum.toString)
+
+    val spark = SparkSession.builder()
+      .config(sparkConf)
+      .getOrCreate()
+    try f(spark) finally spark.stop()
+  }
+
+  /**
+   * Unwraps the [[AdaptiveSparkPlanExec]] at the root of `df`'s executed plan and returns its
+   * `executedPlan`. Fails the test with a readable message when the root is something else.
+   */
+  private def finalPlanOf(df: Dataset[_]): SparkPlan = df.queryExecution.executedPlan match {
+    case adaptive: AdaptiveSparkPlanExec => adaptive.executedPlan
+    case other => fail(s"Expected an AdaptiveSparkPlanExec but got:\n$other")
+  }
+
+  /** Collects the nodes of `plan` accepted by the [[CoalescedShuffleReader]] extractor. */
+  private def coalescedShuffleReaders(plan: SparkPlan): Seq[SparkPlan] = plan.collect {
+    case reader @ CoalescedShuffleReader() => reader
+  }
+
+  Seq(Some(5)).foreach { minNumPostShufflePartitions =>
+    val testNameNote = minNumPostShufflePartitions.fold("") { numPartitions =>
+      s"(minNumPostShufflePartitions: $numPartitions)"
+    }
+
+    /** Asserts that every reader reports the configured minimum number of partitions. */
+    def assertPartitionNum(readers: Seq[SparkPlan]): Unit = {
+      minNumPostShufflePartitions.foreach { numPartitions =>
+        readers.foreach { reader =>
+          assert(reader.outputPartitioning.numPartitions === numPartitions)
+        }
+      }
+    }
+
+    test(s"determining the number of reducers: aggregate operator$testNameNote") {
+      val test = { spark: SparkSession =>
+        val df =
+          spark
+            .range(0, 1000, 1, numInputPartitions)
+            .selectExpr("id % 20 as key", "id as value")
+        val agg = df.groupBy("key").count()
+
+        // Check the answer first.
+        QueryTest.checkAnswer(
+          agg,
+          spark.range(0, 20).selectExpr("id", "50 as cnt").collect())
+
+        // Then, look at the post-shuffle partitions of the final adaptive plan.
+        val shuffleReaders = coalescedShuffleReaders(finalPlanOf(agg))
+        assert(shuffleReaders.length === 1)
+        assertPartitionNum(shuffleReaders)
+      }
+
+      withSparkSession(test, 2000, minNumPostShufflePartitions)
+    }
+
+    test(s"determining the number of reducers: join operator$testNameNote") {
+      val test = { spark: SparkSession =>
+        val df1 =
+          spark
+            .range(0, 1000, 1, numInputPartitions)
+            .selectExpr("id % 500 as key1", "id as value1")
+        val df2 =
+          spark
+            .range(0, 1000, 1, numInputPartitions)
+            .selectExpr("id % 500 as key2", "id as value2")
+
+        val join = df1.join(df2, col("key1") === col("key2")).select(col("key1"), col("value2"))
+
+        // Check the answer first.
+        val expectedAnswer =
+          spark
+            .range(0, 1000)
+            .selectExpr("id % 500 as key", "id as value")
+            .union(spark.range(0, 1000).selectExpr("id % 500 as key", "id as value"))
+        QueryTest.checkAnswer(
+          join,
+          expectedAnswer.collect())
+
+        // Then, look at the post-shuffle partitions of the final adaptive plan.
+        val shuffleReaders = coalescedShuffleReaders(finalPlanOf(join))
+        assert(shuffleReaders.length === 2)
+        assertPartitionNum(shuffleReaders)
+      }
+
+      withSparkSession(test, 16384, minNumPostShufflePartitions)
+    }
+
+    test(s"determining the number of reducers: complex query 1$testNameNote") {
+      val test: (SparkSession) => Unit = { spark: SparkSession =>
+        val df1 =
+          spark
+            .range(0, 1000, 1, numInputPartitions)
+            .selectExpr("id % 500 as key1", "id as value1")
+            .groupBy("key1")
+            .count()
+            .toDF("key1", "cnt1")
+        val df2 =
+          spark
+            .range(0, 1000, 1, numInputPartitions)
+            .selectExpr("id % 500 as key2", "id as value2")
+            .groupBy("key2")
+            .count()
+            .toDF("key2", "cnt2")
+
+        val join = df1.join(df2, col("key1") === col("key2")).select(col("key1"), col("cnt2"))
+
+        // Check the answer first.
+        val expectedAnswer =
+          spark
+            .range(0, 500)
+            .selectExpr("id", "2 as cnt")
+        QueryTest.checkAnswer(
+          join,
+          expectedAnswer.collect())
+
+        // Then, look at the post-shuffle partitions of the final adaptive plan.
+        val shuffleReaders = coalescedShuffleReaders(finalPlanOf(join))
+        assert(shuffleReaders.length === 2)
+        assertPartitionNum(shuffleReaders)
+      }
+
+      withSparkSession(test, 16384, minNumPostShufflePartitions)
+    }
+
+    test(s"determining the number of reducers: complex query 2$testNameNote") {
+      val test: (SparkSession) => Unit = { spark: SparkSession =>
+        val df1 =
+          spark
+            .range(0, 1000, 1, numInputPartitions)
+            .selectExpr("id % 500 as key1", "id as value1")
+            .groupBy("key1")
+            .count()
+            .toDF("key1", "cnt1")
+        val df2 =
+          spark
+            .range(0, 1000, 1, numInputPartitions)
+            .selectExpr("id % 500 as key2", "id as value2")
+
+        val join =
+          df1
+            .join(df2, col("key1") === col("key2"))
+            .select(col("key1"), col("cnt1"), col("value2"))
+
+        // Check the answer first.
+        val expectedAnswer =
+          spark
+            .range(0, 1000)
+            .selectExpr("id % 500 as key", "2 as cnt", "id as value")
+        QueryTest.checkAnswer(
+          join,
+          expectedAnswer.collect())
+
+        // Then, look at the post-shuffle partitions of the final adaptive plan.
+        val shuffleReaders = coalescedShuffleReaders(finalPlanOf(join))
+        assert(shuffleReaders.length === 2)
+        assertPartitionNum(shuffleReaders)
+      }
+
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
+    }
+
+    test(s"determining the number of reducers: plan already partitioned$testNameNote") {
+      val test: SparkSession => Unit = { spark: SparkSession =>
+        try {
+          spark.range(1000).write.bucketBy(30, "id").saveAsTable("t")
+          // `df1` is hash partitioned by `id`.
+          val df1 = spark.read.table("t")
+          val df2 =
+            spark
+              .range(0, 1000, 1, numInputPartitions)
+              .selectExpr("id % 500 as key2", "id as value2")
+
+          val join = df1.join(df2, col("id") === col("key2")).select(col("id"), col("value2"))
+
+          // Check the answer first.
+          val expectedAnswer = spark.range(0, 500).selectExpr("id % 500", "id as value")
+            .union(spark.range(500, 1000).selectExpr("id % 500", "id as value"))
+          QueryTest.checkAnswer(
+            join,
+            expectedAnswer.collect())
+
+          // Then, let's make sure we do not reduce number of post shuffle partitions.
+          assert(coalescedShuffleReaders(finalPlanOf(join)).length === 0)
+        } finally {
+          // IF EXISTS: when saveAsTable itself failed there is no table to drop, and a plain
+          // DROP TABLE would replace the original failure with its own error.
+          spark.sql("drop table if exists t")
+        }
+      }
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
+    }
+  }
+
+  test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      spark.sql("SET spark.sql.exchange.reuse=true")
+      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+
+      // test case 1: a query stage has 3 child stages but they are the same stage.
+      // Final Stage 1
+      //   ShuffleQueryStage 0
+      //   ReusedQueryStage 0
+      //   ReusedQueryStage 0
+      val resultDf = df.join(df, "key").join(df, "key")
+      QueryTest.checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+      val finalPlan = finalPlanOf(resultDf)
+      assert(finalPlan.collect {
+        case ShuffleQueryStageExec(_, r: ReusedExchangeExec) => r
+      }.length == 2)
+      assert(coalescedShuffleReaders(finalPlan).length == 3)
+
+      // test case 2: a query stage has 2 parent stages.
+      // Final Stage 3
+      //   ShuffleQueryStage 1
+      //     ShuffleQueryStage 0
+      //   ShuffleQueryStage 2
+      //     ReusedQueryStage 0
+      val grouped = df.groupBy("key").agg(max("value").as("value"))
+      val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
+        .union(grouped.groupBy(col("key") + 2).max("value"))
+      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
+
+      val finalPlan2 = finalPlanOf(resultDf2)
+
+      // The result stage has 2 children
+      val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
+      assert(level1Stages.length == 2)
+
+      val leafStages = level1Stages.flatMap { stage =>
+        // All of the child stages of result stage have only one child stage.
+        val children = stage.plan.collect { case q: QueryStageExec => q }
+        assert(children.length == 1)
+        children
+      }
+      assert(leafStages.length == 2)
+
+      val reusedStages = level1Stages.flatMap { stage =>
+        stage.plan.collect {
+          case ShuffleQueryStageExec(_, r: ReusedExchangeExec) => r
+        }
+      }
+      assert(reusedStages.length == 1)
+    }
+    withSparkSession(test, 4, None)
+  }
+
+  test("Do not reduce the number of shuffle partition for repartition") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      val ds = spark.range(3)
+      val resultDf = ds.repartition(2, ds.col("id")).toDF()
+
+      QueryTest.checkAnswer(resultDf,
+        Seq(0, 1, 2).map(i => Row(i)))
+      assert(coalescedShuffleReaders(finalPlanOf(resultDf)).isEmpty)
+    }
+    withSparkSession(test, 200, None)
+  }
+
+  test("Union two datasets with different pre-shuffle partition number") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      val df1 = spark.range(3).join(spark.range(3), "id").toDF()
+      val df2 = spark.range(3).groupBy().sum()
+
+      val resultDf = df1.union(df2)
+
+      QueryTest.checkAnswer(resultDf, Seq(0, 1, 2, 3).map(i => Row(i)))
+
+      // As the pre-shuffle partition number are different, we will skip reducing
+      // the shuffle partition numbers.
+      assert(coalescedShuffleReaders(finalPlanOf(resultDf)).isEmpty)
+    }
+    withSparkSession(test, 100, None)
+  }
+}
+
+/**
+ * Boolean extractor over [[ColumnarCustomShuffleReaderExec]]: matches when the reader is not a
+ * local reader, has no skewed partition and reports `hasCoalescedPartition`.
+ */
+object CoalescedShuffleReader {
+  def unapply(reader: ColumnarCustomShuffleReaderExec): Boolean = {
+    !reader.isLocalReader && !reader.hasSkewedPartition && reader.hasCoalescedPartition
+  }
+}
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 4b05f850e..90c1194ee 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.optimizer.BuildRight -import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarSparkPlanTest, PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffleColumnarRDD, SparkPlan} +import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarSparkPlanTest, PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledColumnarRDD, SparkPlan} import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, ColumnarBroadcastHashJoinExec, ColumnarSortMergeJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate @@ -102,9 +102,9 @@ class AdaptiveQueryExecSuite extends ColumnarSparkPlanTest } } - private def findTopLevelColumnarSortMergeJoin(plan: SparkPlan): Seq[ColumnarSortMergeJoin] = { +
private def findTopLevelColumnarSortMergeJoin(plan: SparkPlan): Seq[ColumnarSortMergeJoinExec] = { collect(plan) { - case j: ColumnarSortMergeJoin => j + case j: ColumnarSortMergeJoinExec => j } } @@ -247,7 +247,8 @@ class AdaptiveQueryExecSuite extends ColumnarSparkPlanTest } withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { - val testDf = df1.where('a > 10).join(df2.where('b > 10), Seq("id"), "left_outer") + // currently, we only support "inner" join type + val testDf = df1.where('a > 10).join(df2.where('b > 10), Seq("id"), "inner") .groupBy('a).count() checkAnswer(testDf, Seq()) val plan = testDf.queryExecution.executedPlan @@ -767,9 +768,9 @@ class AdaptiveQueryExecSuite extends ColumnarSparkPlanTest Utils.deleteRecursively(tableDir) df1.write.parquet(tableDir.getAbsolutePath) - val aggregated = spark.table("bucketed_table").groupBy("i").count() + val aggregated = spark.table("bucketed_table").groupBy("i").sum() val error = intercept[Exception] { - aggregated.count() + aggregated.head() } assert(error.getMessage contains "Invalid bucket file") assert(error.getSuppressed.size === 0) -- Gitee