From 4e6398d18c726ec25f594bdbf61bc7c832a732aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Mon, 2 Dec 2024 10:56:59 +0800 Subject: [PATCH 01/15] =?UTF-8?q?=E6=96=B0=E5=A2=9ECoulmnarBroadCastNested?= =?UTF-8?q?LoopJoinExec=E8=BF=99=E4=B8=AA=E7=B1=BB=EF=BC=8C=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0NestedLoopJoin=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarBroadcastNestedLoopJoinExec.scala | 392 ++++++++++++++++++ 1 file changed, 392 insertions(+) create mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala new file mode 100644 index 000000000..6d87c7685 --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import java.util.Optional +import java.util.concurrent.TimeUnit.NANOSECONDS +import scala.collection.mutable +import com.huawei.boostkit.spark.ColumnarPluginConfig +import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP +import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor +import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{checkOmniJsonWhiteList, isSimpleColumn, isSimpleColumnForAll} +import com.huawei.boostkit.spark.util.OmniAdaptorUtil +import com.huawei.boostkit.spark.util.OmniAdaptorUtil.{getExprIdForProjectList, getIndexArray, getProjectListIndex,pruneOutput, reorderOutputVecs, transColBatchToOmniVecs} +import nova.hetu.omniruntime.constants.JoinType._ +import nova.hetu.omniruntime.`type`.DataType +import nova.hetu.omniruntime.operator.OmniOperator +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} +import nova.hetu.omniruntime.operator.join.{NestedLoopJoinBuildOperatorFactory, NestedLoopJoinLookupOperatorFactory} +import nova.hetu.omniruntime.vector.VecBatch +import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{CodegenSupport, ColumnarHashedRelation, ExplainUtils, SparkPlan} +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.execution.util.{MergeIterator, SparkMemoryUtils} +import 
org.apache.spark.sql.execution.vectorized.OmniColumnVector +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * Performs an inner hash join of two child relations. When the output RDD of this operator is + * being constructed, a Spark job is asynchronously started to calculate the values for the + * broadcast relation. This data is then placed in a Spark broadcast variable. The streamedPlan + * relation is not shuffled. + */ +case class ColumnarBroadcastNestedLoopJoinExec( + left: SparkPlan, + right: SparkPlan, + buildSide: BuildSide, + joinType: JoinType, + condition: Option[Expression], + projectList: Seq[NamedExpression] = Seq.empty) extends JoinCodegenSupport { + + override def verboseStringWithOperatorId(): String = { + val joinCondStr = if (condition.isDefined) { + s"${condition.get}${condition.get.dataType}" + } else "None" + s""" + |$formattedNodeName + |$simpleStringWithNodeId + |${ExplainUtils.generateFieldString("buildInput", buildOutput ++ buildOutput.map(_.dataType))} + |${ExplainUtils.generateFieldString("streamedInput", streamedOutput ++ streamedOutput.map(_.dataType))} + |${ExplainUtils.generateFieldString("leftKeys", leftKeys ++ leftKeys.map(_.dataType))} + |${ExplainUtils.generateFieldString("rightKeys", rightKeys ++ rightKeys.map(_.dataType))} + |${ExplainUtils.generateFieldString("condition", joinCondStr)} + |${ExplainUtils.generateFieldString("projectList", projectList.map(_.toAttribute) ++ projectList.map(_.toAttribute).map(_.dataType))} + |${ExplainUtils.generateFieldString("output", output ++ output.map(_.dataType))} + |Condition : $condition + |""".stripMargin + } + + private val (buildOutput, streamedOutput) = buildSide match { + case BuildRight => (left.output, right.output) + case BuildLeft => (right.output, left.output) + } + + override def leftKeys: Seq[Expression] = Nil; + + override def rightKeys: Seq[Expression] = Nil; + + private val (streamedPlan, buildPlan) = buildSide match { + case BuildRight => (left, right) + case 
BuildLeft => (right, left) + } + + protected lazy val (buildKeys, streamedKeys) = { + require(leftKeys.length == rightKeys.length && + leftKeys.map(_.dataType) + .zip(rightKeys.map(_.dataType)) + .forall(types => types._1.sameType(types._2)), + "Join keys from two sides should have same length and types") + buildSide match { + case BuildLeft => (leftKeys, rightKeys) + case BuildRight => (rightKeys, leftKeys) + } + } + + override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "lookupAddInputTime" -> + SQLMetrics.createTimingMetric(sparkContext, "time in omni lookup addInput"), + "lookupGetOutputTime" -> + SQLMetrics.createTimingMetric(sparkContext, "time in omni lookup getOutput"), + "lookupCodegenTime" -> + SQLMetrics.createTimingMetric(sparkContext, "time in omni lookup codegen"), + "buildAddInputTime" -> + SQLMetrics.createTimingMetric(sparkContext, "time in omni build addInput"), + "buildGetOutputTime" -> + SQLMetrics.createTimingMetric(sparkContext, "time in omni build getOutput"), + "buildCodegenTime" -> + SQLMetrics.createTimingMetric(sparkContext, "time in omni build codegen"), + "numOutputVecBatches" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatches"), + "numMergedVecBatches" -> SQLMetrics.createMetric(sparkContext, "number of merged vecBatches") + ) + + override protected def withNewChildrenInternal( + newLeft: SparkPlan, newRight: SparkPlan): ColumnarBroadcastNestedLoopJoinExec = + copy(left = newLeft, right = newRight) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + streamedPlan.asInstanceOf[CodegenSupport].inputRDDs() + } + + override def supportsColumnar: Boolean = true + + override def supportCodegen: Boolean = false + + override def nodeName: String = "OmniColumnarBroadcastNestLoopJoin" + + /** only for operator fusion */ + def getBuildOutput: Seq[Attribute] = { + buildOutput + } + + def getBuildKeys: Seq[Expression] = { + buildKeys + } + + def getBuildPlan: 
SparkPlan = { + buildPlan + } + + def getStreamedOutput: Seq[Attribute] = { + streamedOutput + } + + def getStreamedKeys: Seq[Expression] = { + streamedKeys + } + + def getstreamedPlan: SparkPlan = { + streamedPlan + } + + override def doExecuteColumnar(): RDD[ColumnarBatch] = { + // input/output: {col1#10,col2#11,col1#12,col2#13} + val numOutputRows = longMetric("numOutputRows") + val numOutputVecBatches = longMetric("numOutputVecBatches") + val numMergedVecBatches = longMetric("numMergedVecBatches") + val bypassVecBatches = longMetric("bypassVecBatches") + val buildAddInputTime = longMetric("buildAddInputTime") + val buildCodegenTime = longMetric("buildCodegenTime") + val buildGetOutputTime = longMetric("buildGetOutputTime") + val lookupAddInputTime = longMetric("lookupAddInputTime") + val lookupCodegenTime = longMetric("lookupCodegenTime") + val lookupGetOutputTime = longMetric("lookupGetOutputTime") + + val buildTypes = new Array[DataType](buildOutput.size) // {2,2}, buildOutput:col1#12,col2#13 + buildOutput.zipWithIndex.foreach { case (att, i) => + buildTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(att.dataType, att.metadata) + } + + val columnarConf: ColumnarPluginConfig = ColumnarPluginConfig.getSessionConf + val enableJoinBatchMerge: Boolean = columnarConf.enableJoinBatchMerge + + val projectExprIdList = getExprIdForProjectList(projectList) + // {0}, buildKeys: col1#12 + val buildOutputCols: Array[Int] = joinType match { + case Inner | LeftOuter | RightOuter => + getIndexArray(buildOutput, projectExprIdList) + case LeftExistence(_) => + Array[Int]() + case x => + throw new UnsupportedOperationException(s"ColumnBroadcastNestedLoopJoin Join-type[$x] is not supported!") + } + + val buildOutputExprIdMap = OmniExpressionAdaptor.getExprIdMap(buildOutput.map(_.toAttribute)) + + val prunedBuildOutput = pruneOutput(buildOutput, projectExprIdList) + val buildOutputTypes = new Array[DataType](prunedBuildOutput.size) // {2,2}, buildOutput:col1#12,col2#13 + 
prunedBuildOutput.zipWithIndex.foreach { case (att, i) => + buildOutputTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(att.dataType, att.metadata) + } + + val probeTypes = new Array[DataType](streamedOutput.size) // {2,2}, streamedOutput:col1#10,col2#11 + streamedOutput.zipWithIndex.foreach { case (attr, i) => + probeTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(attr.dataType, attr.metadata) + } + val probeOutputCols = getIndexArray(streamedOutput, projectExprIdList) // {0,1} + val prunedStreamedOutput = pruneOutput(streamedOutput, projectExprIdList) + + val projectListIndex = getProjectListIndex(projectExprIdList, prunedStreamedOutput, prunedBuildOutput) + val lookupJoinType = OmniExpressionAdaptor.toOmniJoinType(joinType) + streamedPlan.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => + val filter: Optional[String] = condition match { + case Some(expr) => + Optional.of(OmniExpressionAdaptor.rewriteToOmniJsonExpressionLiteral(expr, + OmniExpressionAdaptor.getExprIdMap((streamedOutput ++ buildOutput).map(_.toAttribute)))) + case _ => + Optional.empty() + } + + def createBuildOpFactoryAndOp(): (NestedLoopJoinBuildOperatorFactory, OmniOperator) = { + val startBuildCodegen = System.nanoTime() + val opFactory = + new NestedLoopJoinBuildOperatorFactory(buildTypes, buildOutputCols, + new OperatorConfig(SpillConfig.NONE, + new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) + val op = opFactory.createOperator() + buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) + + + val startBuildGetOp = System.nanoTime() + try { + op.getOutput + } catch { + case e: Exception => { + op.close() + opFactory.close() + throw new RuntimeException("HashBuilder getOutput failed") + } + } + buildGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildGetOp) + (opFactory, op) + } + + var buildOp: OmniOperator = null + var buildOpFactory: NestedLoopJoinBuildOperatorFactory = null + + val (opFactory, op) 
= createBuildOpFactoryAndOp() + buildOpFactory = opFactory + buildOp = op + + + val startLookupCodegen = System.nanoTime() + val lookupOpFactory = new NestedLoopJoinLookupOperatorFactory(lookupJoinType, probeTypes,probeOutputCols,filter,buildOpFactory, + new OperatorConfig(SpillConfig.NONE, + new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) + val lookupOp = lookupOpFactory.createOperator() + lookupCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupCodegen) + + // close operator + SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { + lookupOp.close() + lookupOpFactory.close() + buildOp.close() + buildOpFactory.close() + + }) + + val resultSchema = this.schema + val reverse = buildSide == BuildLeft + var left = 0 + var leftLen = prunedStreamedOutput.size + var right = prunedStreamedOutput.size + var rightLen = output.size + if (reverse) { + left = prunedStreamedOutput.size + leftLen = output.size + right = 0 + rightLen = prunedStreamedOutput.size + } + + val iterBatch = new Iterator[ColumnarBatch] { + private var results: java.util.Iterator[VecBatch] = _ + var res: Boolean = true + + override def hasNext: Boolean = { + while ((results == null || !res) && iter.hasNext) { + val batch = iter.next() + val input = transColBatchToOmniVecs(batch) + val vecBatch = new VecBatch(input, batch.numRows()) + val startlookupInput = System.nanoTime() + lookupOp.addInput(vecBatch) + lookupAddInputTime += NANOSECONDS.toMillis(System.nanoTime() - startlookupInput) + + val startLookupGetOp = System.nanoTime() + results = lookupOp.getOutput + res = results.hasNext + lookupGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupGetOp) + + } + if (results == null) { + false + } else { + if (!res) { + false + } else { + val startLookupGetOp = System.nanoTime() + res = results.hasNext + lookupGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupGetOp) + res + } + } + + } + + override def next(): ColumnarBatch = 
{ + val startLookupGetOp = System.nanoTime() + val result = results.next() + res = results.hasNext + lookupGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupGetOp) + val resultVecs = result.getVectors + val vecs = OmniColumnVector + .allocateColumns(result.getRowCount, resultSchema, false) + if (projectList.nonEmpty) { + reorderOutputVecs(projectListIndex, resultVecs, vecs) + } else { + var index = 0 + for (i <- left until leftLen) { + val v = vecs(index) + v.reset() + v.setVec(resultVecs(i)) + index += 1 + } + for (i <- right until rightLen) { + val v = vecs(index) + v.reset() + v.setVec(resultVecs(i)) + index += 1 + } + } + val rowCnt: Int = result.getRowCount + numOutputRows += rowCnt + numOutputVecBatches += 1 + result.close() + new ColumnarBatch(vecs.toArray, rowCnt) + } + } + + if (enableJoinBatchMerge) { + val mergeIterator = new MergeIterator(iterBatch, resultSchema, numMergedVecBatches) + SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => { + mergeIterator.close() + }) + mergeIterator + } else { + iterBatch + } + } + } + + override protected def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException(s"This operator doesn't support doExecute().") + } + + override def doProduce(ctx: CodegenContext): String = { + throw new UnsupportedOperationException(s"This operator doesn't support doProduce().") + } + + override def output: Seq[Attribute] = { + if (projectList.nonEmpty) { + projectList.map(_.toAttribute) + } else { + joinType match { + case Inner => + left.output ++ right.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case j: ExistenceJoin => + left.output :+ j.exists + case LeftExistence(_) => + left.output + case x => + throw new IllegalArgumentException(s"NestedLoopJoin should not take $x as the JoinType") + } + } + } + +} -- Gitee From 37bc62ddb7d21d19424c59667e00de2a69589422 Mon Sep 
17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 3 Dec 2024 10:20:03 +0800 Subject: [PATCH 02/15] =?UTF-8?q?=E6=96=B0=E5=A2=9ECoulmnarBroadCastNested?= =?UTF-8?q?LoopJoinExec=E8=BF=99=E4=B8=AA=E7=B1=BB=EF=BC=8C=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0NestedLoopJoin=E5=8A=9F=E8=83=BD,=E6=A0=B9=E6=8D=AEcom?= =?UTF-8?q?ment=E6=84=8F=E8=A7=81=E8=A1=A5=E5=85=85buildCheck=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarBroadcastNestedLoopJoinExec.scala | 72 ++++++++++++------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 6d87c7685..928a92e5c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -30,7 +30,7 @@ import nova.hetu.omniruntime.constants.JoinType._ import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.operator.OmniOperator import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} -import nova.hetu.omniruntime.operator.join.{NestedLoopJoinBuildOperatorFactory, NestedLoopJoinLookupOperatorFactory} +import nova.hetu.omniruntime.operator.join.{OmniNestedLoopJoinBuildOperatorFactory, OmniNestedLoopJoinLookupOperatorFactory} import nova.hetu.omniruntime.vector.VecBatch import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory import org.apache.spark.broadcast.Broadcast @@ -70,8 +70,6 @@ case class ColumnarBroadcastNestedLoopJoinExec( 
|$simpleStringWithNodeId |${ExplainUtils.generateFieldString("buildInput", buildOutput ++ buildOutput.map(_.dataType))} |${ExplainUtils.generateFieldString("streamedInput", streamedOutput ++ streamedOutput.map(_.dataType))} - |${ExplainUtils.generateFieldString("leftKeys", leftKeys ++ leftKeys.map(_.dataType))} - |${ExplainUtils.generateFieldString("rightKeys", rightKeys ++ rightKeys.map(_.dataType))} |${ExplainUtils.generateFieldString("condition", joinCondStr)} |${ExplainUtils.generateFieldString("projectList", projectList.map(_.toAttribute) ++ projectList.map(_.toAttribute).map(_.dataType))} |${ExplainUtils.generateFieldString("output", output ++ output.map(_.dataType))} @@ -84,9 +82,9 @@ case class ColumnarBroadcastNestedLoopJoinExec( case BuildLeft => (right.output, left.output) } - override def leftKeys: Seq[Expression] = Nil; + override def leftKeys: Seq[Expression] = Nil - override def rightKeys: Seq[Expression] = Nil; + override def rightKeys: Seq[Expression] = Nil private val (streamedPlan, buildPlan) = buildSide match { case BuildRight => (left, right) @@ -135,17 +133,13 @@ case class ColumnarBroadcastNestedLoopJoinExec( override def supportCodegen: Boolean = false - override def nodeName: String = "OmniColumnarBroadcastNestLoopJoin" + override def nodeName: String = "OmniColumnarBroadcastNestedLoopJoin" /** only for operator fusion */ def getBuildOutput: Seq[Attribute] = { buildOutput } - - def getBuildKeys: Seq[Expression] = { - buildKeys - } - + def getBuildPlan: SparkPlan = { buildPlan } @@ -153,14 +147,47 @@ case class ColumnarBroadcastNestedLoopJoinExec( def getStreamedOutput: Seq[Attribute] = { streamedOutput } - - def getStreamedKeys: Seq[Expression] = { - streamedKeys - } - + def getstreamedPlan: SparkPlan = { streamedPlan } + + def buildCheck(): Unit = { + if (isNullAwareAntiJoin) { + throw new UnsupportedOperationException(s"isNullAwareAntiJoin is not supported " + + s"in ${this.nodeName}") + } + joinType match { + case LeftOuter | Inner | 
RightOuter => + case _ => + throw new UnsupportedOperationException(s"Join-type[${joinType}] is not supported " + + s"in ${this.nodeName}") + } + val buildTypes = new Array[DataType](buildOutput.size) // {2, 2}, buildOutput:col1#12,col2#13 + buildOutput.zipWithIndex.foreach {case (att, i) => + buildTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(att.dataType, att.metadata) + } + val buildJoinColsExp: Array[AnyRef] = buildKeys.map { x => + OmniExpressionAdaptor.rewriteToOmniJsonExpressionLiteral(x, + OmniExpressionAdaptor.getExprIdMap(buildOutput.map(_.toAttribute))) + }.toArray + val probeTypes = new Array[DataType](streamedOutput.size) + streamedOutput.zipWithIndex.foreach { case (attr, i) => + probeTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(attr.dataType, attr.metadata) + } + if (!isSimpleColumnForAll(buildJoinColsExp.map(expr => expr.toString))) { + checkOmniJsonWhiteList("", buildJoinColsExp) + } + condition match { + case Some(expr) => + val filterExpr: String = OmniExpressionAdaptor.rewriteToOmniJsonExpressionLiteral(expr, + OmniExpressionAdaptor.getExprIdMap((streamedOutput ++ buildOutput).map(_.toAttribute))) + if (!isSimpleColumn(filterExpr)) { + checkOmniJsonWhiteList(filterExpr, new Array[AnyRef](0)) + } + case _ => Optional.empty() + } + } override def doExecuteColumnar(): RDD[ColumnarBatch] = { // input/output: {col1#10,col2#11,col1#12,col2#13} @@ -188,14 +215,9 @@ case class ColumnarBroadcastNestedLoopJoinExec( val buildOutputCols: Array[Int] = joinType match { case Inner | LeftOuter | RightOuter => getIndexArray(buildOutput, projectExprIdList) - case LeftExistence(_) => - Array[Int]() case x => throw new UnsupportedOperationException(s"ColumnBroadcastNestedLoopJoin Join-type[$x] is not supported!") } - - val buildOutputExprIdMap = OmniExpressionAdaptor.getExprIdMap(buildOutput.map(_.toAttribute)) - val prunedBuildOutput = pruneOutput(buildOutput, projectExprIdList) val buildOutputTypes = new Array[DataType](prunedBuildOutput.size) // 
{2,2}, buildOutput:col1#12,col2#13 prunedBuildOutput.zipWithIndex.foreach { case (att, i) => @@ -220,10 +242,10 @@ case class ColumnarBroadcastNestedLoopJoinExec( Optional.empty() } - def createBuildOpFactoryAndOp(): (NestedLoopJoinBuildOperatorFactory, OmniOperator) = { + def createBuildOpFactoryAndOp(): (OmniNestedLoopJoinBuildOperatorFactory, OmniOperator) = { val startBuildCodegen = System.nanoTime() val opFactory = - new NestedLoopJoinBuildOperatorFactory(buildTypes, buildOutputCols, + new OmniNestedLoopJoinBuildOperatorFactory(buildTypes, buildOutputCols, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val op = opFactory.createOperator() @@ -245,7 +267,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( } var buildOp: OmniOperator = null - var buildOpFactory: NestedLoopJoinBuildOperatorFactory = null + var buildOpFactory: OmniNestedLoopJoinBuildOperatorFactory = null val (opFactory, op) = createBuildOpFactoryAndOp() buildOpFactory = opFactory @@ -253,7 +275,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( val startLookupCodegen = System.nanoTime() - val lookupOpFactory = new NestedLoopJoinLookupOperatorFactory(lookupJoinType, probeTypes,probeOutputCols,filter,buildOpFactory, + val lookupOpFactory = new OmniNestedLoopJoinLookupOperatorFactory(lookupJoinType, probeTypes,probeOutputCols,filter,buildOpFactory, new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val lookupOp = lookupOpFactory.createOperator() -- Gitee From b8c13850942e15925d9da34cecaed41602024645 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 3 Dec 2024 11:49:59 +0800 Subject: [PATCH 03/15] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=AE=97=E5=AD=90?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../boostkit/spark/ColumnarPlugin.scala | 24 ++++++++++ 
.../boostkit/spark/TransformHintRule.scala | 45 ++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala index d19d1a467..0cdd5ce4b 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala @@ -116,6 +116,18 @@ case class ColumnarPreOverrides(isSupportAdaptive: Boolean = true) } else { ColumnarProjectExec(plan.projectList, child) } + case join: ColumnarBroadcastNestedLoopJoinExec => + if (plan.projectList.forall(project => OmniExpressionAdaptor.isSimpleProjectForAll(project)) && enableColumnarProjectFusion) { + ColumnarBroadcastNestedLoopJoinExec( + join.left, + join.right, + join.buildSide, + join.joinType, + join.condition, + plan.projectList) + } else { + ColumnarProjectExec(plan.projectList, child) + } case join: ColumnarShuffledHashJoinExec => if (plan.projectList.forall(project => OmniExpressionAdaptor.isSimpleProjectForAll(project)) && enableColumnarProjectFusion) { ColumnarShuffledHashJoinExec( @@ -392,6 +404,18 @@ case class ColumnarPreOverrides(isSupportAdaptive: Boolean = true) plan.condition, left, right) + case plan: BroadcastNestedLoopJoinExec => + logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.") + val left = replaceWithColumnarPlan(plan.left) + val right = replaceWithColumnarPlan(plan.right) + logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") + ColumnarBroadcastNestedLoopJoinExec( + left, + right, + plan.buildSide, + plan.joinType, + plan.condition, + ) case plan: ShuffledHashJoinExec if enableDedupLeftSemiJoin && !SQLConf.get.adaptiveExecutionEnabled => { plan.joinType match { case LeftSemi => { diff 
--git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala index 553463d56..c53c463d1 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, OmniAQE import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ColumnarBroadcastHashJoinExec, ColumnarShuffledHashJoinExec, ColumnarSortMergeJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ColumnarBroadcastHashJoinExec, ColumnarBroadcastNestedLoopJoinExec, ColumnarShuffledHashJoinExec, ColumnarSortMergeJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.execution.{CoalesceExec, CodegenSupport, ColumnarBroadcastExchangeExec, ColumnarCoalesceExec, ColumnarDataWritingCommandExec, ColumnarExpandExec, ColumnarFileSourceScanExec, ColumnarFilterExec, ColumnarGlobalLimitExec, ColumnarHashAggregateExec, ColumnarLocalLimitExec, ColumnarProjectExec, ColumnarShuffleExchangeExec, ColumnarSortExec, ColumnarTakeOrderedAndProjectExec, ColumnarTopNSortExec, ColumnarUnionExec, ColumnarWindowExec, ExpandExec, FileSourceScanExec, FilterExec, GlobalLimitExec, LocalLimitExec, ProjectExec, SortExec, SparkPlan, TakeOrderedAndProjectExec, TopNSortExec, UnionExec} 
import org.apache.spark.sql.types.ColumnarBatchSupportUtil.checkColumnarBatchSupport @@ -315,6 +315,49 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { plan.right, plan.isNullAwareAntiJoin).buildCheck() TransformHints.tagTransformable(plan) + case plan: BroadcastNestedLoopJoinExec => + // We need to check if BroadcastExchangeExec can be converted to columnar-based. + // If not, the BroadcastNestedLoopJoin should also be row-based. + if (!enableColumnarBroadcastJoin) { + TransformHints.tagNotTransformable( + plan, "columnar BroadcastNestedLoopJoin is not enabled in BroadcastNestedLoopJoinExec") + return + } + val left = plan.left + left match { + case exec: BroadcastExchangeExec => + new ColumnarBroadcastExchangeExec(exec.mode, exec.child) + case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) => + new ColumnarBroadcastExchangeExec(plan.mode, plan.child) + case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) => + plan match { + case ReusedExchangeExec(_, b: BroadcastExchangeExec) => + new ColumnarBroadcastExchangeExec(b.mode, b.child) + case _ => + } + case _ => + } + val right = plan.right + right match { + case exec: BroadcastExchangeExec => + new ColumnarBroadcastExchangeExec(exec.mode, exec.child) + case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) => + new ColumnarBroadcastExchangeExec(plan.mode, plan.child) + case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) => + plan match { + case ReusedExchangeExec(_, b: BroadcastExchangeExec) => + new ColumnarBroadcastExchangeExec(b.mode, b.child) + case _ => + } + case _ => + } + ColumnarBroadcastNestedLoopJoinExec( + plan.left, + plan.right, + plan.buildSide, + plan.joinType, + plan.condition).buildCheck() + TransformHints.tagTransformable(plan) case plan: SortMergeJoinExec => if (!enableColumnarSortMergeJoin) { TransformHints.tagNotTransformable( -- Gitee From 4fb5eea3ad3798005f701e9d3661afa4cb29b067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= 
<1549665469@qq.com> Date: Tue, 3 Dec 2024 11:56:34 +0800 Subject: [PATCH 04/15] =?UTF-8?q?=E8=A1=A5=E5=85=85=E5=A4=B4=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 928a92e5c..7050cbfc8 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -1,4 +1,5 @@ /* + * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
-- Gitee From 989ed6f12d481ae049eea587783830aed39473b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 3 Dec 2024 15:17:31 +0800 Subject: [PATCH 05/15] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarBroadcastNestedLoopJoinExec.scala | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 7050cbfc8..fae610367 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -148,16 +148,8 @@ case class ColumnarBroadcastNestedLoopJoinExec( def getStreamedOutput: Seq[Attribute] = { streamedOutput } - - def getstreamedPlan: SparkPlan = { - streamedPlan - } def buildCheck(): Unit = { - if (isNullAwareAntiJoin) { - throw new UnsupportedOperationException(s"isNullAwareAntiJoin is not supported " + - s"in ${this.nodeName}") - } joinType match { case LeftOuter | Inner | RightOuter => case _ => @@ -168,17 +160,10 @@ case class ColumnarBroadcastNestedLoopJoinExec( buildOutput.zipWithIndex.foreach {case (att, i) => buildTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(att.dataType, att.metadata) } - val buildJoinColsExp: Array[AnyRef] = buildKeys.map { x => - OmniExpressionAdaptor.rewriteToOmniJsonExpressionLiteral(x, - OmniExpressionAdaptor.getExprIdMap(buildOutput.map(_.toAttribute))) - }.toArray val probeTypes = new Array[DataType](streamedOutput.size) 
streamedOutput.zipWithIndex.foreach { case (attr, i) => probeTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(attr.dataType, attr.metadata) } - if (!isSimpleColumnForAll(buildJoinColsExp.map(expr => expr.toString))) { - checkOmniJsonWhiteList("", buildJoinColsExp) - } condition match { case Some(expr) => val filterExpr: String = OmniExpressionAdaptor.rewriteToOmniJsonExpressionLiteral(expr, -- Gitee From f74a7a7e5035462a28193b68a4a5efd235e06a41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 3 Dec 2024 15:20:29 +0800 Subject: [PATCH 06/15] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index fae610367..9ed062911 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -231,9 +231,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( def createBuildOpFactoryAndOp(): (OmniNestedLoopJoinBuildOperatorFactory, OmniOperator) = { val startBuildCodegen = System.nanoTime() val opFactory = - new OmniNestedLoopJoinBuildOperatorFactory(buildTypes, buildOutputCols, - new OperatorConfig(SpillConfig.NONE, - new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) + new OmniNestedLoopJoinBuildOperatorFactory(buildTypes, buildOutputCols) val op = 
opFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) -- Gitee From e6336b1dae3686fd96db19344984ac25a853405e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 3 Dec 2024 15:48:31 +0800 Subject: [PATCH 07/15] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF,=E5=8A=9F=E8=83=BD=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../joins/ColumnarBroadcastNestedLoopJoinExec.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 9ed062911..44d0af090 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -180,7 +180,6 @@ case class ColumnarBroadcastNestedLoopJoinExec( val numOutputRows = longMetric("numOutputRows") val numOutputVecBatches = longMetric("numOutputVecBatches") val numMergedVecBatches = longMetric("numMergedVecBatches") - val bypassVecBatches = longMetric("bypassVecBatches") val buildAddInputTime = longMetric("buildAddInputTime") val buildCodegenTime = longMetric("buildCodegenTime") val buildGetOutputTime = longMetric("buildGetOutputTime") @@ -219,6 +218,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( val projectListIndex = getProjectListIndex(projectExprIdList, prunedStreamedOutput, prunedBuildOutput) val lookupJoinType = OmniExpressionAdaptor.toOmniJoinType(joinType) + val relation = 
buildPlan.executeBroadcast[ColumnarHashedRelation]() streamedPlan.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => val filter: Optional[String] = condition match { case Some(expr) => @@ -235,7 +235,12 @@ case class ColumnarBroadcastNestedLoopJoinExec( val op = opFactory.createOperator() buildCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildCodegen) - + val deserializer = VecBatchSerializerFactory.create() + relation.value.buildData.foreach { input => + val startBuildInput = System.nanoTime() + op.addInput(deserializer.deserialize(input)) + buildAddInputTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildInput) + } val startBuildGetOp = System.nanoTime() try { op.getOutput -- Gitee From e0b8b10acd34b7ee53fac5124d7202a6d152938a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 3 Dec 2024 20:39:36 +0800 Subject: [PATCH 08/15] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=94=99=E8=AF=AF,=E5=8A=9F=E8=83=BD=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 44d0af090..64a1756db 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -79,8 +79,8 @@ case class ColumnarBroadcastNestedLoopJoinExec( } private val (buildOutput, streamedOutput) = buildSide match { - case 
BuildRight => (left.output, right.output) - case BuildLeft => (right.output, left.output) + case BuildLeft => (left.output, right.output) + case BuildRight => (right.output, left.output) } override def leftKeys: Seq[Expression] = Nil -- Gitee From 10ba43afe4e4264570e68ec9e62fe1a448020009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Mon, 9 Dec 2024 15:42:41 +0800 Subject: [PATCH 09/15] =?UTF-8?q?=E7=BC=96=E5=86=99extennsion=E4=BE=A7?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarNestedLoopJoinExecSuite.scala | 378 ++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala new file mode 100644 index 000000000..4269a8fe0 --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala @@ -0,0 +1,378 @@ +/* + * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.BuildRight +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.execution.joins._ +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{IntegerType, StringType, StructType} +import org.apache.spark.sql.{DataFrame, Row} + +// refer to joins package +class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { + + import testImplicits.{localSeqToDatasetHolder, newProductEncoder} + + private var left: DataFrame = _ + private var right: DataFrame = _ + private var leftWithNull: DataFrame = _ + private var rightWithNull: DataFrame = _ + private var person_test: DataFrame = _ + private var order_test: DataFrame = _ + + protected override def beforeAll(): Unit = { + super.beforeAll() + left = Seq[(String, String, java.lang.Integer, java.lang.Double)]( + ("abc", "", 4, 2.0), + ("", "Hello", 1, 1.0), + (" add", "World", 8, 3.0), + (" yeah ", "yeah", 10, 8.0) + ).toDF("a", "b", "q", "d") + + right = Seq[(String, String, java.lang.Integer, java.lang.Double)]( + ("abc", "", 4, 1.0), + ("", "Hello", 2, 2.0), + (" add", "World", 1, 3.0), + (" yeah ", "yeah", 0, 4.0) + ).toDF("a", "b", "c", "d") + + leftWithNull = Seq[(String, String, java.lang.Integer, java.lang.Double)]( + ("abc", null, 4, 2.0), + ("", "Hello", null, 1.0), + (" add", "World", 8, 3.0), + (" yeah ", "yeah", 10, 8.0) + ).toDF("a", "b", "q", "d") + + rightWithNull = Seq[(String, String, 
java.lang.Integer, java.lang.Double)]( + ("abc", "", 4, 1.0), + ("", "Hello", 2, 2.0), + (" add", null, 1, null), + (" yeah ", null, null, 4.0) + ).toDF("a", "b", "c", "d") + + person_test = spark.createDataFrame( + sparkContext.parallelize(Seq( + Row(3, "Carter"), + Row(1, "Adams"), + Row(2, "Bush") + )), new StructType() + .add("id_p", IntegerType) + .add("name", StringType)) + person_test.createOrReplaceTempView("person_test") + + order_test = spark.createDataFrame( + sparkContext.parallelize(Seq( + Row(5, 34764, 65), + Row(1, 77895, 3), + Row(2, 44678, 3), + Row(4, 24562, 1), + Row(3, 22456, 1) + )), new StructType() + .add("id_o", IntegerType) + .add("order_no", IntegerType) + .add("id_p", IntegerType)) + order_test.createOrReplaceTempView("order_test") + } + + test("validate columnar nestedLoopJoin exec happened") { + val res = left.join(right.hint("broadcast"), col("q") =!= col("c")) + assert( + res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${res.queryExecution.executedPlan}") + } + + test("columnar nestedLoopJoin Inner Join is equal to native") { + val df = left.join(right.hint("mergejoin"), col("q") =!= col("c")) + val leftKeys = Seq(left.col("q").expr) + val rightKeys = Seq(right.col("c").expr) + checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) + } + + test("columnar nestedLoopJoin Inner Join is equal to native With NULL") { + val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") =!= col("c")) + val leftKeys = Seq(leftWithNull.col("q").expr) + val rightKeys = Seq(rightWithNull.col("c").expr) + checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) + } + + test("columnar nestedLoopJoin LeftOuter Join is equal to native") { + val df = left.join(right.hint("mergejoin"), col("q") =!= col("c")) + val leftKeys = Seq(left.col("q").expr) + val rightKeys = Seq(right.col("c").expr) 
+ checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) + } + + test("columnar nestedLoopJoin LeftOuter Join is equal to native With NULL") { + val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") =!= col("c")) + val leftKeys = Seq(leftWithNull.col("q").expr) + val rightKeys = Seq(rightWithNull.col("c").expr) + checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) + } + + test("columnar nestedLoopJoin right outer join is equal to native") { + val df = left.join(right.hint("SHUFFLE_HASH"), col("q") =!= col("c"), "rightouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row(null, null, null, null, "", "Hello", 2, 2.0), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) + ), false) + } + + test("columnar nestedLoopJoin right outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), + col("q") =!= col("c"), "rightouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row(null, null, null, null, "", "Hello", 2, 2.0), + Row(null, null, null, null, " add", null, 1, null), + Row(null, null, null, null, " yeah ", null, null, 4.0) + ), false) + } + + test("columnar nestedLoopJoin FullOuter Join is equal to native") { + val df = left.join(right.hint("mergejoin"), col("q") =!= col("c")) + val leftKeys = Seq(left.col("q").expr) + val rightKeys = Seq(right.col("c").expr) + checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) + } + + test("columnar nestedLoopJoin FullOuter Join is equal to native With NULL") { + val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") =!= col("c")) + val leftKeys = Seq(leftWithNull.col("q").expr) + val rightKeys = Seq(rightWithNull.col("c").expr) + checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) + } + + def 
checkThatPlansAgreeTemplateForNLJ(df: DataFrame, leftKeys: Seq[Expression], + rightKeys: Seq[Expression], joinType: JoinType): Unit = { + checkThatPlansAgree( + df, + (child: SparkPlan) => + new ColumnarBroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, + None), + (child: SparkPlan) => + BroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, + None), + sortAnswers = false) + } + + + // + + test("columnar broadcastHashJoin is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("broadcast"), + col("q").isNotNull === col("c").isNotNull) + val leftKeys = Seq(leftWithNull.col("q").isNotNull.expr) + val rightKeys = Seq(rightWithNull.col("c").isNotNull.expr) + checkThatPlansAgreeTemplateForBHJ(df, leftKeys, rightKeys) + } + + def checkThatPlansAgreeTemplateForBHJ(df: DataFrame, leftKeys: Seq[Expression], + rightKeys: Seq[Expression], joinType: JoinType = Inner): Unit = { + checkThatPlansAgree( + df, + (child: SparkPlan) => + ColumnarBroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, + None), + (child: SparkPlan) => + BroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, + None), + sortAnswers = false) + } + + test("validate columnar broadcastHashJoin left outer join happened") { + val res = left.join(right.hint("broadcast"), col("q") === col("c"), "leftouter") + assert( + res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened," + + s" executedPlan as follows: \n${res.queryExecution.executedPlan}") + } + + test("columnar broadcastHashJoin left outer join is equal to native") { + val df = left.join(right.hint("broadcast"), col("q") === col("c"), "leftouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(" add", "World", 8, 3.0, null, null, null, null), + Row(" yeah ", "yeah", 10, 8.0, null, 
null, null, null) + ), false) + } + + test("columnar broadcastHashJoin left outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("broadcast"), + col("q").isNotNull === col("c").isNotNull, "leftouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row("abc", null, 4, 2.0, "", "Hello", 2, 2.0), + Row("abc", null, 4, 2.0, " add", null, 1, null), + Row("", "Hello", null, 1.0, " yeah ", null, null, 4.0), + Row(" add", "World", 8, 3.0, "abc", "", 4, 1.0), + Row(" add", "World", 8, 3.0, "", "Hello", 2, 2.0), + Row(" add", "World", 8, 3.0, " add", null, 1, null), + Row(" yeah ", "yeah", 10, 8.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, "", "Hello", 2, 2.0), + Row(" yeah ", "yeah", 10, 8.0, " add", null, 1, null) + + ), false) + } + + test("validate columnar shuffledHashJoin full outer join happened") { + val res = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "fullouter") + assert( + res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffledHashJoinExec]).isDefined, + s"ColumnarShuffledHashJoinExec not happened," + + s" executedPlan as follows: \n${res.queryExecution.executedPlan}") + } + + test("columnar shuffledHashJoin full outer join is equal to native") { + val df = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "fullouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row(null, null, null, null, " yeah ", "yeah", 0, 4.0), + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(" add", "World", 8, 3.0, null, null, null, null), + Row(null, null, null, null, "", "Hello", 2, 2.0) + ), false) + } + + test("columnar shuffledHashJoin full outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), + col("q").isNotNull === col("c").isNotNull, "fullouter") 
+ checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("", "Hello", null, 1.0, " yeah ", null, null, 4.0), + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row("abc", null, 4, 2.0, "", "Hello", 2, 2.0), + Row("abc", null, 4, 2.0, " add", null, 1, null), + Row(" add", "World", 8, 3.0, "abc", "", 4, 1.0), + Row(" add", "World", 8, 3.0, "", "Hello", 2, 2.0), + Row(" add", "World", 8, 3.0, " add", null, 1, null), + Row(" yeah ", "yeah", 10, 8.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, "", "Hello", 2, 2.0), + Row(" yeah ", "yeah", 10, 8.0, " add", null, 1, null) + ), false) + } + + test("validate columnar shuffledHashJoin left outer join happened") { + val res = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "leftouter") + assert( + res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffledHashJoinExec]).isDefined, + s"ColumnarShuffledHashJoinExec not happened," + + s" executedPlan as follows: \n${res.queryExecution.executedPlan}") + } + + test("columnar shuffledHashJoin left outer join is equal to native") { + val df = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "leftouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(" add", "World", 8, 3.0, null, null, null, null) + ), false) + } + + test("columnar shuffledHashJoin left outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), + col("q") === col("c"), "leftouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row("", "Hello", null, 1.0, null, null, null, null), + Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), + Row(" add", "World", 8, 3.0, null, null, null, null) + ), false) + } + + + def checkThatPlansAgreeTemplateForSMJ(df: DataFrame, leftKeys: 
Seq[Expression], + rightKeys: Seq[Expression], joinType: JoinType): Unit = { + checkThatPlansAgree( + df, + (child: SparkPlan) => + new ColumnarSortMergeJoinExec(leftKeys, rightKeys, joinType, + None, child, child), + (child: SparkPlan) => + SortMergeJoinExec(leftKeys, rightKeys, joinType, + None, child, child), + sortAnswers = true) + } + + + + test("columnar ShuffledHashJoin right outer join is equal to native") { + val df = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "rightouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row(null, null, null, null, "", "Hello", 2, 2.0), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) + ), false) + } + + test("columnar ShuffledHashJoin right outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), + col("q") === col("c"), "rightouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row(null, null, null, null, "", "Hello", 2, 2.0), + Row(null, null, null, null, " add", null, 1, null), + Row(null, null, null, null, " yeah ", null, null, 4.0) + ), false) + } + + test("columnar BroadcastHashJoin right outer join is equal to native") { + val df = left.join(right.hint("broadcast"), col("q") === col("c"), "rightouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row(null, null, null, null, "", "Hello", 2, 2.0), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) + ), false) + } + + test("columnar BroadcastHashJoin right outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull.hint("broadcast"), col("q") === col("c"), "rightouter") + checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), 
+ Row(null, null, null, null, "", "Hello", 2, 2.0), + Row(null, null, null, null, " add", null, 1, null), + Row(null, null, null, null, " yeah ", null, null, 4.0) + ), false) + } + + + + test("ShuffledHashJoin and project funsion test for reorder columns") { + val omniResult = person_test.join(order_test.hint("SHUFFLE_HASH"), person_test("id_p") === order_test("id_p"), "inner") + .select(order_test("order_no"), person_test("name"), order_test("id_p")) + val omniPlan = omniResult.queryExecution.executedPlan + assert(omniPlan.find(_.isInstanceOf[ColumnarProjectExec]).isEmpty, + s"SQL:\n@OmniEnv no ColumnarProjectExec,omniPlan:${omniPlan}") + checkAnswer(omniResult, _ => omniPlan, Seq( + Row(77895, "Carter", 3), + Row(44678, "Carter", 3), + Row(24562, "Adams", 1), + Row(22456, "Adams", 1) + ), false) + } + +} \ No newline at end of file -- Gitee From fe3fa2985d11642b1b511bc095bc7179f2b1c7c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Mon, 9 Dec 2024 16:41:58 +0800 Subject: [PATCH 10/15] =?UTF-8?q?=E7=BC=96=E5=86=99extennsion=E4=BE=A7?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarNestedLoopJoinExecSuite.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala index 4269a8fe0..09ef724c4 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala @@ -93,7 +93,7 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { } 
test("validate columnar nestedLoopJoin exec happened") { - val res = left.join(right.hint("broadcast"), col("q") =!= col("c")) + val res = left.join(right.hint("broadcast"), col("q") < col("c")) assert( res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, s"ColumnarBroadcastNestedLoopJoinExec not happened, " + @@ -101,63 +101,63 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { } test("columnar nestedLoopJoin Inner Join is equal to native") { - val df = left.join(right.hint("mergejoin"), col("q") =!= col("c")) + val df = left.join(right.hint("mergejoin"), col("q") < col("c")) val leftKeys = Seq(left.col("q").expr) val rightKeys = Seq(right.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) } test("columnar nestedLoopJoin Inner Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") =!= col("c")) + val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") < col("c")) val leftKeys = Seq(leftWithNull.col("q").expr) val rightKeys = Seq(rightWithNull.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) } test("columnar nestedLoopJoin LeftOuter Join is equal to native") { - val df = left.join(right.hint("mergejoin"), col("q") =!= col("c")) + val df = left.join(right.hint("mergejoin"), col("q") < col("c")) val leftKeys = Seq(left.col("q").expr) val rightKeys = Seq(right.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) } test("columnar nestedLoopJoin LeftOuter Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") =!= col("c")) + val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") < col("c")) val leftKeys = Seq(leftWithNull.col("q").expr) val rightKeys = Seq(rightWithNull.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) } test("columnar 
nestedLoopJoin right outer join is equal to native") { - val df = left.join(right.hint("SHUFFLE_HASH"), col("q") =!= col("c"), "rightouter") + val df = left.join(right.hint("SHUFFLE_HASH"), col("q") < col("c"), "rightouter") checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), - Row(null, null, null, null, "", "Hello", 2, 2.0), - Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), - Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) + Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), + Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0), + Row(null, null, null, null, " yeah ", "yeah", 0, 4.0), + Row(null, null, null, null, "add", "World", 1, 3.0) ), false) } test("columnar nestedLoopJoin right outer join is equal to native with null") { val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), - col("q") =!= col("c"), "rightouter") + col("q") < col("c"), "rightouter") checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row(null, null, null, null, "yeah", null,null, 4.0), + Row(null, null, null, null, "abc", "", 4, 1.0), Row(null, null, null, null, "", "Hello", 2, 2.0), - Row(null, null, null, null, " add", null, 1, null), - Row(null, null, null, null, " yeah ", null, null, 4.0) + Row(null, null, null, null, "add", null, 1, null) ), false) } test("columnar nestedLoopJoin FullOuter Join is equal to native") { - val df = left.join(right.hint("mergejoin"), col("q") =!= col("c")) + val df = left.join(right.hint("mergejoin"), col("q") < col("c")) val leftKeys = Seq(left.col("q").expr) val rightKeys = Seq(right.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) } test("columnar nestedLoopJoin FullOuter Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") =!= col("c")) + val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") < col("c")) val leftKeys = 
Seq(leftWithNull.col("q").expr) val rightKeys = Seq(rightWithNull.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) -- Gitee From 8d800ef85056002d242371d66973a60e055e37b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Mon, 9 Dec 2024 16:43:21 +0800 Subject: [PATCH 11/15] =?UTF-8?q?=E7=BC=96=E5=86=99extension=E4=BE=A7?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarNestedLoopJoinExecSuite.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala index 09ef724c4..b6cbafd7b 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala @@ -93,7 +93,7 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { } test("validate columnar nestedLoopJoin exec happened") { - val res = left.join(right.hint("broadcast"), col("q") < col("c")) + val res = left.join(right, col("q") < col("c")) assert( res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, s"ColumnarBroadcastNestedLoopJoinExec not happened, " + @@ -101,35 +101,35 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { } test("columnar nestedLoopJoin Inner Join is equal to native") { - val df = left.join(right.hint("mergejoin"), col("q") < col("c")) + val df = left.join(right, col("q") < col("c")) val leftKeys = Seq(left.col("q").expr) val rightKeys = Seq(right.col("c").expr) 
checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) } test("columnar nestedLoopJoin Inner Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") < col("c")) + val df = leftWithNull.join(rightWithNull, col("q") < col("c")) val leftKeys = Seq(leftWithNull.col("q").expr) val rightKeys = Seq(rightWithNull.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) } test("columnar nestedLoopJoin LeftOuter Join is equal to native") { - val df = left.join(right.hint("mergejoin"), col("q") < col("c")) + val df = left.join(right, col("q") < col("c")) val leftKeys = Seq(left.col("q").expr) val rightKeys = Seq(right.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) } test("columnar nestedLoopJoin LeftOuter Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") < col("c")) + val df = leftWithNull.join(rightWithNull, col("q") < col("c")) val leftKeys = Seq(leftWithNull.col("q").expr) val rightKeys = Seq(rightWithNull.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) } test("columnar nestedLoopJoin right outer join is equal to native") { - val df = left.join(right.hint("SHUFFLE_HASH"), col("q") < col("c"), "rightouter") + val df = left.join(right, col("q") < col("c"), "rightouter") checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0), @@ -139,7 +139,7 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { } test("columnar nestedLoopJoin right outer join is equal to native with null") { - val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), + val df = leftWithNull.join(rightWithNull, col("q") < col("c"), "rightouter") checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( Row(null, null, null, null, "yeah", null,null, 4.0), @@ -150,14 +150,14 @@ 
class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { } test("columnar nestedLoopJoin FullOuter Join is equal to native") { - val df = left.join(right.hint("mergejoin"), col("q") < col("c")) + val df = left.join(right, col("q") < col("c")) val leftKeys = Seq(left.col("q").expr) val rightKeys = Seq(right.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) } test("columnar nestedLoopJoin FullOuter Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull.hint("mergejoin"), col("q") < col("c")) + val df = leftWithNull.join(rightWithNull, col("q") < col("c")) val leftKeys = Seq(leftWithNull.col("q").expr) val rightKeys = Seq(rightWithNull.col("c").expr) checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) -- Gitee From b2050f5821000b126bfa1599e0ec701c62a5ecb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Mon, 9 Dec 2024 16:51:46 +0800 Subject: [PATCH 12/15] =?UTF-8?q?=E7=BC=96=E5=86=99extension=E4=BE=A7?= =?UTF-8?q?=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala index b6cbafd7b..e7542408b 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala @@ -177,7 +177,7 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { } - // + //以下部分先不删除,之后可能要继续借鉴 test("columnar 
broadcastHashJoin is equal to native with null") { val df = leftWithNull.join(rightWithNull.hint("broadcast"), -- Gitee From 97336191158b4ba1b085eacd335abbddf89e3f14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 10 Dec 2024 21:21:06 +0800 Subject: [PATCH 13/15] =?UTF-8?q?=E7=BC=96=E5=86=99extension=E4=BE=A7?= =?UTF-8?q?=E7=94=A8=E4=BE=8B,=E6=A0=B9=E6=8D=AE=E6=A3=80=E8=A7=86?= =?UTF-8?q?=E6=84=8F=E8=A7=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../boostkit/spark/ColumnarPluginConfig.scala | 8 + .../boostkit/spark/TransformHintRule.scala | 31 +- .../ColumnarBroadcastNestedLoopJoinExec.scala | 13 +- .../ColumnarNestedLoopJoinExecSuite.scala | 436 ++++++------------ 4 files changed, 146 insertions(+), 342 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index e807f96fc..ea846f188 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -59,6 +59,8 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { def enableColumnarBroadcastJoin: Boolean = conf.getConf(ENABLE_COLUMNAR_BROADCAST_JOIN) + def enableColumnarBroadcastNestedJoin: Boolean = conf.getConf(ENABLE_COLUMNAR_BROADCAST_NESTED_JOIN) + def enableShareBroadcastJoinHashTable: Boolean = conf.getConf(ENABLE_SHARE_BROADCAST_JOIN_HASH_TABLE) def enableHeuristicJoinReorder: Boolean = conf.getConf(ENABLE_HEURISTIC_JOIN_REORDER) @@ -287,6 +289,12 @@ object ColumnarPluginConfig { .booleanConf .createWithDefault(true) + val ENABLE_COLUMNAR_BROADCAST_NESTED_JOIN = 
buildConf("spark.omni.sql.columnar.broadcastNestedJoin") + .internal() + .doc("enable or disable columnar broadcastNestedJoin") + .booleanConf + .createWithDefault(true) + val ENABLE_SHARE_BROADCAST_JOIN_HASH_TABLE = buildConf("spark.omni.sql.columnar.broadcastJoin.sharehashtable") .internal() .doc("enable or disable share columnar BroadcastHashJoin hashtable") diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala index c53c463d1..90399b6ff 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/TransformHintRule.scala @@ -120,6 +120,7 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { val enableColumnarExpand: Boolean = columnarConf.enableColumnarExpand val enableColumnarBroadcastExchange: Boolean = columnarConf.enableColumnarBroadcastExchange val enableColumnarBroadcastJoin: Boolean = columnarConf.enableColumnarBroadcastJoin + val enableColumnarBroadcastNestedJoin: Boolean = columnarConf.enableColumnarBroadcastNestedJoin val enableColumnarSortMergeJoin: Boolean = columnarConf.enableColumnarSortMergeJoin val enableShuffledHashJoin: Boolean = columnarConf.enableShuffledHashJoin val enableColumnarFileScan: Boolean = columnarConf.enableColumnarFileScan @@ -318,39 +319,11 @@ case class AddTransformHintRule() extends Rule[SparkPlan] { case plan: BroadcastNestedLoopJoinExec => // We need to check if BroadcastExchangeExec can be converted to columnar-based. // If not, BHJ should also be row-based. 
- if (!enableColumnarBroadcastJoin) { + if (!enableColumnarBroadcastNestedJoin) { TransformHints.tagNotTransformable( plan, "columnar BroadcastNestedLoopJoin is not enabled in BroadcastNestedLoopJoinExec") return } - val left = plan.left - left match { - case exec: BroadcastExchangeExec => - new ColumnarBroadcastExchangeExec(exec.mode, exec.child) - case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) => - new ColumnarBroadcastExchangeExec(plan.mode, plan.child) - case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) => - plan match { - case ReusedExchangeExec(_, b: BroadcastExchangeExec) => - new ColumnarBroadcastExchangeExec(b.mode, b.child) - case _ => - } - case _ => - } - val right = plan.right - right match { - case exec: BroadcastExchangeExec => - new ColumnarBroadcastExchangeExec(exec.mode, exec.child) - case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) => - new ColumnarBroadcastExchangeExec(plan.mode, plan.child) - case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) => - plan match { - case ReusedExchangeExec(_, b: BroadcastExchangeExec) => - new ColumnarBroadcastExchangeExec(b.mode, b.child) - case _ => - } - case _ => - } ColumnarBroadcastNestedLoopJoinExec( plan.left, plan.right, diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 64a1756db..646bb5ca1 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.execution.vectorized.OmniColumnVector import org.apache.spark.sql.vectorized.ColumnarBatch /** - * 
Performs an inner hash join of two child relations. When the output RDD of this operator is + * Performs a nested loop join of two child relations. When the output RDD of this operator is * being constructed, a Spark job is asynchronously started to calculate the values for the * broadcast relation. This data is then placed in a Spark broadcast variable. The streamedPlan * relation is not shuffled. @@ -151,7 +151,11 @@ case class ColumnarBroadcastNestedLoopJoinExec( def buildCheck(): Unit = { joinType match { - case LeftOuter | Inner | RightOuter => + case Inner => + case LeftOuter => + require(buildSide == BuildRight, "In left outer join case,buildSide must be BuildRight.") + case RightOuter => + require(buildSide == BuildLeft, "In right outer join case,buildSide must be BuildLeft.") case _ => throw new UnsupportedOperationException(s"Join-type[${joinType}] is not supported " + s"in ${this.nodeName}") @@ -248,7 +252,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( case e: Exception => { op.close() opFactory.close() - throw new RuntimeException("HashBuilder getOutput failed") + throw new RuntimeException("NestedLoopJoinBuilder getOutput failed") } } buildGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildGetOp) @@ -309,7 +313,6 @@ case class ColumnarBroadcastNestedLoopJoinExec( results = lookupOp.getOutput res = results.hasNext lookupGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startLookupGetOp) - } if (results == null) { false @@ -323,9 +326,7 @@ case class ColumnarBroadcastNestedLoopJoinExec( res } } - } - override def next(): ColumnarBatch = { val startLookupGetOp = System.nanoTime() val result = results.next() diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala index e7542408b..24014a416 100644 --- 
a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala @@ -67,312 +67,134 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { (" add", null, 1, null), (" yeah ", null, null, 4.0) ).toDF("a", "b", "c", "d") - - person_test = spark.createDataFrame( - sparkContext.parallelize(Seq( - Row(3, "Carter"), - Row(1, "Adams"), - Row(2, "Bush") - )), new StructType() - .add("id_p", IntegerType) - .add("name", StringType)) - person_test.createOrReplaceTempView("person_test") - - order_test = spark.createDataFrame( - sparkContext.parallelize(Seq( - Row(5, 34764, 65), - Row(1, 77895, 3), - Row(2, 44678, 3), - Row(4, 24562, 1), - Row(3, 22456, 1) - )), new StructType() - .add("id_o", IntegerType) - .add("order_no", IntegerType) - .add("id_p", IntegerType)) - order_test.createOrReplaceTempView("order_test") - } - - test("validate columnar nestedLoopJoin exec happened") { - val res = left.join(right, col("q") < col("c")) - assert( - res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, - s"ColumnarBroadcastNestedLoopJoinExec not happened, " + - s"executedPlan as follows: \n${res.queryExecution.executedPlan}") - } - - test("columnar nestedLoopJoin Inner Join is equal to native") { - val df = left.join(right, col("q") < col("c")) - val leftKeys = Seq(left.col("q").expr) - val rightKeys = Seq(right.col("c").expr) - checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) - } - - test("columnar nestedLoopJoin Inner Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull, col("q") < col("c")) - val leftKeys = Seq(leftWithNull.col("q").expr) - val rightKeys = Seq(rightWithNull.col("c").expr) - checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, Inner) - } - - test("columnar 
nestedLoopJoin LeftOuter Join is equal to native") { - val df = left.join(right, col("q") < col("c")) - val leftKeys = Seq(left.col("q").expr) - val rightKeys = Seq(right.col("c").expr) - checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) - } - - test("columnar nestedLoopJoin LeftOuter Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull, col("q") < col("c")) - val leftKeys = Seq(leftWithNull.col("q").expr) - val rightKeys = Seq(rightWithNull.col("c").expr) - checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, LeftOuter) - } - - test("columnar nestedLoopJoin right outer join is equal to native") { - val df = left.join(right, col("q") < col("c"), "rightouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), - Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0), - Row(null, null, null, null, " yeah ", "yeah", 0, 4.0), - Row(null, null, null, null, "add", "World", 1, 3.0) - ), false) - } - - test("columnar nestedLoopJoin right outer join is equal to native with null") { - val df = leftWithNull.join(rightWithNull, - col("q") < col("c"), "rightouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row(null, null, null, null, "yeah", null,null, 4.0), - Row(null, null, null, null, "abc", "", 4, 1.0), - Row(null, null, null, null, "", "Hello", 2, 2.0), - Row(null, null, null, null, "add", null, 1, null) - ), false) - } - - test("columnar nestedLoopJoin FullOuter Join is equal to native") { - val df = left.join(right, col("q") < col("c")) - val leftKeys = Seq(left.col("q").expr) - val rightKeys = Seq(right.col("c").expr) - checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) - } - - test("columnar nestedLoopJoin FullOuter Join is equal to native With NULL") { - val df = leftWithNull.join(rightWithNull, col("q") < col("c")) - val leftKeys = Seq(leftWithNull.col("q").expr) - val rightKeys = Seq(rightWithNull.col("c").expr) - 
checkThatPlansAgreeTemplateForNLJ(df, leftKeys, rightKeys, FullOuter) - } - - def checkThatPlansAgreeTemplateForNLJ(df: DataFrame, leftKeys: Seq[Expression], - rightKeys: Seq[Expression], joinType: JoinType): Unit = { - checkThatPlansAgree( - df, - (child: SparkPlan) => - new ColumnarBroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, - None), - (child: SparkPlan) => - BroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, - None), - sortAnswers = false) - } - - - //以下部分先不删除,之后可能要继续借鉴 - - test("columnar broadcastHashJoin is equal to native with null") { - val df = leftWithNull.join(rightWithNull.hint("broadcast"), - col("q").isNotNull === col("c").isNotNull) - val leftKeys = Seq(leftWithNull.col("q").isNotNull.expr) - val rightKeys = Seq(rightWithNull.col("c").isNotNull.expr) - checkThatPlansAgreeTemplateForBHJ(df, leftKeys, rightKeys) - } - - def checkThatPlansAgreeTemplateForBHJ(df: DataFrame, leftKeys: Seq[Expression], - rightKeys: Seq[Expression], joinType: JoinType = Inner): Unit = { - checkThatPlansAgree( - df, - (child: SparkPlan) => - ColumnarBroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, - None), - (child: SparkPlan) => - BroadcastNestedLoopJoinExec(child, child, BuildRight,joinType, - None), - sortAnswers = false) - } - - test("validate columnar broadcastHashJoin left outer join happened") { - val res = left.join(right.hint("broadcast"), col("q") === col("c"), "leftouter") - assert( - res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, - s"ColumnarBroadcastNestedLoopJoinExec not happened," + - s" executedPlan as follows: \n${res.queryExecution.executedPlan}") - } - - test("columnar broadcastHashJoin left outer join is equal to native") { - val df = left.join(right.hint("broadcast"), col("q") === col("c"), "leftouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), - Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), 
- Row(" add", "World", 8, 3.0, null, null, null, null), - Row(" yeah ", "yeah", 10, 8.0, null, null, null, null) - ), false) - } - - test("columnar broadcastHashJoin left outer join is equal to native with null") { - val df = leftWithNull.join(rightWithNull.hint("broadcast"), - col("q").isNotNull === col("c").isNotNull, "leftouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), - Row("abc", null, 4, 2.0, "", "Hello", 2, 2.0), - Row("abc", null, 4, 2.0, " add", null, 1, null), - Row("", "Hello", null, 1.0, " yeah ", null, null, 4.0), - Row(" add", "World", 8, 3.0, "abc", "", 4, 1.0), - Row(" add", "World", 8, 3.0, "", "Hello", 2, 2.0), - Row(" add", "World", 8, 3.0, " add", null, 1, null), - Row(" yeah ", "yeah", 10, 8.0, "abc", "", 4, 1.0), - Row(" yeah ", "yeah", 10, 8.0, "", "Hello", 2, 2.0), - Row(" yeah ", "yeah", 10, 8.0, " add", null, 1, null) - - ), false) - } - - test("validate columnar shuffledHashJoin full outer join happened") { - val res = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "fullouter") - assert( - res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffledHashJoinExec]).isDefined, - s"ColumnarShuffledHashJoinExec not happened," + - s" executedPlan as follows: \n${res.queryExecution.executedPlan}") - } - - test("columnar shuffledHashJoin full outer join is equal to native") { - val df = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "fullouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row(null, null, null, null, " yeah ", "yeah", 0, 4.0), - Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), - Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), - Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), - Row(" add", "World", 8, 3.0, null, null, null, null), - Row(null, null, null, null, "", "Hello", 2, 2.0) - ), false) - } - - test("columnar shuffledHashJoin full outer join is equal to native with null") { - val df = 
leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), - col("q").isNotNull === col("c").isNotNull, "fullouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("", "Hello", null, 1.0, " yeah ", null, null, 4.0), - Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), - Row("abc", null, 4, 2.0, "", "Hello", 2, 2.0), - Row("abc", null, 4, 2.0, " add", null, 1, null), - Row(" add", "World", 8, 3.0, "abc", "", 4, 1.0), - Row(" add", "World", 8, 3.0, "", "Hello", 2, 2.0), - Row(" add", "World", 8, 3.0, " add", null, 1, null), - Row(" yeah ", "yeah", 10, 8.0, "abc", "", 4, 1.0), - Row(" yeah ", "yeah", 10, 8.0, "", "Hello", 2, 2.0), - Row(" yeah ", "yeah", 10, 8.0, " add", null, 1, null) - ), false) - } - - test("validate columnar shuffledHashJoin left outer join happened") { - val res = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "leftouter") - assert( - res.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarShuffledHashJoinExec]).isDefined, - s"ColumnarShuffledHashJoinExec not happened," + - s" executedPlan as follows: \n${res.queryExecution.executedPlan}") - } - - test("columnar shuffledHashJoin left outer join is equal to native") { - val df = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "leftouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), - Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), - Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), - Row(" add", "World", 8, 3.0, null, null, null, null) - ), false) - } - - test("columnar shuffledHashJoin left outer join is equal to native with null") { - val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), - col("q") === col("c"), "leftouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), - Row("", "Hello", null, 1.0, null, null, null, null), - Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), - Row(" add", "World", 8, 3.0, null, 
null, null, null) - ), false) - } - - - def checkThatPlansAgreeTemplateForSMJ(df: DataFrame, leftKeys: Seq[Expression], - rightKeys: Seq[Expression], joinType: JoinType): Unit = { - checkThatPlansAgree( - df, - (child: SparkPlan) => - new ColumnarSortMergeJoinExec(leftKeys, rightKeys, joinType, - None, child, child), - (child: SparkPlan) => - SortMergeJoinExec(leftKeys, rightKeys, joinType, - None, child, child), - sortAnswers = true) - } - - - - test("columnar ShuffledHashJoin right outer join is equal to native") { - val df = left.join(right.hint("SHUFFLE_HASH"), col("q") === col("c"), "rightouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), - Row(null, null, null, null, "", "Hello", 2, 2.0), - Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), - Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) - ), false) - } - - test("columnar ShuffledHashJoin right outer join is equal to native with null") { - val df = leftWithNull.join(rightWithNull.hint("SHUFFLE_HASH"), - col("q") === col("c"), "rightouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), - Row(null, null, null, null, "", "Hello", 2, 2.0), - Row(null, null, null, null, " add", null, 1, null), - Row(null, null, null, null, " yeah ", null, null, 4.0) - ), false) - } - - test("columnar BroadcastHashJoin right outer join is equal to native") { - val df = left.join(right.hint("broadcast"), col("q") === col("c"), "rightouter") - checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), - Row(null, null, null, null, "", "Hello", 2, 2.0), - Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), - Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) - ), false) - } - - test("columnar BroadcastHashJoin right outer join is equal to native with null") { - val df = leftWithNull.join(rightWithNull.hint("broadcast"), col("q") === col("c"), "rightouter") - 
checkAnswer(df, _ => df.queryExecution.executedPlan, Seq( - Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), - Row(null, null, null, null, "", "Hello", 2, 2.0), - Row(null, null, null, null, " add", null, 1, null), - Row(null, null, null, null, " yeah ", null, null, 4.0) - ), false) - } - - - - test("ShuffledHashJoin and project funsion test for reorder columns") { - val omniResult = person_test.join(order_test.hint("SHUFFLE_HASH"), person_test("id_p") === order_test("id_p"), "inner") - .select(order_test("order_no"), person_test("name"), order_test("id_p")) - val omniPlan = omniResult.queryExecution.executedPlan - assert(omniPlan.find(_.isInstanceOf[ColumnarProjectExec]).isEmpty, - s"SQL:\n@OmniEnv no ColumnarProjectExec,omniPlan:${omniPlan}") - checkAnswer(omniResult, _ => omniPlan, Seq( - Row(77895, "Carter", 3), - Row(44678, "Carter", 3), - Row(24562, "Adams", 1), - Row(22456, "Adams", 1) - ), false) } - + test("columnar nestedLoopJoin Inner Join is equal to native") { + val df = left.join(right, col("q") < col("c")) + assert( + df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq( + Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), + Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0) + )) + } + + test("columnar nestedLoopJoin Inner Join is equal to native With NULL") { + val df = leftWithNull.join(rightWithNull, col("q") < col("c")) + assert( + df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq()) + } + + test("columnar nestedLoopJoin LeftOuter Join is equal to native") { + val df = left.join(right, col("q") < col("c"),"leftouter") + assert( + 
df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq( + Row("abc", "", 4, 2.0, null, null, null, null), + Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), + Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), + Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0), + Row(" add", "World", 8, 3.0, null, null, null, null) + )) + } + + test("columnar nestedLoopJoin LeftOuter Join is equal to native With NULL") { + val df = leftWithNull.join(rightWithNull, col("q") < col("c"),"leftouter") + assert( + df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq( + Row(" add", "World", 8, 3.0, null, null, null, null), + Row(" yeah ", "yeah", 10, 8.0, null, null, null, null), + Row("", "Hello", null, 1.0, null, null, null, null), + Row("abc", null, 4, 2.0, null, null, null, null) + )) + } + + test("columnar nestedLoopJoin right outer join is equal to native") { + val df = left.join(right, col("q") < col("c"), "rightouter") + assert( + df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq( + Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), + Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0), + Row(null, null, null, null, "add", "World", 1, 3.0), + Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) + )) + } + + test("columnar nestedLoopJoin right outer join is equal to native with null") { + val df = leftWithNull.join(rightWithNull, col("q") < col("c"), "rightouter") + assert( + 
df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq( + Row(null, null, null, null, "yeah", null, null, 4.0), + Row(null, null, null, null, "abc", "", 4, 1.0), + Row(null, null, null, null, "", "Hello", 2, 2.0), + Row(null, null, null, null, "add", null, 1, null) + )) + } + + test("columnar nestedLoopJoin Cross Join is equal to native") { + val df = left.join(right) + assert( + df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq( + Row("abc", "", 4, 2.0, "abc", "", 4, 1.0), + Row("abc", "", 4, 2.0, "", "Hello", 2, 2.0), + Row("abc", "", 4, 2.0, " yeah ", "yeah", 0, 4.0), + Row("abc", "", 4, 2.0, " add", "World", 1, 3.0), + Row(" yeah ", "yeah", 10, 8.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, "","Hello", 2, 2.0), + Row(" yeah ", "yeah", 10, 8.0, " yeah ", "yeah", 0, 4.0), + Row(" yeah ", "yeah", 10, 8.0, " add", "World", 1, 3.0), + Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), + Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0), + Row("", "Hello", 1, 1.0, " yeah ", "yeah", 0, 4.0), + Row("", "Hello", 1, 1.0, " add", "World", 1, 3.0), + Row(" add", "World", 8, 3.0, "abc", "", 4, 1.0), + Row(" add", "World", 8, 3.0, "", "Hello", 2, 2.0), + Row(" add", "World", 8, 3.0, " yeah ", "yeah", 0, 4.0), + Row(" add", "World", 8, 3.0, " add", "World", 1, 3.0) + )) + } + + test("columnar nestedLoopJoin Cross Join is equal to native With NULL") { + val df = leftWithNull.join(rightWithNull) + assert( + df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, + s"ColumnarBroadcastNestedLoopJoinExec not happened, " + + s"executedPlan as follows: 
\n${df.queryExecution.executedPlan}") + checkAnswer(df, Seq( + Row("abc", null, 4, 2.0, " yeah ", null, null, 4.0), + Row("abc", null, 4, 2.0, "abc", "", 4, 1.0), + Row("abc", null, 4, 2.0, "", "Hello", 2, 2.0), + Row("abc", null, 4, 2.0, " add", null, 1, null), + Row(" yeah ", "yeah", 10, 8.0, " yeah ", null, null, 4.0), + Row(" yeah ", "yeah", 10, 8.0, "abc", "", 4, 1.0), + Row(" yeah ", "yeah", 10, 8.0, "", "Hello", 2, 2.0), + Row(" yeah ", "yeah", 10, 8.0, " add", null, 1, null), + Row(" add", "World", 8, 3.0, " yeah ", null, null, 4.0), + Row(" add", "World", 8, 3.0, "abc", "", 4, 1.0), + Row(" add", "World", 8, 3.0, "", "Hello", 2, 2.0), + Row(" add", "World", 8, 3.0, " add", null, 1, null), + Row("", "Hello", null, 1.0, " yeah ", null, null, 4.0), + Row("", "Hello", null, 1.0, "abc", "", 4, 1.0), + Row("", "Hello", null, 1.0, "", "Hello", 2, 2.0), + Row("", "Hello", null, 1.0, " add", null, 1, null) + )) + } } \ No newline at end of file -- Gitee From f8d3010530c23ad16ee6a22255c5e8afc3a91f6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 10 Dec 2024 21:48:17 +0800 Subject: [PATCH 14/15] =?UTF-8?q?=E7=BC=96=E5=86=99extension=E4=BE=A7?= =?UTF-8?q?=E7=94=A8=E4=BE=8B,=E6=A0=B9=E6=8D=AE=E6=A3=80=E8=A7=86?= =?UTF-8?q?=E6=84=8F=E8=A7=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ColumnarNestedLoopJoinExecSuite.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala index 24014a416..ab92dcbbc 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala +++ 
b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{IntegerType, StringType, StructType} -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{DataFrame, QueryTest, Row} // refer to joins package class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { @@ -127,7 +127,7 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { checkAnswer(df, Seq( Row("", "Hello", 1, 1.0, "abc", "", 4, 1.0), Row("", "Hello", 1, 1.0, "", "Hello", 2, 2.0), - Row(null, null, null, null, "add", "World", 1, 3.0), + Row(null, null, null, null, " add", "World", 1, 3.0), Row(null, null, null, null, " yeah ", "yeah", 0, 4.0) )) } @@ -138,12 +138,18 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { df.queryExecution.executedPlan.find(_.isInstanceOf[ColumnarBroadcastNestedLoopJoinExec]).isDefined, s"ColumnarBroadcastNestedLoopJoinExec not happened, " + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") - checkAnswer(df, Seq( + assert(QueryTest.sameRows(Seq( Row(null, null, null, null, "yeah", null, null, 4.0), Row(null, null, null, null, "abc", "", 4, 1.0), Row(null, null, null, null, "", "Hello", 2, 2.0), - Row(null, null, null, null, "add", null, 1, null) - )) + Row(null, null, null, null, " add", null, 1, null) + ),df.collect()).isEmpty,"the run value is error") +// checkAnswer(df, Seq( +// Row(null, null, null, null, "yeah", null, null, 4.0), +// Row(null, null, null, null, "abc", "", 4, 1.0), +// Row(null, null, null, null, "", "Hello", 2, 2.0), +// Row(null, null, null, null, "add", null, 1, null) +// )) } test("columnar nestedLoopJoin Cross Join is equal to native") { -- Gitee From c19f77a6da3b3c326298eabbc681b9a5d0243941 Mon Sep 17 00:00:00 
2001 From: =?UTF-8?q?=E5=B8=85=E6=B5=A9=E7=BF=94?= <1549665469@qq.com> Date: Tue, 10 Dec 2024 21:52:30 +0800 Subject: [PATCH 15/15] =?UTF-8?q?=E7=BC=96=E5=86=99extennsion=E4=BE=A7?= =?UTF-8?q?=E7=94=A8=E4=BE=8B,=E6=A0=B9=E6=8D=AE=E6=A3=80=E8=A7=86?= =?UTF-8?q?=E6=84=8F=E8=A7=81=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../joins/ColumnarBroadcastNestedLoopJoinExec.scala | 11 +++-------- .../execution/ColumnarNestedLoopJoinExecSuite.scala | 12 +++--------- 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala index 646bb5ca1..6a49edb42 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastNestedLoopJoinExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. 
@@ -180,7 +180,6 @@ case class ColumnarBroadcastNestedLoopJoinExec( } override def doExecuteColumnar(): RDD[ColumnarBatch] = { - // input/output: {col1#10,col2#11,col1#12,col2#13} val numOutputRows = longMetric("numOutputRows") val numOutputVecBatches = longMetric("numOutputVecBatches") val numMergedVecBatches = longMetric("numMergedVecBatches") @@ -208,12 +207,12 @@ case class ColumnarBroadcastNestedLoopJoinExec( throw new UnsupportedOperationException(s"ColumnBroadcastNestedLoopJoin Join-type[$x] is not supported!") } val prunedBuildOutput = pruneOutput(buildOutput, projectExprIdList) - val buildOutputTypes = new Array[DataType](prunedBuildOutput.size) // {2,2}, buildOutput:col1#12,col2#13 + val buildOutputTypes = new Array[DataType](prunedBuildOutput.size) prunedBuildOutput.zipWithIndex.foreach { case (att, i) => buildOutputTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(att.dataType, att.metadata) } - val probeTypes = new Array[DataType](streamedOutput.size) // {2,2}, streamedOutput:col1#10,col2#11 + val probeTypes = new Array[DataType](streamedOutput.size) streamedOutput.zipWithIndex.foreach { case (attr, i) => probeTypes(i) = OmniExpressionAdaptor.sparkTypeToOmniType(attr.dataType, attr.metadata) } @@ -258,10 +257,8 @@ case class ColumnarBroadcastNestedLoopJoinExec( buildGetOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startBuildGetOp) (opFactory, op) } - var buildOp: OmniOperator = null var buildOpFactory: OmniNestedLoopJoinBuildOperatorFactory = null - val (opFactory, op) = createBuildOpFactoryAndOp() buildOpFactory = opFactory buildOp = op @@ -308,7 +305,6 @@ case class ColumnarBroadcastNestedLoopJoinExec( val startlookupInput = System.nanoTime() lookupOp.addInput(vecBatch) lookupAddInputTime += NANOSECONDS.toMillis(System.nanoTime() - startlookupInput) - val startLookupGetOp = System.nanoTime() results = lookupOp.getOutput res = results.hasNext @@ -400,5 +396,4 @@ case class ColumnarBroadcastNestedLoopJoinExec( } } } - } diff --git 
a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala index ab92dcbbc..f7472a9b5 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/ColumnarNestedLoopJoinExecSuite.scala @@ -139,17 +139,11 @@ class ColumnarNestedLoopJoinExecSuite extends ColumnarSparkPlanTest { s"ColumnarBroadcastNestedLoopJoinExec not happened, " + s"executedPlan as follows: \n${df.queryExecution.executedPlan}") assert(QueryTest.sameRows(Seq( - Row(null, null, null, null, "yeah", null, null, 4.0), - Row(null, null, null, null, "abc", "", 4, 1.0), + Row(null, null, null, null, " add", null, 1, null), + Row(null, null, null, null, " yeah ", null, null, 4.0), Row(null, null, null, null, "", "Hello", 2, 2.0), - Row(null, null, null, null, " add", null, 1, null) + Row(null, null, null, null, "abc", "", 4, 1.0) ),df.collect()).isEmpty,"the run value is error") -// checkAnswer(df, Seq( -// Row(null, null, null, null, "yeah", null, null, 4.0), -// Row(null, null, null, null, "abc", "", 4, 1.0), -// Row(null, null, null, null, "", "Hello", 2, 2.0), -// Row(null, null, null, null, "add", null, 1, null) -// )) } test("columnar nestedLoopJoin Cross Join is equal to native") { -- Gitee