From 697a38cfe2b68bbf09d281aeceef5d56145e1a3b Mon Sep 17 00:00:00 2001
From: linlong
Date: Mon, 19 Jun 2023 19:55:31 +0800
Subject: [PATCH 1/2] [spark-extension] add heuristic join reorder
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../boostkit/spark/ColumnarPlugin.scala       |   3 +
 .../boostkit/spark/ColumnarPluginConfig.scala |   8 +
 .../optimizer/HeuristicJoinReorder.scala      | 341 ++++++++++++++++++
 3 files changed, 352 insertions(+)
 create mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorder.scala

diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
index 8e7a5786f..9ee27d17a 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, DynamicPruningSubquery, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
+import org.apache.spark.sql.catalyst.optimizer.{DelayCartesianProduct, HeuristicJoinReorder}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowToOmniColumnarExec, _}
 import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ColumnarCustomShuffleReaderExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec}
@@ -539,5 +540,7 @@ class ColumnarPlugin extends (SparkSessionExtensions => Unit) with Logging {
     logInfo("Using BoostKit Spark Native Sql Engine Extension to Speed Up Your Queries.")
     extensions.injectColumnar(session => ColumnarOverrideRules(session))
     extensions.injectPlannerStrategy(_ => ShuffleJoinStrategy)
+    extensions.injectOptimizerRule(_ => DelayCartesianProduct)
+    extensions.injectOptimizerRule(_ => HeuristicJoinReorder)
   }
 }
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index 483071209..67a99296c 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -75,6 +75,14 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
     .getConfString("spark.omni.sql.columnar.broadcastJoin", "true")
     .toBoolean
 
+  // enable or disable heuristic join reorder
+  val enableHeuristicJoinReorder: Boolean =
+    conf.getConfString("spark.sql.heuristicJoinReorder.enabled", "true").toBoolean
+
+  // enable or disable delayed Cartesian product
+  val enableDelayCartesianProduct: Boolean =
+    conf.getConfString("spark.sql.enableDelayCartesianProduct.enabled", "true").toBoolean
+
   // enable native table scan
   val enableColumnarFileScan: Boolean = conf
     .getConfString("spark.omni.sql.columnar.nativefilescan", "true")
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorder.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorder.scala
new file mode 100644
index 000000000..d481ed01f
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorder.scala
@@ -0,0 +1,341 @@
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+
+import com.huawei.boostkit.spark.ColumnarPluginConfig
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EqualNullSafe, EqualTo, Expression, IsNotNull, PredicateHelper}
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.util.sideBySide
+
+/**
+ * Move all Cartesian products to the root of the plan.
+ */
+object DelayCartesianProduct extends Rule[LogicalPlan] with PredicateHelper {
+
+  /**
+   * Extract cliques from the input plans.
+   * A clique is a sub-tree (sub-plan) that has no join condition with any other sub-plan.
+   * The input plans are picked from left to right until no join condition can be found
+   * among the remaining plans. The same logic is then applied to the remaining plans,
+   * until all plans are picked. This function can produce a left-deep tree or a bushy tree.
+   *
+   * @param input a list of LogicalPlans to inner join and the type of inner join.
+   * @param conditions a list of join conditions.
+   */
+  private def extractCliques(input: Seq[(LogicalPlan, InnerLike)], conditions: Seq[Expression])
+    : Seq[(LogicalPlan, InnerLike)] = {
+    if (input.size == 1) {
+      input
+    } else {
+      val (leftPlan, leftInnerJoinType) :: linearSeq = input
+      // Discover the initial join that contains at least one join condition.
+      val conditionalOption = linearSeq.find { planJoinPair =>
+        val plan = planJoinPair._1
+        val refs = leftPlan.outputSet ++ plan.outputSet
+        conditions
+          .filterNot(l => l.references.nonEmpty && canEvaluate(l, leftPlan))
+          .filterNot(r => r.references.nonEmpty && canEvaluate(r, plan))
+          .exists(_.references.subsetOf(refs))
+      }
+
+      if (conditionalOption.isEmpty) {
+        Seq((leftPlan, leftInnerJoinType)) ++ extractCliques(linearSeq, conditions)
+      } else {
+        val (rightPlan, rightInnerJoinType) = conditionalOption.get
+
+        val joinedRefs = leftPlan.outputSet ++ rightPlan.outputSet
+        val (joinConditions, otherConditions) = conditions.partition(
+          e => e.references.subsetOf(joinedRefs) && canEvaluateWithinJoin(e))
+        val joined = Join(leftPlan, rightPlan, rightInnerJoinType,
+          joinConditions.reduceLeftOption(And), JoinHint.NONE)
+
+        // Must not reference the same logical plan twice.
+        extractCliques(Seq((joined, Inner))
+          ++ linearSeq.filterNot(_._1 eq rightPlan), otherConditions)
+      }
+    }
+  }
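+
+  // Illustrative walkthrough (hypothetical inputs, for documentation only):
+  // with input = [t1, t2, t3] and conditions = [t1.a = t3.a], the scan finds
+  // t3 as the first plan sharing a condition with t1, so the result is
+  // [Join(t1, t3, Inner, t1.a = t3.a), t2]; t2 has no join condition with the
+  // rest and stays a separate clique, to be linked by linkCliques below.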
+
+  /**
+   * Link the extracted cliques with Cartesian products.
+   *
+   * @param input the cliques (each with its inner join type) to be linked.
+   * @return a single plan joining all cliques.
+   */
+  private def linkCliques(input: Seq[(LogicalPlan, InnerLike)])
+    : LogicalPlan = {
+    if (input.length == 1) {
+      input.head._1
+    } else if (input.length == 2) {
+      val ((left, innerJoinType1), (right, innerJoinType2)) = (input(0), input(1))
+      val joinType = resetJoinType(innerJoinType1, innerJoinType2)
+      Join(left, right, joinType, None, JoinHint.NONE)
+    } else {
+      val (left, innerJoinType1) :: (right, innerJoinType2) :: rest = input
+      val joinType = resetJoinType(innerJoinType1, innerJoinType2)
+      linkCliques(Seq((Join(left, right, joinType, None, JoinHint.NONE), joinType)) ++ rest)
+    }
+  }
+
+  /**
+   * Reset the join type before reordering: if either side is a Cross join,
+   * the combined join is Cross; otherwise it is Inner.
+   *
+   * @param leftJoinType the join type of the left input.
+   * @param rightJoinType the join type of the right input.
+   * @return the combined inner join type.
+   */
+  private def resetJoinType(leftJoinType: InnerLike, rightJoinType: InnerLike): InnerLike = {
+    (leftJoinType, rightJoinType) match {
+      case (_, Cross) | (Cross, _) => Cross
+      case _ => Inner
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!ColumnarPluginConfig.getSessionConf.enableDelayCartesianProduct) {
+      return plan
+    }
+
+    // Reorder joins only when there are Cartesian products.
+    var existCartesianProduct = false
+    plan foreach {
+      case Join(_, _, _: InnerLike, None, _) => existCartesianProduct = true
+      case _ =>
+    }
+
+    if (existCartesianProduct) {
+      plan.transform {
+        case originalPlan@ExtractFiltersAndInnerJoins(input, conditions)
+          if input.size > 2 && conditions.nonEmpty =>
+          val cliques = extractCliques(input, conditions)
+          val reorderedPlan = linkCliques(cliques)
+
+          reorderedPlan match {
+            // Generate a bushy tree after reordering.
+            case ExtractFiltersAndInnerJoinsForBushy(_, joinConditions) =>
+              val primalConditions = conditions.flatMap(splitConjunctivePredicates)
+              val reorderedConditions = joinConditions.flatMap(splitConjunctivePredicates).toSet
+              val missingConditions = primalConditions.filterNot(reorderedConditions.contains)
+              if (missingConditions.nonEmpty) {
+                val comparedPlans =
+                  sideBySide(originalPlan.treeString, reorderedPlan.treeString).mkString("\n")
+                logWarning("There are missing conditions after reordering, falling back to the " +
+                  s"original plan. == Comparing two plans ===\n$comparedPlans")
+                originalPlan
+              } else {
+                reorderedPlan
+              }
+            case _ => throw new AnalysisException(
+              s"There is no join node in the plan, this should not happen: $reorderedPlan")
+          }
+      }
+    } else {
+      plan
+    }
+  }
+}
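+
+// Illustrative effect (hypothetical query, for documentation only): for
+//   SELECT * FROM t1, t2, t3 WHERE t1.a = t3.a
+// the rule rewrites Join(Join(t1, t2), t3, t1.a = t3.a) into
+// Join(Join(t1, t3, Inner, t1.a = t3.a), t2), so the condition-bearing join
+// executes first and the Cartesian product with t2 is delayed to the root.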
+
+/**
+ * Heuristic join reorder: execute the small joins with filters first, which
+ * reduces the size of intermediate results.
+ */
+object HeuristicJoinReorder extends Rule[LogicalPlan]
+  with PredicateHelper with JoinSelectionHelper {
+
+  /**
+   * Join a list of plans together and push down the conditions into them.
+   * The joined plans are picked from left to right, thus the final result is a left-deep tree.
+   *
+   * @param input a list of LogicalPlans to inner join and the type of inner join.
+   * @param conditions a list of join conditions.
+   */
+  @tailrec
+  final def createReorderJoin(input: Seq[(LogicalPlan, InnerLike)], conditions: Seq[Expression])
+    : LogicalPlan = {
+    assert(input.size >= 2)
+    if (input.size == 2) {
+      val (joinConditions, others) = conditions.partition(canEvaluateWithinJoin)
+      val ((leftPlan, leftJoinType), (rightPlan, rightJoinType)) = (input(0), input(1))
+      val innerJoinType = (leftJoinType, rightJoinType) match {
+        case (Inner, Inner) => Inner
+        case (_, _) => Cross
+      }
+      // Mark the join node as ordered so that we don't need to transform it again.
+      val orderJoin =
+        OrderedJoin(leftPlan, rightPlan, innerJoinType, joinConditions.reduceLeftOption(And))
+      if (others.nonEmpty) {
+        Filter(others.reduceLeft(And), orderJoin)
+      } else {
+        orderJoin
+      }
+    } else {
+      val (left, _) :: rest = input.toList
+      // A candidate must satisfy two criteria:
+      // 1. it has at least one equi-join condition with the left node
+      // 2. it has a filter that is worth evaluating before the join
+      val candidates = rest.filter { planJoinPair =>
+        val plan = planJoinPair._1
+        val isEqualJoinCondition = conditions.flatMap {
+          case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
+          case EqualNullSafe(l, r) if l.references.isEmpty || r.references.isEmpty => None
+          case e@EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, plan) => Some(e)
+          case e@EqualTo(l, r) if canEvaluate(l, plan) && canEvaluate(r, left) => Some(e)
+          case e@EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, plan) => Some(e)
+          case e@EqualNullSafe(l, r) if canEvaluate(l, plan) && canEvaluate(r, left) => Some(e)
+          case _ => None
+        }.nonEmpty
+
+        val hasFilter = plan match {
+          case f: Filter if hasValuableCondition(f.condition) => true
+          case Project(_, f: Filter) if hasValuableCondition(f.condition) => true
+          case _ => false
+        }
+
+        isEqualJoinCondition && hasFilter
+      }
+      val (right, innerJoinType) = if (candidates.nonEmpty) {
+        candidates.minBy(_._1.stats.sizeInBytes)
+      } else {
+        rest.head
+      }
+
+      val joinedRefs = left.outputSet ++ right.outputSet
+      val selectedJoinConditions = mutable.HashSet.empty[Expression]
+      val (joinConditions, others) = conditions.partition { e =>
+        // If two conditions are semantically equal, they must come from two different joins,
+        // so they should not be put into the same join.
+        if (!selectedJoinConditions.contains(e.canonicalized) && e.references.subsetOf(joinedRefs)
+          && canEvaluateWithinJoin(e)) {
+          selectedJoinConditions.add(e.canonicalized)
+          true
+        } else {
+          false
+        }
+      }
+      // Mark the join node as ordered so that we don't need to transform it again.
+      val joined = OrderedJoin(left, right, innerJoinType, joinConditions.reduceLeftOption(And))
+
+      // Must not reference the same logical plan twice.
+      createReorderJoin(Seq((joined, Inner)) ++ rest.filterNot(_._1 eq right), others)
+    }
+  }
+
+  private def hasValuableCondition(condition: Expression): Boolean = {
+    val conditions = splitConjunctivePredicates(condition)
+    !conditions.forall(_.isInstanceOf[IsNotNull])
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (ColumnarPluginConfig.getSessionConf.enableHeuristicJoinReorder) {
+      val newPlan = plan.transform {
+        case p@ExtractFiltersAndInnerJoinsByIgnoreProjects(input, conditions)
+          if input.size > 2 && conditions.nonEmpty =>
+          val reordered = createReorderJoin(input, conditions)
+          if (p.sameOutput(reordered)) {
+            reordered
+          } else {
+            // Reordering the joins has changed the order of the columns.
+            // Inject a projection to restore the expected ordering.
+            Project(p.output, reordered)
+          }
+      }
+
+      // After reordering is finished, convert OrderedJoin back to Join.
+      val result = newPlan.transformDown {
+        case OrderedJoin(left, right, jt, cond) => Join(left, right, jt, cond, JoinHint.NONE)
+      }
+      if (!result.resolved) {
+        // In some special cases related to subqueries, the plan may no longer
+        // resolve after reordering; fall back to the original plan.
+        val comparedPlans = sideBySide(plan.treeString, result.treeString).mkString("\n")
+        logWarning("The structural integrity of the plan is broken, falling back to the " +
+          s"original plan. == Comparing two plans ===\n$comparedPlans")
+        plan
+      } else {
+        result
+      }
+    } else {
+      plan
+    }
+  }
+}
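+
+// Illustrative pick (hypothetical inputs): with left = t1 and rest = [t2, t3],
+// if both t2 and t3 have an equi-join condition with t1 but only t3 also
+// carries a selective filter, createReorderJoin joins t1 with t3 first; when
+// several candidates qualify, the one with the smallest stats.sizeInBytes wins.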
+
+/**
+ * This is different from [[ExtractFiltersAndInnerJoins]] in that it can collect filters and
+ * inner joins while ignoring the projects on top of joins, which are produced by column pruning.
+ */
+private object ExtractFiltersAndInnerJoinsByIgnoreProjects extends PredicateHelper {
+
+  /**
+   * Flattens all inner joins that are next to each other.
+   * Returns a list of logical plans to be joined, each paired with its inner join type
+   * (which records whether the plan was involved in an explicit cross join). Also returns
+   * the entire list of join conditions for the left-deep tree.
+   */
+  def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
+    : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match {
+    case Join(left, right, joinType: InnerLike, cond, hint) if hint == JoinHint.NONE =>
+      val (plans, conditions) = flattenJoin(left, joinType)
+      (plans ++ Seq((right, joinType)), conditions ++
+        cond.toSeq.flatMap(splitConjunctivePredicates))
+    case Filter(filterCondition, j@Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE =>
+      val (plans, conditions) = flattenJoin(j)
+      (plans, conditions ++ splitConjunctivePredicates(filterCondition))
+    case Project(projectList, child)
+      if projectList.forall(_.isInstanceOf[Attribute]) => flattenJoin(child)
+
+    case _ => (Seq((plan, parentJoinType)), Seq.empty)
+  }
+
+  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
+    = plan match {
+    case f@Filter(_, Join(_, _, _: InnerLike, _, _)) =>
+      Some(flattenJoin(f))
+    case j@Join(_, _, _, _, hint) if hint == JoinHint.NONE =>
+      Some(flattenJoin(j))
+    case _ => None
+  }
+}
+
+private object ExtractFiltersAndInnerJoinsForBushy extends PredicateHelper {
+
+  /**
+   * Flattens joins for both left-deep and bushy trees.
+   *
+   * @param plan the plan to flatten.
+   * @param parentJoinType the join type of the enclosing join.
+   * @return the flattened plans with their join types, and the collected join conditions.
+   */
+  def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
+    : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match {
+    case Join(left, right, joinType: InnerLike, cond, _) =>
+      val (lPlans, lConds) = flattenJoin(left, joinType)
+      val (rPlans, rConds) = flattenJoin(right, joinType)
+      (lPlans ++ rPlans, lConds ++ rConds ++ cond.toSeq)
+
+    case Filter(filterCondition, j@Join(_, _, _: InnerLike, _, _)) =>
+      val (plans, conditions) = flattenJoin(j)
+      (plans, conditions ++ splitConjunctivePredicates(filterCondition))
+
+    case _ => (Seq((plan, parentJoinType)), Seq())
+  }
+
+  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] = {
+    plan match {
+      case f@Filter(_, Join(_, _, _: InnerLike, _, _)) =>
+        Some(flattenJoin(f))
+      case j@Join(_, _, _, _, _) =>
+        Some(flattenJoin(j))
+      case _ => None
+    }
+  }
+}
\ No newline at end of file
-- Gitee
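A minimal smoke test for the new rule (an illustrative sketch, not part of this patch series; it assumes the extension jar is on the classpath so that ColumnarPluginConfig resolves, and all table names are hypothetical):

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.catalyst.optimizer.HeuristicJoinReorder

  object HeuristicJoinReorderSmokeTest {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("heuristic-join-reorder-smoke")
        .getOrCreate()
      import spark.implicits._

      Seq((1, "a"), (2, "b")).toDF("k", "v").createOrReplaceTempView("t1")
      Seq(1, 2).toDF("k").createOrReplaceTempView("t2")
      Seq("a").toDF("v").createOrReplaceTempView("t3")

      // Both joins carry a condition, so no Cartesian product is needed.
      val analyzed = spark.sql(
        "SELECT * FROM t1, t2, t3 WHERE t1.k = t2.k AND t1.v = t3.v")
        .queryExecution.analyzed

      // The rule is a no-op when spark.sql.heuristicJoinReorder.enabled=false.
      println(HeuristicJoinReorder(analyzed).treeString)
      spark.stop()
    }
  }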
From 41c29c9f35cd07a211ba03909c39579d493d2c6e Mon Sep 17 00:00:00 2001
From: linlong
Date: Sun, 25 Jun 2023 12:18:21 +0800
Subject: [PATCH 2/2] [spark-extension] add heuristic join reorder test
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../HeuristicJoinReorderPlanTestBase.scala    | 62 ++++++++++++++++++
 .../optimizer/HeuristicJoinReorderSuite.scala | 65 +++++++++++++++++++
 2 files changed, 127 insertions(+)
 create mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderPlanTestBase.scala
 create mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderSuite.scala

diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderPlanTestBase.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderPlanTestBase.scala
new file mode 100644
index 000000000..9e92cd528
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderPlanTestBase.scala
@@ -0,0 +1,62 @@
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.sideBySide
+
+trait HeuristicJoinReorderPlanTestBase extends PlanTest {
+
+  def outputsOf(plans: LogicalPlan*): Seq[Attribute] = {
+    plans.map(_.output).reduce(_ ++ _)
+  }
+
+  def assertEqualJoinPlans(
+      optimizer: RuleExecutor[LogicalPlan],
+      originalPlan: LogicalPlan,
+      groundTruthBestPlan: LogicalPlan): Unit = {
+    val analyzed = originalPlan.analyze
+    val optimized = optimizer.execute(analyzed)
+    val expected = EliminateResolvedHint.apply(groundTruthBestPlan.analyze)
+
+    assert(equivalentOutput(analyzed, expected))
+    assert(equivalentOutput(analyzed, optimized))
+
+    compareJoinOrder(optimized, expected)
+  }
+
+  protected def equivalentOutput(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+    normalizeExprIds(plan1).output == normalizeExprIds(plan2).output
+  }
+
+  protected def compareJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
+    val normalized1 = normalizePlan(normalizeExprIds(plan1))
+    val normalized2 = normalizePlan(normalizeExprIds(plan2))
+    if (!sameJoinPlan(normalized1, normalized2)) {
+      fail(
+        s"""
+           |== FAIL: Plans do not match ===
+           |${sideBySide(
+          rewriteNameFromAttrNullability(normalized1).treeString,
+          rewriteNameFromAttrNullability(normalized2).treeString).mkString("\n")}
+         """.stripMargin)
+    }
+  }
+
+  private def sameJoinPlan(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+    (plan1, plan2) match {
+      case (j1: Join, j2: Join) =>
+        (sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)
+          && j1.hint.leftHint == j2.hint.leftHint && j1.hint.rightHint == j2.hint.rightHint) ||
+          (sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left)
+            && j1.hint.leftHint == j2.hint.rightHint && j1.hint.rightHint == j2.hint.leftHint)
+      case (p1: Project, p2: Project) =>
+        p1.projectList == p2.projectList && sameJoinPlan(p1.child, p2.child)
+      case _ =>
+        plan1 == plan2
+    }
+  }
+}
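A sketch of how the base trait is meant to be driven (a hypothetical wrapper mirroring the RuleExecutor pattern of Spark's own optimizer suites; not part of this patch):

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = Batch("HeuristicJoinReorder", Once, HeuristicJoinReorder) :: Nil
  }
  // Inside a suite mixing in HeuristicJoinReorderPlanTestBase:
  //   assertEqualJoinPlans(Optimize, originalPlan, expectedBestPlan)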
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderSuite.scala
new file mode 100644
index 000000000..c8263228f
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/optimizer/HeuristicJoinReorderSuite.scala
@@ -0,0 +1,65 @@
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
+
+class HeuristicJoinReorderSuite
+  extends HeuristicJoinReorderPlanTestBase with StatsEstimationTestBase {
+
+  private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
+    attr("t1.k-1-2") -> rangeColumnStat(2, 0),
+    attr("t1.v-1-10") -> rangeColumnStat(10, 0),
+    attr("t2.k-1-5") -> rangeColumnStat(5, 0),
+    attr("t3.v-1-100") -> rangeColumnStat(100, 0),
+    attr("t4.k-1-2") -> rangeColumnStat(2, 0),
+    attr("t4.v-1-10") -> rangeColumnStat(10, 0),
+    attr("t5.k-1-5") -> rangeColumnStat(5, 0),
+    attr("t5.v-1-5") -> rangeColumnStat(5, 0)
+  ))
+
+  private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
+  private val nameToColInfo: Map[String, (Attribute, ColumnStat)] =
+    columnInfo.map(kv => kv._1.name -> kv)
+
+  private val t1 = StatsTestPlan(
+    outputList = Seq("t1.k-1-2", "t1.v-1-10").map(nameToAttr),
+    rowCount = 1000,
+    size = Some(1000 * (8 + 4 + 4)),
+    attributeStats = AttributeMap(Seq("t1.k-1-2", "t1.v-1-10").map(nameToColInfo)))
+
+  private val t2 = StatsTestPlan(
+    outputList = Seq("t2.k-1-5").map(nameToAttr),
+    rowCount = 20,
+    size = Some(20 * (8 + 4)),
+    attributeStats = AttributeMap(Seq("t2.k-1-5").map(nameToColInfo)))
+
+  private val t3 = StatsTestPlan(
+    outputList = Seq("t3.v-1-100").map(nameToAttr),
+    rowCount = 100,
+    size = Some(100 * (8 + 4)),
+    attributeStats = AttributeMap(Seq("t3.v-1-100").map(nameToColInfo)))
+
+  test("reorder 3 tables") {
+    val originalPlan =
+      t1.join(t2).join(t3)
+        .where((nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")) &&
+          (nameToAttr("t1.v-1-10") === nameToAttr("t3.v-1-100")))
+
+    val analyzed = originalPlan.analyze
+    val optimized = HeuristicJoinReorder.apply(analyzed).select(outputsOf(t1, t2, t3): _*)
+    val expected =
+      t1.join(t2, Inner, Some(nameToAttr("t1.k-1-2") === nameToAttr("t2.k-1-5")))
+        .join(t3, Inner, Some(nameToAttr("t1.v-1-10") === nameToAttr("t3.v-1-100")))
+        .select(outputsOf(t1, t2, t3): _*)
+
+    assert(equivalentOutput(analyzed, expected))
+    assert(equivalentOutput(analyzed, optimized))
+
+    compareJoinOrder(optimized, expected)
+  }
+}
-- Gitee
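For reference, both rules default to on; a session can toggle them through the keys introduced in ColumnarPluginConfig (illustrative spark-shell commands):

  // Runtime toggles for the two rules added by this series (defaults are true):
  spark.conf.set("spark.sql.heuristicJoinReorder.enabled", "false")
  spark.conf.set("spark.sql.enableDelayCartesianProduct.enabled", "false")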