From 0255c964a8f249fcbbbdc459f6d64a87d971b9c2 Mon Sep 17 00:00:00 2001 From: chenyidao <979136761@qq.com> Date: Tue, 28 Feb 2023 16:16:52 +0800 Subject: [PATCH 1/2] omni adapte spark331 --- .../omniop-spark-extension/java/pom.xml | 10 +- .../vectorized/OmniColumnVector.java | 17 ++ .../boostkit/spark/ColumnarGuardRule.scala | 20 +- .../boostkit/spark/ColumnarPlugin.scala | 63 ++++- .../boostkit/spark/ColumnarPluginConfig.scala | 2 +- .../boostkit/spark/ShuffleJoinStrategy.scala | 2 +- .../expression/OmniExpressionAdaptor.scala | 4 +- .../spark/shuffle/ColumnarShuffleWriter.scala | 7 +- .../sort/OmniColumnarShuffleManager.scala | 5 +- .../ColumnarBasicPhysicalOperators.scala | 21 +- ...ColumnarBroadcastExchangeAdaptorExec.scala | 3 + .../ColumnarBroadcastExchangeExec.scala | 5 +- .../spark/sql/execution/ColumnarExec.scala | 19 +- .../sql/execution/ColumnarExpandExec.scala | 19 ++ .../ColumnarFileSourceScanExec.scala | 103 ++++--- .../execution/ColumnarHashAggregateExec.scala | 42 +-- .../ColumnarShuffleExchangeExec.scala | 39 ++- .../sql/execution/ColumnarSortExec.scala | 3 + .../ColumnarTakeOrderedAndProjectExec.scala | 3 + .../sql/execution/ColumnarWindowExec.scala | 22 +- .../sql/execution/ShuffledColumnarRDD.scala | 59 +++- .../adaptive/AQEPropagateEmptyRelation.scala | 100 +++++++ .../ColumnarCustomShuffleReaderExec.scala | 122 ++++++--- .../EliminateJoinToEmptyRelation.scala | 63 ----- .../PruneFileSourcePartitions.scala | 139 ---------- .../datasources/orc/OmniOrcFileFormat.scala | 15 +- .../execution/datasources/orc/OrcUtils.scala | 256 ------------------ .../joins/ColumnarBroadcastHashJoinExec.scala | 7 +- .../joins/ColumnarShuffledHashJoinExec.scala | 6 +- .../joins/ColumnarSortMergeJoinExec.scala | 6 + .../execution/PruneHiveTablePartitions.scala | 126 --------- omnioperator/omniop-spark-extension/pom.xml | 42 ++- 32 files changed, 595 insertions(+), 755 deletions(-) create mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala diff --git a/omnioperator/omniop-spark-extension/java/pom.xml b/omnioperator/omniop-spark-extension/java/pom.xml index caafa313f..3e3175bab 100644 --- a/omnioperator/omniop-spark-extension/java/pom.xml +++ b/omnioperator/omniop-spark-extension/java/pom.xml @@ -7,7 +7,7 @@ com.huawei.kunpeng boostkit-omniop-spark-parent - 3.1.1-1.1.0 + 3.3.1-1.1.0 ../pom.xml @@ -103,20 +103,20 @@ spark-core_${scala.binary.version} test-jar test - 3.1.1 + 3.3.1 org.apache.spark spark-catalyst_${scala.binary.version} test-jar test - 3.1.1 + 3.3.1 org.apache.spark spark-sql_${scala.binary.version} test-jar - 3.1.1 + 3.3.1 test @@ -127,7 +127,7 @@ org.apache.spark spark-hive_${scala.binary.version} - 3.1.1 + 3.3.1 provided diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java 
b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java index 808f96e1f..3676d38dc 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java +++ b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java @@ -354,6 +354,18 @@ public class OmniColumnVector extends WritableColumnVector { } } + @Override + public void putBooleans(int rowId, byte src) { + booleanDataVec.set(rowId, (src & 1) == 1); + booleanDataVec.set(rowId + 1, (src >>> 1 & 1) == 1); + booleanDataVec.set(rowId + 2, (src >>> 2 & 1) == 1); + booleanDataVec.set(rowId + 3, (src >>> 3 & 1) == 1); + booleanDataVec.set(rowId + 4, (src >>> 4 & 1) == 1); + booleanDataVec.set(rowId + 5, (src >>> 5 & 1) == 1); + booleanDataVec.set(rowId + 6, (src >>> 6 & 1) == 1); + booleanDataVec.set(rowId + 7, (src >>> 7 & 1) == 1); + } + @Override public boolean getBoolean(int rowId) { if (dictionaryData != null) { @@ -453,6 +465,11 @@ public class OmniColumnVector extends WritableColumnVector { return UTF8String.fromBytes(getBytes(rowId, count), rowId, count); } + @Override + public ByteBuffer getByteBuffer(int rowId, int count) { + throw new UnsupportedOperationException("getByteBuffer is not supported"); + } + // // APIs dealing with Shorts // diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala index a4e4eaa0a..46dd4b45a 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, CustomShuffleReaderExec} +import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, OmniAQEShuffleReadExec} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ @@ -37,6 +37,9 @@ case class RowGuard(child: SparkPlan) extends SparkPlan { } def children: Seq[SparkPlan] = Seq(child) + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = + legacyWithNewChildren(newChildren) } case class ColumnarGuardRule() extends Rule[SparkPlan] { @@ -92,6 +95,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { if (!enableColumnarHashAgg) return false new ColumnarHashAggregateExec( plan.requiredChildDistributionExpressions, + plan.isStreaming, + plan.numShufflePartitions, plan.groupingExpressions, plan.aggregateExpressions, plan.aggregateAttributes, @@ -127,9 +132,9 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { left match { case exec: BroadcastExchangeExec => new ColumnarBroadcastExchangeExec(exec.mode, exec.child) - case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) => + case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) => new ColumnarBroadcastExchangeExec(plan.mode, plan.child) - case 
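Since Spark 3.2, physical operators must implement withNewChildInternal / withNewChildrenInternal, which is why RowGuard and the columnar operators touched by this patch each gain such an override. A minimal sketch of the pattern, assuming only Spark 3.3.1 plan APIs (MyColumnarUnaryExec below is a hypothetical node, not one from this patch):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical operator: the copy-based withNewChildInternal override is the
// piece Spark 3.3 now requires on every unary physical node.
case class MyColumnarUnaryExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("sketch only")

  override protected def withNewChildInternal(newChild: SparkPlan): MyColumnarUnaryExec =
    copy(child = newChild)
}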
BroadcastQueryStageExec(_, plan: ReusedExchangeExec) => + case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) => plan match { case ReusedExchangeExec(_, b: BroadcastExchangeExec) => new ColumnarBroadcastExchangeExec(b.mode, b.child) @@ -141,9 +146,9 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { right match { case exec: BroadcastExchangeExec => new ColumnarBroadcastExchangeExec(exec.mode, exec.child) - case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) => + case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) => new ColumnarBroadcastExchangeExec(plan.mode, plan.child) - case BroadcastQueryStageExec(_, plan: ReusedExchangeExec) => + case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) => plan match { case ReusedExchangeExec(_, b: BroadcastExchangeExec) => new ColumnarBroadcastExchangeExec(b.mode, b.child) @@ -182,7 +187,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { plan.buildSide, plan.condition, plan.left, - plan.right).buildCheck() + plan.right, + plan.isSkewJoin).buildCheck() case plan: BroadcastNestedLoopJoinExec => return false case p => p @@ -237,7 +243,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] { case p if !supportCodegen(p) => // insert row guard them recursively p.withNewChildren(p.children.map(insertRowGuardOrNot)) - case p: CustomShuffleReaderExec => + case p: OmniAQEShuffleReadExec => p.withNewChildren(p.children.map(insertRowGuardOrNot)) case p: BroadcastQueryStageExec => p diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala index d3fcbaf53..a94eb5d67 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery import org.apache.spark.sql.catalyst.expressions.aggregate.Partial import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowToOmniColumnarExec, _} -import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ColumnarCustomShuffleReaderExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, OmniAQEShuffleReadExec, AQEShuffleReadExec, QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ @@ -247,6 +247,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { case _ => new ColumnarHashAggregateExec( plan.requiredChildDistributionExpressions, + plan.isStreaming, + plan.numShufflePartitions, plan.groupingExpressions, plan.aggregateExpressions, plan.aggregateAttributes, @@ -257,6 +259,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { } else { new ColumnarHashAggregateExec( plan.requiredChildDistributionExpressions, + plan.isStreaming, + plan.numShufflePartitions, plan.groupingExpressions, plan.aggregateExpressions, plan.aggregateAttributes, @@ -267,6 +271,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { } else { new ColumnarHashAggregateExec( plan.requiredChildDistributionExpressions, + 
plan.isStreaming, + plan.numShufflePartitions, plan.groupingExpressions, plan.aggregateExpressions, plan.aggregateAttributes, @@ -311,7 +317,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { plan.buildSide, plan.condition, left, - right) + right, + plan.isSkewJoin) case plan: SortMergeJoinExec if enableColumnarSortMergeJoin => logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.") val left = replaceWithColumnarPlan(plan.left) @@ -341,19 +348,19 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { val child = replaceWithColumnarPlan(plan.child) logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.") new ColumnarShuffleExchangeExec(plan.outputPartitioning, child, plan.shuffleOrigin) - case plan: CustomShuffleReaderExec if columnarConf.enableColumnarShuffle => + case plan: AQEShuffleReadExec if columnarConf.enableColumnarShuffle => plan.child match { case shuffle: ColumnarShuffleExchangeExec => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs) - case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec) => + OmniAQEShuffleReadExec(plan.child, plan.partitionSpecs) + case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _) => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs) - case ShuffleQueryStageExec(_, reused: ReusedExchangeExec) => + OmniAQEShuffleReadExec(plan.child, plan.partitionSpecs) + case ShuffleQueryStageExec(_, reused: ReusedExchangeExec, _) => reused match { case ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec) => logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - ColumnarCustomShuffleReaderExec( + OmniAQEShuffleReadExec( plan.child, plan.partitionSpecs) case _ => @@ -375,13 +382,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { curPlan.id, BroadcastExchangeExec( originalBroadcastPlan.mode, - ColumnarBroadcastExchangeAdaptorExec(originalBroadcastPlan, 1))) + ColumnarBroadcastExchangeAdaptorExec(originalBroadcastPlan, 1)), + curPlan._canonicalized) case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeExec) => BroadcastQueryStageExec( curPlan.id, BroadcastExchangeExec( originalBroadcastPlan.mode, - ColumnarBroadcastExchangeAdaptorExec(curPlan.plan, 1))) + ColumnarBroadcastExchangeAdaptorExec(curPlan.plan, 1)), + curPlan._canonicalized) case _ => curPlan } @@ -409,11 +418,26 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { case ColumnarToRowExec(child: ColumnarBroadcastExchangeExec) => replaceWithColumnarPlan(child) case plan: ColumnarToRowExec => - val child = replaceWithColumnarPlan(plan.child) - if (conf.getConfString("spark.omni.sql.columnar.columnarToRow", "true").toBoolean) { - OmniColumnarToRowExec(child) - } else { - ColumnarToRowExec(child) + plan.child match { + case child: BroadcastQueryStageExec => + child.plan match { + case originalBroadcastPlan: ColumnarBroadcastExchangeExec => + BroadcastQueryStageExec( + child.id, + BroadcastExchangeExec( + originalBroadcastPlan.mode, + ColumnarBroadcastExchangeAdaptorExec(originalBroadcastPlan, 1)), child._canonicalized) + case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeExec) => + BroadcastQueryStageExec( + child.id, + BroadcastExchangeExec( + originalBroadcastPlan.mode, + ColumnarBroadcastExchangeAdaptorExec(child.plan, 1)), 
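The adaptive-execution rewrites above rely on Spark's rename of CustomShuffleReaderExec to AQEShuffleReadExec (wrapped here into the plugin's OmniAQEShuffleReadExec). A small sketch of detecting that node in a plan, assuming only the public Spark 3.3.1 adaptive plan node; the helper name is illustrative:

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec

object AqeShuffleReadCheck {
  // CustomShuffleReaderExec no longer exists in Spark 3.2+, so rules that used
  // to match it must match AQEShuffleReadExec instead.
  def isAqeShuffleRead(plan: SparkPlan): Boolean = plan match {
    case _: AQEShuffleReadExec => true
    case _ => false
  }
}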
child._canonicalized) + case _ => + replaceColumnarToRow(plan, conf) + } + case _ => + replaceColumnarToRow(plan, conf) } case r: SparkPlan if !r.isInstanceOf[QueryStageExec] && !r.supportsColumnar && r.children.exists(c => @@ -430,6 +454,15 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] { val children = p.children.map(replaceWithColumnarPlan) p.withNewChildren(children) } + + def replaceColumnarToRow(plan: ColumnarToRowExec, conf: SQLConf) : SparkPlan = { + val child = replaceWithColumnarPlan(plan.child) + if (conf.getConfString("spark.omni.sql.columnar.columnarToRow", "true").toBoolean) { + OmniColumnarToRowExec(child) + } else { + ColumnarToRowExec(child) + } + } } case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule with Logging { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala index 29776a07a..a698c8108 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala @@ -153,7 +153,7 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging { .toBoolean val enableFusion: Boolean = conf - .getConfString("spark.omni.sql.columnar.fusion", "true") + .getConfString("spark.omni.sql.columnar.fusion", "false") .toBoolean // Pick columnar shuffle hash join if one side join count > = 0 to build local hash map, and is diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala index 2071420c9..6b065552c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala @@ -37,7 +37,7 @@ object ShuffleJoinStrategy extends Strategy ColumnarPluginConfig.getConf.columnarPreferShuffledHashJoinCBO def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) + case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, _, left, right, hint) if columnarPreferShuffledHashJoin => val enable = getBroadcastBuildSide(left, right, joinType, hint, true, conf).isEmpty && !hintToSortMergeJoin(hint) && diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index da1a5b747..c4307082a 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -668,9 +668,9 @@ object OmniExpressionAdaptor extends Logging { def toOmniAggFunType(agg: AggregateExpression, isHashAgg: Boolean = false, isFinal: Boolean = false): FunctionType = { agg.aggregateFunction match { - case Sum(_) => OMNI_AGGREGATION_TYPE_SUM + case Sum(_, _) => OMNI_AGGREGATION_TYPE_SUM case Max(_) => OMNI_AGGREGATION_TYPE_MAX - case Average(_) 
=> OMNI_AGGREGATION_TYPE_AVG + case Average(_, _) => OMNI_AGGREGATION_TYPE_AVG case Min(_) => OMNI_AGGREGATION_TYPE_MIN case Count(Literal(1, IntegerType) :: Nil) | Count(ArrayBuffer(Literal(1, IntegerType))) => if (isFinal) { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 7eca3427e..615ddb6b7 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -71,7 +71,7 @@ class ColumnarShuffleWriter[K, V]( override def write(records: Iterator[Product2[K, V]]): Unit = { if (!records.hasNext) { partitionLengths = new Array[Long](dep.partitioner.numPartitions) - shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, null) + shuffleBlockResolver.writeMetadataFileAndCommit(dep.shuffleId, mapId, partitionLengths, Array[Long](), null) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId) return } @@ -107,7 +107,7 @@ class ColumnarShuffleWriter[K, V]( jniWrapper.split(nativeSplitter, vb.getNativeVectorBatch) dep.splitTime.add(System.nanoTime() - startTime) dep.numInputRows.add(cb.numRows) - writeMetrics.incRecordsWritten(1) + writeMetrics.incRecordsWritten(cb.numRows) } } val startTime = System.nanoTime() @@ -122,10 +122,11 @@ class ColumnarShuffleWriter[K, V]( partitionLengths = splitResult.getPartitionLengths try { - shuffleBlockResolver.writeIndexFileAndCommit( + shuffleBlockResolver.writeMetadataFileAndCommit( dep.shuffleId, mapId, partitionLengths, + Array[Long](), dataTmp) } finally { if (dataTmp.exists() && !dataTmp.delete()) { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala index e7c66ee72..28427bba2 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala @@ -99,7 +99,7 @@ class OmniColumnarShuffleManager(conf: SparkConf) extends ColumnarShuffleManager env.conf, metrics, shuffleExecutorComponents) - case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K@unchecked, V@unchecked] => + case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, bypassMergeSortHandle, @@ -107,9 +107,8 @@ class OmniColumnarShuffleManager(conf: SparkConf) extends ColumnarShuffleManager env.conf, metrics, shuffleExecutorComponents) - case other: BaseShuffleHandle[K@unchecked, V@unchecked, _] => + case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => new SortShuffleWriter( - shuffleBlockResolver, other, mapId, context, diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala index cb23b68f0..86ac4fb1c 100644 --- 
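The Sum(_, _) and Average(_, _) patterns above reflect that both aggregate classes gained a second constructor argument (ANSI-mode related) in newer Spark, so single-argument extractors no longer compile. A small illustrative sketch against the Spark 3.3.1 catalyst aggregate classes (aggLabel is a hypothetical helper):

import org.apache.spark.sql.catalyst.expressions.aggregate._

object AggPatternSketch {
  // Sum and Average need a two-wildcard pattern in Spark 3.3; Min and Max are unchanged.
  def aggLabel(f: AggregateFunction): String = f match {
    case Sum(_, _) => "sum"
    case Average(_, _) => "avg"
    case Min(_) => "min"
    case Max(_) => "max"
    case _ => "other"
  }
}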
a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS + import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil @@ -101,6 +102,9 @@ case class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPl |${ExplainUtils.generateFieldString("Input", child.output)} |""".stripMargin } + + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarProjectExec = + copy(child = newChild) } case class ColumnarFilterExec(condition: Expression, child: SparkPlan) @@ -109,6 +113,10 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan) override def supportsColumnar: Boolean = true override def nodeName: String = "OmniColumnarFilter" + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarFilterExec = { + copy(this.condition, newChild) + } + // Split out all the IsNotNulls from condition. private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition { case IsNotNull(a) => isNullIntolerant(a) && a.references.subsetOf(child.outputSet) @@ -116,7 +124,7 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan) } // If one expression and its children are null intolerant, it is null intolerant. - private def isNullIntolerant(expr: Expression): Boolean = expr match { + override def isNullIntolerant(expr: Expression): Boolean = expr match { case e: NullIntolerant => e.children.forall(isNullIntolerant) case _ => false } @@ -267,6 +275,9 @@ case class ColumnarConditionProjectExec(projectList: Seq[NamedExpression], override def output: Seq[Attribute] = projectList.map(_.toAttribute) + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarConditionProjectExec = + copy(child = newChild) + override lazy val metrics = Map( "addInputTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omni addInput"), "numInputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of input vecBatchs"), @@ -383,7 +394,7 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan { children.map(_.output).transpose.map { attrs => val firstAttr = attrs.head val nullable = attrs.exists(_.nullable) - val newDt = attrs.map(_.dataType).reduce(StructType.merge) + val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge) if (firstAttr.dataType == newDt) { firstAttr.withNullability(nullable) } else { @@ -393,6 +404,10 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan { } } + override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = { + copy(children = newChildren) + } + def buildCheck(): Unit = { val inputTypes = new Array[DataType](output.size) output.zipWithIndex.foreach { @@ -420,7 +435,7 @@ class ColumnarRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range protected override def doExecuteColumnar(): RDD[ColumnarBatch] = { val numOutputRows = longMetric("numOutputRows") - sqlContext + session.sqlContext .sparkContext .parallelize(0 until numSlices, numSlices) .mapPartitionsWithIndex { (i, _) => diff --git 
a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala index d137388ab..1d236c16d 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala @@ -64,4 +64,7 @@ case class ColumnarBroadcastExchangeAdaptorExec(child: SparkPlan, numPartitions: "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"), "processTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_datatoarrowcolumnar")) + + override protected def withNewChildInternal(newChild: SparkPlan): + ColumnarBroadcastExchangeAdaptorExec = copy(child = newChild) } \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 72d1aae05..8a29e0d2b 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -65,7 +65,7 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) @transient override lazy val relationFuture: Future[broadcast.Broadcast[Any]] = { SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]]( - sqlContext.sparkSession, ColumnarBroadcastExchangeExec.executionContext) { + session.sqlContext.sparkSession, ColumnarBroadcastExchangeExec.executionContext) { try { // Setup a job group here so later it may get cancelled by groupId if necessary. sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)", @@ -159,6 +159,9 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) } } + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarBroadcastExchangeExec = + new ColumnarBroadcastExchangeExec(this.mode, newChild) + override protected def doPrepare(): Unit = { // Materialize the future. 
relationFuture diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala index b1fd51f48..d6ff2b40a 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala @@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.util.SparkMemoryUtils import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OmniColumnVector, WritableColumnVector} -import org.apache.spark.sql.types.{BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DecimalType, DoubleType, IntegerType, LongType, ShortType, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DecimalType, DoubleType, IntegerType, LongType, ShortType, StringType, StructType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils import nova.hetu.omniruntime.vector.Vec @@ -101,6 +102,7 @@ private object RowToColumnConverter { private def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = { val core = dataType match { + case BinaryType => BinaryConverter case BooleanType => BooleanConverter case ByteType => ByteConverter case ShortType => ShortConverter @@ -123,6 +125,13 @@ private object RowToColumnConverter { } } + private object BinaryConverter extends TypeConverter { + override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = { + val bytes = row.getBinary(column) + cv.appendByteArray(bytes, 0, bytes.length) + } + } + private object BooleanConverter extends TypeConverter { override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = cv.appendBoolean(row.getBoolean(column)) @@ -232,8 +241,11 @@ case class RowToOmniColumnarExec(child: SparkPlan) extends RowToColumnarTransiti "rowToOmniColumnarTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in row to OmniColumnar") ) + override protected def withNewChildInternal(newChild: SparkPlan): RowToOmniColumnarExec = + copy(child = newChild) + override def doExecuteColumnar(): RDD[ColumnarBatch] = { - val enableOffHeapColumnVector = sqlContext.conf.offHeapColumnVectorEnabled + val enableOffHeapColumnVector = session.sqlContext.conf.offHeapColumnVectorEnabled val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") val rowToOmniColumnarTime = longMetric("rowToOmniColumnarTime") @@ -313,6 +325,9 @@ case class OmniColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransiti ColumnarBatchToInternalRow.convert(localOutput, batches, numOutputRows, numInputBatches, omniColumnarToRowTime) } } + + override protected def withNewChildInternal(newChild: SparkPlan): + OmniColumnarToRowExec = copy(child = newChild) } object ColumnarBatchToInternalRow { diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala index 27b05b16c..b25d97d60 100644 --- 
a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.sql.execution import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP @@ -161,4 +178,6 @@ case class ColumnarExpandExec( throw new UnsupportedOperationException(s"ColumnarExpandExec operator doesn't support doExecute().") } + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarExpandExec = + copy(child = newChild) } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala index 73091d069..90594d3eb 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.optimizer.BuildLeft import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.{OmniOrcFileFormat, OrcFileFormat} @@ -54,6 +55,7 @@ import org.apache.spark.sql.execution.joins.ColumnarBroadcastHashJoinExec import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.util.SparkMemoryUtils import org.apache.spark.sql.execution.util.SparkMemoryUtils.addLeakSafeTaskCompletionListener +import org.apache.spark.sql.execution.vectorized.ConstantColumnVector import org.apache.spark.sql.execution.vectorized.OmniColumnVector import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DecimalType, StructType} @@ -74,13 +76,19 @@ abstract class BaseColumnarFileSourceScanExec( disableBucketedScan: Boolean = false) extends DataSourceScanExec { + lazy val metadataColumns: Seq[AttributeReference] = + output.collect { case FileSourceMetadataAttribute(attr) => attr } + override lazy val supportsColumnar: Boolean = true override def vectorTypes: Option[Seq[String]] = relation.fileFormat.vectorTypes( requiredSchema = requiredSchema, partitionSchema = 
relation.partitionSchema, - relation.sparkSession.sessionState.conf) + relation.sparkSession.sessionState.conf).map { vectorTypes => + // for column-based file format, append metadata column's vector type classes if any + vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName) + } private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty @@ -96,7 +104,7 @@ abstract class BaseColumnarFileSourceScanExec( } private def isDynamicPruningFilter(e: Expression): Boolean = - e.find(_.isInstanceOf[PlanExpression[_]]).isDefined + e.exists(_.isInstanceOf[PlanExpression[_]]) @transient lazy val selectedPartitions: Array[PartitionDirectory] = { val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) @@ -223,7 +231,13 @@ abstract class BaseColumnarFileSourceScanExec( @transient private lazy val pushedDownFilters = { val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) - dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) + // `dataFilters` should not include any metadata col filters + // because the metadata struct has been flatted in FileSourceStrategy + // and thus metadata col filters are invalid to be pushed down + dataFilters.filterNot(_.references.exists { + case FileSourceMetadataAttribute(_) => true + case _ => false + }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) } override protected def metadata: Map[String, String] = { @@ -242,21 +256,26 @@ abstract class BaseColumnarFileSourceScanExec( "DataFilters" -> seqToString(dataFilters), "Location" -> locationDesc) - // (SPARK-32986): Add bucketed scan info in explain output of FileSourceScanExec - if (bucketedScan) { - relation.bucketSpec.map { spec => + relation.bucketSpec.map { spec => + val bucketedKey = "Bucketed" + if (bucketedScan) { val numSelectedBuckets = optionalBucketSet.map { b => b.cardinality() } getOrElse { spec.numBuckets } - metadata + ("SelectedBucketsCount" -> - (s"$numSelectedBuckets out of ${spec.numBuckets}" + + metadata ++ Map( + bucketedKey -> "true", + "SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" + optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)" }.getOrElse(""))) - } getOrElse { - metadata + } else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) { + metadata + (bucketedKey -> "false (disabled by configuration)") + } else if (disableBucketedScan) { + metadata + (bucketedKey -> "false (disabled by query planner)") + } else { + metadata + (bucketedKey -> "false (bucket column(s) not read)") } - } else { + } getOrElse { metadata } } @@ -312,7 +331,7 @@ abstract class BaseColumnarFileSourceScanExec( createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions, relation) } else { - createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation) + createReadRDD(readFile, dynamicallySelectedPartitions, relation) } sendDriverMetrics() readRDD @@ -343,7 +362,7 @@ abstract class BaseColumnarFileSourceScanExec( driverMetrics("staticFilesNum") = filesNum driverMetrics("staticFilesSize") = filesSize } - if (relation.partitionSchemaOption.isDefined) { + if (relation.partitionSchema.nonEmpty) { driverMetrics("numPartitions") = partitions.length } } @@ -363,7 +382,7 @@ abstract class BaseColumnarFileSourceScanExec( None } } ++ { - if (relation.partitionSchemaOption.isDefined) { + if (relation.partitionSchema.nonEmpty) { Map( "numPartitions" -> 
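The pushedDownFilters change above accounts for Spark 3.3's hidden _metadata columns: filters referencing them must not be handed to the data source. A sketch of that pruning step, assuming the FileSourceMetadataAttribute extractor from Spark 3.3.1 catalyst (pushableDataFilters is a hypothetical helper):

import org.apache.spark.sql.catalyst.expressions.{Expression, FileSourceMetadataAttribute}

object MetadataFilterPruning {
  // Keep only filters that do not touch metadata columns before translating
  // them into data-source filters, mirroring the change above.
  def pushableDataFilters(dataFilters: Seq[Expression]): Seq[Expression] =
    dataFilters.filterNot(_.references.exists {
      case FileSourceMetadataAttribute(_) => true
      case _ => false
    })
}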
SQLMetrics.createMetric(sparkContext, "number of partitions read"), "pruningTime" -> @@ -423,7 +442,7 @@ abstract class BaseColumnarFileSourceScanExec( /** * Create an RDD for bucketed reads. - * The non-bucketed variant of this function is [[createNonBucketedReadRDD]]. + * The non-bucketed variant of this function is [[createReadRDD]]. * * The algorithm is pretty simple: each RDD partition being returned should include all the files * with the same bucket id from all the given Hive partitions. @@ -447,10 +466,9 @@ abstract class BaseColumnarFileSourceScanExec( }.groupBy { f => BucketingUtils .getBucketId(new Path(f.filePath).getName) - .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}")) + .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath)) } - // (SPARK-32985): Decouple bucket filter pruning and bucketed table scan val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { val bucketSet = optionalBucketSet.get filesGroupedToBuckets.filter { @@ -475,7 +493,8 @@ abstract class BaseColumnarFileSourceScanExec( } } - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions, + new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns) } /** @@ -486,7 +505,7 @@ abstract class BaseColumnarFileSourceScanExec( * @param selectedPartitions Hive-style partition that are part of the read. * @param fsRelation [[HadoopFsRelation]] associated with the read. */ - private def createNonBucketedReadRDD( + private def createReadRDD( readFile: (PartitionedFile) => Iterator[InternalRow], selectedPartitions: Array[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { @@ -496,27 +515,43 @@ abstract class BaseColumnarFileSourceScanExec( logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + s"open cost is considered as scanning $openCostInBytes bytes.") + // Filter files with bucket pruning if possible + val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled + val shouldProcess: Path => Boolean = optionalBucketSet match { + case Some(bucketSet) if bucketingEnabled => + // Do not prune the file if bucket file name is invalid + filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get) + case _ => + _ => true + } + val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => // getPath() is very expensive so we only want to call it once in this block: val filePath = file.getPath - val isSplitable = relation.fileFormat.isSplitable( - relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( - sparkSession = relation.sparkSession, - file = file, - filePath = filePath, - isSplitable = isSplitable, - maxSplitBytes = maxSplitBytes, - partitionValues = partition.values - ) + + if (shouldProcess(filePath)) { + val isSplitable = relation.fileFormat.isSplitable( + relation.sparkSession, relation.options, filePath) + PartitionedFileUtil.splitFiles( + sparkSession = relation.sparkSession, + file = file, + filePath = filePath, + isSplitable = isSplitable, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + } else { + Seq.empty + } } }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - new FileScanRDD(fsRelation.sparkSession, readFile, partitions) + new FileScanRDD(fsRelation.sparkSession, readFile, partitions, + 
new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns) } // Filters unused DynamicPruningExpression expressions - one which has been replaced @@ -551,7 +586,7 @@ abstract class BaseColumnarFileSourceScanExec( throw new UnsupportedOperationException(s"Unsupported final aggregate expression in operator fusion, exp: $exp") } else if (exp.mode == Partial) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Average(_) | Max(_) | Count(_) | First(_, _) => + case Sum(_, _) | Min(_) | Average(_, _) | Max(_) | Count(_) | First(_, _) => val aggExp = exp.aggregateFunction.children.head omniOutputExressionOrder += { exp.aggregateFunction.inputAggBufferAttributes.head.exprId -> @@ -569,7 +604,7 @@ abstract class BaseColumnarFileSourceScanExec( } } else if (exp.mode == PartialMerge) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Average(_) | Max(_) | Count(_) | First(_, _) => + case Sum(_, _) | Min(_) | Average(_, _) | Max(_) | Count(_) | First(_, _) => val aggExp = exp.aggregateFunction.children.head omniOutputExressionOrder += { exp.aggregateFunction.inputAggBufferAttributes.head.exprId -> @@ -815,7 +850,7 @@ case class ColumnarMultipleOperatorExec( None } } ++ { - if (relation.partitionSchemaOption.isDefined) { + if (relation.partitionSchema.nonEmpty) { Map( "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"), "pruningTime" -> @@ -1162,7 +1197,7 @@ case class ColumnarMultipleOperatorExec1( None } } ++ { - if (relation.partitionSchemaOption.isDefined) { + if (relation.partitionSchema.nonEmpty) { Map( "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"), "pruningTime" -> diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala index e2618842a..278bbdb55 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import java.util.concurrent.TimeUnit.NANOSECONDS + import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil @@ -32,8 +33,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.execution.ColumnarProjection.dealPartitionData -import org.apache.spark.sql.execution.aggregate.BaseAggregateExec +import org.apache.spark.sql.execution.aggregate.{AggregateCodegenSupport, BaseAggregateExec} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.util.SparkMemoryUtils import org.apache.spark.sql.execution.vectorized.OmniColumnVector @@ -45,14 +47,18 @@ import org.apache.spark.sql.vectorized.ColumnarBatch */ case class ColumnarHashAggregateExec( requiredChildDistributionExpressions: Option[Seq[Expression]], + isStreaming: Boolean, + numShufflePartitions: Option[Int], groupingExpressions: Seq[NamedExpression], aggregateExpressions: 
Seq[AggregateExpression], aggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec - with AliasAwareOutputPartitioning { + extends AggregateCodegenSupport { + + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarHashAggregateExec = + copy(child = newChild) override def verboseStringWithOperatorId(): String = { s""" @@ -77,6 +83,15 @@ case class ColumnarHashAggregateExec( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatchs")) + protected override def needHashTable: Boolean = true + + protected override def doConsumeWithKeys(ctx: CodegenContext, input: Seq[ExprCode]): String = { + throw new UnsupportedOperationException("ColumnarHashAgg code-gen does not support grouping keys") + } + + protected override def doProduceWithKeys(ctx: CodegenContext): String = { + throw new UnsupportedOperationException("ColumnarHashAgg code-gen does not support grouping keys") + } override def supportsColumnar: Boolean = true @@ -99,7 +114,7 @@ case class ColumnarHashAggregateExec( } if (exp.mode == Final) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) => + case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_,_) => omniAggFunctionTypes(index) = toOmniAggFunType(exp, true, true) omniAggOutputTypes(index) = toOmniAggInOutType(exp.aggregateFunction.dataType) omniAggChannels(index) = @@ -110,7 +125,7 @@ case class ColumnarHashAggregateExec( } } else if (exp.mode == PartialMerge) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) => + case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_,_) => omniAggFunctionTypes(index) = toOmniAggFunType(exp, true) omniAggOutputTypes(index) = toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes) @@ -125,7 +140,7 @@ case class ColumnarHashAggregateExec( } } else if (exp.mode == Partial) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) => + case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_,_) => omniAggFunctionTypes(index) = toOmniAggFunType(exp, true) omniAggOutputTypes(index) = toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes) @@ -150,7 +165,7 @@ case class ColumnarHashAggregateExec( omniSourceTypes(i) = sparkTypeToOmniType(attr.dataType, attr.metadata) } - for (aggChannel <-omniAggChannels) { + for (aggChannel <- omniAggChannels) { if (!isSimpleColumnForAll(aggChannel)) { checkOmniJsonWhiteList("", aggChannel.toArray) } @@ -202,7 +217,7 @@ case class ColumnarHashAggregateExec( } if (exp.mode == Final) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) => + case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_, _) => omniAggFunctionTypes(index) = toOmniAggFunType(exp, true, true) omniAggOutputTypes(index) = toOmniAggInOutType(exp.aggregateFunction.dataType) @@ -214,7 +229,7 @@ case class ColumnarHashAggregateExec( } } else if (exp.mode == PartialMerge) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) => + case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_, _) => omniAggFunctionTypes(index) = toOmniAggFunType(exp, true) omniAggOutputTypes(index) = 
toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes) @@ -229,7 +244,7 @@ case class ColumnarHashAggregateExec( } } else if (exp.mode == Partial) { exp.aggregateFunction match { - case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) => + case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_, _) => omniAggFunctionTypes(index) = toOmniAggFunType(exp, true) omniAggOutputTypes(index) = toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes) @@ -338,10 +353,3 @@ case class ColumnarHashAggregateExec( throw new UnsupportedOperationException("This operator doesn't support doExecute().") } } - -object ColumnarHashAggregateExec { - def supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = { - val aggregationBufferSchema = StructType.fromAttributes(aggregateBufferAttributes) - UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema) - } -} diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index cea0a1438..fc662128e 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.execution import com.huawei.boostkit.spark.ColumnarPluginConfig - -import java.util.Random import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import scala.collection.JavaConverters._ @@ -41,8 +39,9 @@ import org.apache.spark.shuffle.ColumnarShuffleDependency import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering +import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin} +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.createShuffleWriteProcessor import org.apache.spark.sql.execution.metric._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter} @@ -53,16 +52,17 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.MutablePair +import org.apache.spark.util.random.XORShiftRandom -class ColumnarShuffleExchangeExec( - override val outputPartitioning: Partitioning, - child: SparkPlan, - shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS) - extends ShuffleExchangeExec(outputPartitioning, child, shuffleOrigin) with ShuffleExchangeLike{ +case class ColumnarShuffleExchangeExec( + override val outputPartitioning: Partitioning, + child: SparkPlan, + shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS) + extends ShuffleExchangeLike { private lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - override lazy val readMetrics = + private[sql] lazy val readMetrics = SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val 
metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), @@ -100,9 +100,19 @@ class ColumnarShuffleExchangeExec( override def numPartitions: Int = columnarShuffleDependency.partitioner.numPartitions + override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[ColumnarBatch] = { + new ShuffledColumnarRDD(columnarShuffleDependency, readMetrics, partitionSpecs) + } + + override def runtimeStatistics: Statistics = { + val dataSize = metrics("dataSize").value + val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value + Statistics(dataSize, Some(rowCount)) + } + @transient lazy val columnarShuffleDependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = { - ColumnarShuffleExchangeExec.prepareShuffleDependency( + val dep = ColumnarShuffleExchangeExec.prepareShuffleDependency( inputColumnarRDD, child.output, outputPartitioning, @@ -113,8 +123,8 @@ class ColumnarShuffleExchangeExec( longMetric("numInputRows"), longMetric("splitTime"), longMetric("spillTime")) + dep } - var cachedShuffleRDD: ShuffledColumnarRDD = _ override def doExecute(): RDD[InternalRow] = { @@ -155,6 +165,8 @@ class ColumnarShuffleExchangeExec( cachedShuffleRDD } } + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeExec = + copy(child = newChild) } object ColumnarShuffleExchangeExec extends Logging { @@ -229,7 +241,8 @@ object ColumnarShuffleExchangeExec extends Logging { (columnarBatch: ColumnarBatch, numPartitions: Int) => { val pidArr = new Array[Int](columnarBatch.numRows()) for (i <- 0 until columnarBatch.numRows()) { - val position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) + val partitionId = TaskContext.get().partitionId() + val position = new XORShiftRandom(partitionId).nextInt(numPartitions) pidArr(i) = position + 1 } val vec = new IntVec(columnarBatch.numRows()) @@ -324,6 +337,7 @@ object ColumnarShuffleExchangeExec extends Logging { rdd.mapPartitionsWithIndexInternal((_, cbIter) => { cbIter.map { cb => (0, cb) } }, isOrderSensitive = isOrderSensitive) + case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") } val numCols = outputAttributes.size @@ -341,6 +355,7 @@ object ColumnarShuffleExchangeExec extends Logging { new PartitionInfo("hash", numPartitions, numCols, intputTypes) case RangePartitioning(ordering, numPartitions) => new PartitionInfo("range", numPartitions, numCols, intputTypes) + case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") } new ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch]( diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala index 7c7001dbc..49f245111 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala @@ -56,6 +56,9 @@ case class ColumnarSortExec( override def outputPartitioning: Partitioning = child.outputPartitioning + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarSortExec = + copy(child = newChild) + override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: 
Nil diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala index 6fec9f9a0..92efd4d53 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala @@ -49,6 +49,9 @@ case class ColumnarTakeOrderedAndProjectExec( override def nodeName: String = "OmniColumnarTakeOrderedAndProject" + override protected def withNewChildInternal(newChild: SparkPlan): + ColumnarTakeOrderedAndProjectExec = copy(child = newChild) + val serializer: Serializer = new ColumnarBatchSerializer( longMetric("avgReadBatchNumRows"), longMetric("numOutputRows")) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala index e5534d3c6..63414c781 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala @@ -50,6 +50,9 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], override def supportsColumnar: Boolean = true + override protected def withNewChildInternal(newChild: SparkPlan): ColumnarWindowExec = + copy(child = newChild) + override lazy val metrics = Map( "addInputTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omni addInput"), "numInputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of input vecBatchs"), @@ -59,25 +62,6 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "numOutputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatchs")) - override def output: Seq[Attribute] = - child.output ++ windowExpression.map(_.toAttribute) - - override def requiredChildDistribution: Seq[Distribution] = { - if (partitionSpec.isEmpty) { - // Only show warning when the number of bytes is larger than 100 MiB? - logWarning("No Partition Defined for Window operation! 
Moving all data to a single " - + "partition, this can cause serious performance degradation.") - AllTuples :: Nil - } else ClusteredDistribution(partitionSpec) :: Nil - } - - override def requiredChildOrdering: Seq[Seq[SortOrder]] = - Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec) - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - override def outputPartitioning: Partitioning = child.outputPartitioning - override protected def doExecute(): RDD[InternalRow] = { throw new UnsupportedOperationException(s"This operator doesn't support doExecute().") } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala index 1e728239b..7f664121b 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala @@ -24,6 +24,43 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsRe import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch +sealed trait ShufflePartitionSpec + +// A partition that reads data of one or more reducers, from `startReducerIndex` (inclusive) to +// `endReducerIndex` (exclusive). +case class CoalescedPartitionSpec( + startReducerIndex: Int, + endReducerIndex: Int, + @transient dataSize: Option[Long] = None) extends ShufflePartitionSpec + +object CoalescedPartitionSpec { + def apply(startReducerIndex: Int, + endReducerIndex: Int, + dataSize: Long): CoalescedPartitionSpec = { + CoalescedPartitionSpec(startReducerIndex, endReducerIndex, Some(dataSize)) + } +} + +// A partition that reads partial data of one reducer, from `startMapIndex` (inclusive) to +// `endMapIndex` (exclusive). +case class PartialReducerPartitionSpec( + reducerIndex: Int, + startMapIndex: Int, + endMapIndex: Int, + @transient dataSize: Long) extends ShufflePartitionSpec + +// A partition that reads partial data of one mapper, from `startReducerIndex` (inclusive) to +// `endReducerIndex` (exclusive). +case class PartialMapperPartitionSpec( + mapIndex: Int, + startReducerIndex: Int, + endReducerIndex: Int) extends ShufflePartitionSpec + +case class CoalescedMapperPartitionSpec( + startMapIndex: Int, + endMapIndex: Int, + numReducers: Int) extends ShufflePartitionSpec + /** * The [[Partition]] used by [[ShuffledRowRDD]]. 
*/ @@ -70,7 +107,7 @@ class ShuffledColumnarRDD( override def getPreferredLocations(partition: Partition): Seq[String] = { val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] partition.asInstanceOf[ShuffledColumnarRDDPartition].spec match { - case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => + case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) => startReducerIndex.until(endReducerIndex).flatMap { reducerIndex => tracker.getPreferredLocationsForShuffle(dependency, reducerIndex) } @@ -80,6 +117,9 @@ class ShuffledColumnarRDD( case PartialMapperPartitionSpec(mapIndex, _, _) => tracker.getMapLocation(dependency, mapIndex, mapIndex + 1) + + case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) => + tracker.getMapLocation(dependency, startMapIndex, endMapIndex) } } @@ -89,7 +129,7 @@ class ShuffledColumnarRDD( // as well as the `tempMetrics` for basic shuffle metrics. val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) val reader = split.asInstanceOf[ShuffledColumnarRDDPartition].spec match { - case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => + case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) => SparkEnv.get.shuffleManager.getReader( dependency.shuffleHandle, startReducerIndex, @@ -116,7 +156,22 @@ class ShuffledColumnarRDD( endReducerIndex, context, sqlMetricsReporter) + + case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) => + SparkEnv.get.shuffleManager.getReader( + dependency.shuffleHandle, + startMapIndex, + endMapIndex, + 0, + numReducers, + context, + sqlMetricsReporter) } reader.read().asInstanceOf[Iterator[Product2[Int, ColumnarBatch]]].map(_._2) } + + override def clearDependencies(): Unit = { + super.clearDependencies() + dependency = null + } } \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala new file mode 100644 index 000000000..004296200 --- /dev/null +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL} +import org.apache.spark.sql.execution.ColumnarHashedRelation +import org.apache.spark.sql.execution.aggregate.BaseAggregateExec +import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeLike} +import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys + +/** + * This rule runs in the AQE optimizer and optimizes more cases + * compared to [[PropagateEmptyRelationBase]]: + * 1. Join is single column NULL-aware anti join (NAAJ) + * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an + * empty [[LocalRelation]]. + */ +object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { + override protected def isEmpty(plan: LogicalPlan): Boolean = + super.isEmpty(plan) || (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0)) + + override protected def nonEmpty(plan: LogicalPlan): Boolean = + super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0) + + private def isRootRepartition(plan: LogicalPlan): Boolean = plan match { + case l: LogicalQueryStage if l.getTagValue(ROOT_REPARTITION).isDefined => true + case _ => false + } + + // The returned value follows: + // - 0 means the plan must produce 0 row + // - positive value means an estimated row count which can be over-estimated + // - none means the plan has not materialized or the plan can not be estimated + private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match { + case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized => + stage.getRuntimeStatistics.rowCount + + case LogicalQueryStage(_, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty && + agg.child.isInstanceOf[QueryStageExec] => + val stage = agg.child.asInstanceOf[QueryStageExec] + if (stage.isMaterialized) { + stage.getRuntimeStatistics.rowCount + } else { + None + } + + case _ => None + } + + private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match { + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.isMaterialized => + if (stage.broadcast.supportsColumnar) { + val colRelation = stage.broadcast.relationFuture.get().value.asInstanceOf[ColumnarHashedRelation] + colRelation.relation == HashedRelationWithAllNullKeys + } else { + stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys + } + case _ => false + } + + private def eliminateSingleColumnNullAwareAntiJoin: PartialFunction[LogicalPlan, LogicalPlan] = { + case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) => + empty(j) + } + + override protected def userSpecifiedRepartition(p: LogicalPlan): Boolean = p match { + case LogicalQueryStage(_, ShuffleQueryStageExec(_, shuffle: ShuffleExchangeLike, _)) + if shuffle.shuffleOrigin == REPARTITION_BY_COL || + shuffle.shuffleOrigin == REPARTITION_BY_NUM => true + case _ => false + } + + override protected def applyInternal(p: LogicalPlan): LogicalPlan = p.transformUpWithPruning( + // LOCAL_RELATION and TRUE_OR_FALSE_LITERAL pattern are matched at + // `PropagateEmptyRelationBase.commonApplyFunc` + // LOGICAL_QUERY_STAGE pattern is 
matched at `PropagateEmptyRelationBase.commonApplyFunc` + // and `AQEPropagateEmptyRelation.eliminateSingleColumnNullAwareAntiJoin` + // Note that, We can not specify ruleId here since the LogicalQueryStage is not immutable. + _.containsAnyPattern(LOGICAL_QUERY_STAGE, LOCAL_RELATION, TRUE_OR_FALSE_LITERAL)) { + eliminateSingleColumnNullAwareAntiJoin.orElse(commonApplyFunc) + } +} \ No newline at end of file diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala index d34b93e5b..be4efd90c 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning} +import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -36,7 +37,7 @@ import scala.collection.mutable.ArrayBuffer * node during canonicalization. * @param partitionSpecs The partition specs that defines the arrangement. */ -case class ColumnarCustomShuffleReaderExec( +case class OmniAQEShuffleReadExec( child: SparkPlan, partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode { @@ -57,9 +58,9 @@ case class ColumnarCustomShuffleReaderExec( partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size == partitionSpecs.length) { child match { - case ShuffleQueryStageExec(_, s: ShuffleExchangeLike) => + case ShuffleQueryStageExec(_, s: ShuffleExchangeLike, _) => s.child.outputPartitioning - case ShuffleQueryStageExec(_, r @ ReusedExchangeExec(_, s: ShuffleExchangeLike)) => + case ShuffleQueryStageExec(_, r @ ReusedExchangeExec(_, s: ShuffleExchangeLike), _) => s.child.outputPartitioning match { case e: Expression => r.updateAttr(e).asInstanceOf[Partitioning] case other => other @@ -67,13 +68,34 @@ case class ColumnarCustomShuffleReaderExec( case _ => throw new IllegalStateException("operating on canonicalization plan") } + } else if (isCoalescedRead) { + // For coalesced shuffle read, the data distribution is not changed, only the number of + // partitions is changed. + child.outputPartitioning match { + case h: HashPartitioning => + CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = partitionSpecs.length)) + case r: RangePartitioning => + CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = partitionSpecs.length)) + // This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses + // `RoundRobinPartitioning` but we don't need to retain the number of partitions. 
+ case r: RoundRobinPartitioning => + r.copy(numPartitions = partitionSpecs.length) + case other @ SinglePartition => + throw new IllegalStateException( + "Unexpected partitioning for coalesced shuffle read: " + other) + case _ => + // Spark plugins may have custom partitioning and may replace this operator + // during the postStageOptimization phase, so return UnknownPartitioning here + // rather than throw an exception + UnknownPartitioning(partitionSpecs.length) + } } else { UnknownPartitioning(partitionSpecs.length) } } override def stringArgs: Iterator[Any] = { - val desc = if (isLocalReader) { + val desc = if (isLocalRead) { "local" } else if (hasCoalescedPartition && hasSkewedPartition) { "coalesced and skewed" @@ -87,14 +109,38 @@ case class ColumnarCustomShuffleReaderExec( Iterator(desc) } - def hasCoalescedPartition: Boolean = - partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec]) + /** + * Returns true iff some partitions were actually combined + */ + private def isCoalescedSpec(spec: ShufflePartitionSpec) = spec match { + case CoalescedPartitionSpec(0, 0, _) => true + case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1 + case _ => false + } + + /** + * Returns true iff some non-empty partitions were combined + */ + def hasCoalescedPartition: Boolean = { + partitionSpecs.exists(isCoalescedSpec) + } def hasSkewedPartition: Boolean = partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec]) - def isLocalReader: Boolean = - partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) + def isLocalRead: Boolean = + partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) || + partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec]) + + def isCoalescedRead: Boolean = { + partitionSpecs.sliding(2).forall { + // A single partition spec which is `CoalescedPartitionSpec` also means coalesced read. 
+ case Seq(_: CoalescedPartitionSpec) => true + case Seq(l: CoalescedPartitionSpec, r: CoalescedPartitionSpec) => + l.endReducerIndex <= r.startReducerIndex + case _ => false + } + } private def shuffleStage = child match { case stage: ShuffleQueryStageExec => Some(stage) @@ -102,13 +148,13 @@ case class ColumnarCustomShuffleReaderExec( } @transient private lazy val partitionDataSizes: Option[Seq[Long]] = { - if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) { - val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId + if (!isLocalRead && shuffleStage.get.mapStats.isDefined) { Some(partitionSpecs.map { - case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) => - startReducerIndex.until(endReducerIndex).map(bytesByPartitionId).sum + case p: CoalescedPartitionSpec => + assert(p.dataSize.isDefined) + p.dataSize.get case p: PartialReducerPartitionSpec => p.dataSize - case p => throw new IllegalStateException("unexpected " + p) + case p => throw new IllegalStateException(s"unexpected $p") }) } else { None @@ -141,6 +187,13 @@ case class ColumnarCustomShuffleReaderExec( driverAccumUpdates += (skewedSplits.id -> numSplits) } + if (hasCoalescedPartition) { + val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions") + val x = partitionSpecs.count(isCoalescedSpec) + numCoalescedPartitionsMetric.set(x) + driverAccumUpdates += numCoalescedPartitionsMetric.id -> x + } + partitionDataSizes.foreach { dataSizes => val partitionDataSizeMetrics = metrics("partitionDataSize") driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _) @@ -154,8 +207,8 @@ case class ColumnarCustomShuffleReaderExec( @transient override lazy val metrics: Map[String, SQLMetric] = { if (shuffleStage.isDefined) { Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ { - if (isLocalReader) { - // We split the mapper partition evenly when creating local shuffle reader, so no + if (isLocalRead) { + // We split the mapper partition evenly when creating local shuffle read, so no // data size info is available. Map.empty } else { @@ -171,6 +224,13 @@ case class ColumnarCustomShuffleReaderExec( } else { Map.empty } + } ++ { + if (hasCoalescedPartition) { + Map("numCoalescedPartitions" -> + SQLMetrics.createMetric(sparkContext, "number of coalesced partitions")) + } else { + Map.empty + } } } else { // It's a canonicalized plan, no need to report metrics. 
@@ -178,24 +238,19 @@ case class ColumnarCustomShuffleReaderExec( } } - private var cachedShuffleRDD: RDD[ColumnarBatch] = null - private lazy val shuffleRDD: RDD[_] = { - sendDriverMetrics() - if (cachedShuffleRDD == null) { - cachedShuffleRDD = child match { - case stage: ShuffleQueryStageExec => - new ShuffledColumnarRDD( - stage.shuffle - .asInstanceOf[ColumnarShuffleExchangeExec] - .columnarShuffleDependency, - stage.shuffle.asInstanceOf[ColumnarShuffleExchangeExec].readMetrics, - partitionSpecs.toArray) - case _ => - throw new IllegalStateException("operating on canonicalized plan") - } + shuffleStage match { + case Some(stage) => + sendDriverMetrics() + new ShuffledColumnarRDD( + stage.shuffle + .asInstanceOf[ColumnarShuffleExchangeExec] + .columnarShuffleDependency, + stage.shuffle.asInstanceOf[ColumnarShuffleExchangeExec].readMetrics, + partitionSpecs.toArray) + case _ => + throw new IllegalStateException("operating on canonicalized plan") } - cachedShuffleRDD } override protected def doExecute(): RDD[InternalRow] = { @@ -205,4 +260,7 @@ case class ColumnarCustomShuffleReaderExec( override protected def doExecuteColumnar(): RDD[ColumnarBatch] = { shuffleRDD.asInstanceOf[RDD[ColumnarBatch]] } + + override protected def withNewChildInternal(newChild: SparkPlan): OmniAQEShuffleReadExec = + new OmniAQEShuffleReadExec(newChild, this.partitionSpecs) } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala deleted file mode 100644 index 4edf0f4f8..000000000 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.adaptive - -import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin -import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} -import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.ColumnarHashedRelation -import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys} - -/** - * This optimization rule detects and converts a Join to an empty [[LocalRelation]]: - * 1. Join is single column NULL-aware anti join (NAAJ), and broadcasted [[HashedRelation]] - * is [[HashedRelationWithAllNullKeys]]. - * - * 2. Join is inner or left semi join, and broadcasted [[HashedRelation]] - * is [[EmptyHashedRelation]]. 
- * This applies to all Joins (sort merge join, shuffled hash join, and broadcast hash join), - * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE - * at the first place. - */ -object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] { - - private def canEliminate(plan: LogicalPlan, relation: HashedRelation): Boolean = plan match { - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined - && stage.broadcast.relationFuture.get().value == relation => true - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined - && stage.broadcast.supportsColumnar => { - val cr = stage.broadcast.relationFuture.get().value.asInstanceOf[ColumnarHashedRelation] - cr.relation == relation - } - case _ => false - } - - def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { - case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) - if canEliminate(j.right, HashedRelationWithAllNullKeys) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - - case j @ Join(_, _, Inner, _, _) if canEliminate(j.left, EmptyHashedRelation) || - canEliminate(j.right, EmptyHashedRelation) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - - case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, EmptyHashedRelation) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - } -} diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala deleted file mode 100644 index c9a0dcbbf..000000000 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogStatistics -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} -import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan} -import org.apache.spark.sql.types.StructType - -/** - * Prune the partitions of file source based table using partition filters. 
Currently, this rule - * is applied to [[HadoopFsRelation]] with [[CatalogFileIndex]] and [[DataSourceV2ScanRelation]] - * with [[FileScan]]. - * - * For [[HadoopFsRelation]], the location will be replaced by pruned file index, and corresponding - * statistics will be updated. And the partition filters will be kept in the filters of returned - * logical plan. - * - * For [[DataSourceV2ScanRelation]], both partition filters and data filters will be added to - * its underlying [[FileScan]]. And the partition filters will be removed in the filters of - * returned logical plan. - */ -private[sql] object PruneFileSourcePartitions - extends Rule[LogicalPlan] with PredicateHelper { - - private def getPartitionKeyFiltersAndDataFilters( - sparkSession: SparkSession, - relation: LeafNode, - partitionSchema: StructType, - filters: Seq[Expression], - output: Seq[AttributeReference]): (ExpressionSet, Seq[Expression]) = { - val normalizedFilters = DataSourceStrategy.normalizeExprs( - filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), output) - val partitionColumns = - relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver) - val partitionSet = AttributeSet(partitionColumns) - val (partitionFilters, dataFilters) = normalizedFilters.partition(f => - f.references.subsetOf(partitionSet) - ) - val extraPartitionFilter = - dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet)) - - (ExpressionSet(partitionFilters ++ extraPartitionFilter), dataFilters) - } - - private def rebuildPhysicalOperation( - projects: Seq[NamedExpression], - filters: Seq[Expression], - relation: LeafNode): Project = { - val withFilter = if (filters.nonEmpty) { - val filterExpression = filters.reduceLeft(And) - Filter(filterExpression, relation) - } else { - relation - } - Project(projects, withFilter) - } - - override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { - case op @ PhysicalOperation(projects, filters, - logicalRelation @ - LogicalRelation(fsRelation @ - HadoopFsRelation( - catalogFileIndex: CatalogFileIndex, - partitionSchema, - _, - _, - _, - _), - _, - _, - _)) - if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => - val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters( - fsRelation.sparkSession, logicalRelation, partitionSchema, filters, - logicalRelation.output) - // Fix spark issue SPARK-34119(row 104-113) - if (partitionKeyFilters.nonEmpty) { - val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) - val prunedFsRelation = - fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession) - // Change table stats based on the sizeInBytes of pruned files - val filteredStats = - FilterEstimation(Filter(partitionKeyFilters.reduce(And), logicalRelation)).estimate - val colStats = filteredStats.map(_.attributeStats.map { case (attr, colStat) => - (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType)) - }) - val withStats = logicalRelation.catalogTable.map(_.copy( - stats = Some(CatalogStatistics( - sizeInBytes = BigInt(prunedFileIndex.sizeInBytes), - rowCount = filteredStats.flatMap(_.rowCount), - colStats = colStats.getOrElse(Map.empty))))) - val prunedLogicalRelation = logicalRelation.copy( - relation = prunedFsRelation, catalogTable = withStats) - // Keep partition-pruning predicates so that they are visible in physical planning - rebuildPhysicalOperation(projects, filters, prunedLogicalRelation) - } else { - op - } - - case op @ PhysicalOperation(projects, filters, - 
v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output)) - if filters.nonEmpty && scan.readDataSchema.nonEmpty => - val (partitionKeyFilters, dataFilters) = - getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation, - scan.readPartitionSchema, filters, output) - // The dataFilters are pushed down only once - if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) { - val prunedV2Relation = - v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters)) - // The pushed down partition filters don't need to be reevaluated. - val afterScanFilters = - ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty) - rebuildPhysicalOperation(projects, afterScanFilters.toSeq, prunedV2Relation) - } else { - op - } - } -} diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala index 0e5a7eae6..7325635ff 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala @@ -82,18 +82,17 @@ class OmniOrcFileFormat extends FileFormat with DataSourceRegister with Serializ val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - val resultedColPruneInfo = - Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => - OrcUtils.requestedColumnIds( - isCaseSensitive, dataSchema, requiredSchema, reader, conf) - } + val orcSchema = + Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions))(_.getSchema) + val resultedColPruneInfo = OrcUtils.requestedColumnIds( + isCaseSensitive, dataSchema, requiredSchema, orcSchema, conf) if (resultedColPruneInfo.isEmpty) { Iterator.empty } else { // ORC predicate pushdown - if (orcFilterPushDown) { - OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).foreach { + if (orcFilterPushDown && filters.nonEmpty) { + OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).foreach { fileSchema => OrcFilters.createFilter(fileSchema, filters).foreach { f => OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames) } @@ -107,6 +106,8 @@ class OmniOrcFileFormat extends FileFormat with DataSourceRegister with Serializ "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) + val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",") + taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns) val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala deleted file mode 100644 index 3392caa54..000000000 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.orc - -import java.nio.charset.StandardCharsets.UTF_8 -import java.util.Locale - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer} - -import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession} -import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution -import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils} -import org.apache.spark.sql.execution.datasources.SchemaMergeUtils -import org.apache.spark.sql.types._ -import org.apache.spark.util.{ThreadUtils, Utils} - -object OrcUtils extends Logging { - - // The extensions for ORC compression codecs - val extensionsForCompressionCodecNames = Map( - "NONE" -> "", - "SNAPPY" -> ".snappy", - "ZLIB" -> ".zlib", - "LZO" -> ".lzo", - "ZSTD" -> ".zstd", - "ZSTD_JNI" -> ".zstd_jni") - - def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { - val origPath = new Path(pathStr) - val fs = origPath.getFileSystem(conf) - val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath) - .filterNot(_.isDirectory) - .map(_.getPath) - .filterNot(_.getName.startsWith("_")) - .filterNot(_.getName.startsWith(".")) - paths - } - - def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean) - : Option[TypeDescription] = { - val fs = file.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - try { - val schema = Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader => - reader.getSchema - } - if (schema.getFieldNames.isEmpty) { - None - } else { - Some(schema) - } - } catch { - case e: org.apache.orc.FileFormatException => - if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file", e) - None - } else { - throw new SparkException(s"Could not read footer for file", e) - } - } - } - - private def toCatalystSchema(schema: TypeDescription): StructType = { - // The Spark query engine has not completely supported CHAR/VARCHAR type yet, and here we - // replace the orc CHAR/VARCHAR with STRING type. 
- CharVarcharUtils.replaceCharVarcharWithStringInSchema( - CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]) - } - - def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) - : Option[StructType] = { - val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles - val conf = sparkSession.sessionState.newHadoopConfWithOptions(options) - files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst { - case Some(schema) => - logDebug(s"Reading schema from file $files, got Hive schema string: $schema") - toCatalystSchema(schema) - } - } - - def readCatalystSchema( - file: Path, - conf: Configuration, - ignoreCorruptFiles: Boolean): Option[StructType] = { - readSchema(file, conf, ignoreCorruptFiles) match { - case Some(schema) => Some(toCatalystSchema(schema)) - - case None => - // Field names is empty or `FileFormatException` was thrown but ignoreCorruptFiles is true. - None - } - } - - /** - * Reads ORC file schemas in multi-threaded manner, using native version of ORC. - * This is visible for testing. - */ - def readOrcSchemasInParallel( - files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = { - ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile => - OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles).map(toCatalystSchema) - }.flatten - } - - def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String]) - : Option[StructType] = { - val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) - if (orcOptions.mergeSchema) { - SchemaMergeUtils.mergeSchemasInParallel( - sparkSession, options, files, OrcUtils.readOrcSchemasInParallel) - } else { - OrcUtils.readSchema(sparkSession, files, options) - } - } - - /** - * @return Returns the combination of requested column ids from the given ORC file and - * boolean flag to find if the pruneCols is allowed or not. Requested Column id can be - * -1, which means the requested column doesn't exist in the ORC file. Returns None - * if the given ORC file is empty. - */ - def requestedColumnIds( - isCaseSensitive: Boolean, - dataSchema: StructType, - requiredSchema: StructType, - reader: Reader, - conf: Configuration): Option[(Array[Int], Boolean)] = { - val orcFieldNames = reader.getSchema.getFieldNames.asScala - if (orcFieldNames.isEmpty) { - // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer. - None - } else { - if (orcFieldNames.forall(_.startsWith("_col"))) { - // This is a ORC file written by Hive, no field names in the physical schema, assume the - // physical schema maps to the data scheme by index. - assert(orcFieldNames.length <= dataSchema.length, "The given data schema " + - s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " + - "no idea which columns were dropped, fail to read.") - // for ORC file written by Hive, no field names - // in the physical schema, there is a need to send the - // entire dataSchema instead of required schema. 
- // So pruneCols is not done in this case - Some(requiredSchema.fieldNames.map { name => - val index = dataSchema.fieldIndex(name) - if (index < orcFieldNames.length) { - index - } else { - -1 - } - }, false) - } else { - if (isCaseSensitive) { - Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) => - if (orcFieldNames.indexWhere(caseSensitiveResolution(_, name)) != -1) { - idx - } else { - -1 - } - }, true) - } else { - // Do case-insensitive resolution only if in case-insensitive mode - val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT)) - Some(requiredSchema.fieldNames.zipWithIndex.map { case (requiredFieldName, idx) => - caseInsensitiveOrcFieldMap - .get(requiredFieldName.toLowerCase(Locale.ROOT)) - .map { matchedOrcFields => - if (matchedOrcFields.size > 1) { - // Need to fail if there is ambiguity, i.e. more than one field is matched. - val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]") - reader.close() - throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """ - + s"$matchedOrcFieldsString in case-insensitive mode") - } else { - idx - } - }.getOrElse(-1) - }, true) - } - } - } - } - - /** - * Add a metadata specifying Spark version. - */ - def addSparkVersionMetadata(writer: Writer): Unit = { - writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT)) - } - - /** - * Given a `StructType` object, this methods converts it to corresponding string representation - * in ORC. - */ - def orcTypeDescriptionString(dt: DataType): String = dt match { - case s: StructType => - val fieldTypes = s.fields.map { f => - s"${quoteIdentifier(f.name)}:${orcTypeDescriptionString(f.dataType)}" - } - s"struct<${fieldTypes.mkString(",")}>" - case a: ArrayType => - s"array<${orcTypeDescriptionString(a.elementType)}>" - case m: MapType => - s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>" - case _ => dt.catalogString - } - - /** - * Returns the result schema to read from ORC file. In addition, It sets - * the schema string to 'orc.mapred.input.schema' so ORC reader can use later. - * - * @param canPruneCols Flag to decide whether pruned cols schema is send to resultSchema - * or to send the entire dataSchema to resultSchema. - * @param dataSchema Schema of the orc files. - * @param resultSchema Result data schema created after pruning cols. - * @param partitionSchema Schema of partitions. - * @param conf Hadoop Configuration. - * @return Returns the result schema as string. 
- */ - def orcResultSchemaString( - canPruneCols: Boolean, - dataSchema: StructType, - resultSchema: StructType, - partitionSchema: StructType, - conf: Configuration): String = { - val resultSchemaString = if (canPruneCols) { - OrcUtils.orcTypeDescriptionString(resultSchema) - } else { - OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)) - } - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) - resultSchemaString - } -} diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala index a2ee977f9..2c1271fb0 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala @@ -97,6 +97,9 @@ case class ColumnarBroadcastHashJoinExec( override def nodeName: String = "OmniColumnarBroadcastHashJoin" + override protected def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): + ColumnarBroadcastHashJoinExec = copy(left = newLeft, right = newRight) + override def requiredChildDistribution: Seq[Distribution] = { val mode = HashedRelationBroadcastMode(buildBoundKeys, isNullAwareAntiJoin) buildSide match { @@ -109,7 +112,7 @@ case class ColumnarBroadcastHashJoinExec( override lazy val outputPartitioning: Partitioning = { joinType match { - case _: InnerLike if sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit > 0 => + case _: InnerLike if session.sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit > 0 => streamedPlan.outputPartitioning match { case h: HashPartitioning => expandOutputPartitioning(h) case c: PartitioningCollection => expandOutputPartitioning(c) @@ -150,7 +153,7 @@ case class ColumnarBroadcastHashJoinExec( // Seq("a", "b", "c"), Seq("a", "b", "y"), Seq("a", "x", "c"), Seq("a", "x", "y"). // The expanded expressions are returned as PartitioningCollection. 
private def expandOutputPartitioning(partitioning: HashPartitioning): PartitioningCollection = { - val maxNumCombinations = sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit + val maxNumCombinations = session.sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit var currentNumCombinations = 0 def generateExprCombinations( diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala index 9eb666fcc..263af0ddb 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala @@ -50,7 +50,8 @@ case class ColumnarShuffledHashJoinExec( buildSide: BuildSide, condition: Option[Expression], left: SparkPlan, - right: SparkPlan) + right: SparkPlan, + isSkewJoin: Boolean) extends HashJoin with ShuffledJoin { override lazy val metrics = Map( @@ -81,6 +82,9 @@ case class ColumnarShuffledHashJoinExec( override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning + override protected def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): + ColumnarShuffledHashJoinExec = copy(left = newLeft, right = newRight) + override def outputOrdering: Seq[SortOrder] = joinType match { case FullOuter => Nil case _ => super.outputOrdering diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala index 59b763428..d55af2d9d 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala @@ -68,6 +68,12 @@ class ColumnarSortMergeJoinExec( if (isSkewJoin) "OmniColumnarSortMergeJoin(skew=true)" else "OmniColumnarSortMergeJoin" } + override protected def withNewChildrenInternal(newLeft: SparkPlan, + newRight: SparkPlan): ColumnarSortMergeJoinExec = { + new ColumnarSortMergeJoinExec(this.leftKeys, this.rightKeys, this.joinType, + this.condition, newLeft, newRight, this.isSkewJoin) + } + val SMJ_NEED_ADD_STREAM_TBL_DATA = 2 val SMJ_NEED_ADD_BUFFERED_TBL_DATA = 3 val SCAN_FINISH = 4 diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala deleted file mode 100644 index 0503b2b7b..000000000 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import org.apache.hadoop.hive.common.StatsSetupConst - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.analysis.CastSupport -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, PredicateHelper, SubqueryExpression} -import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} -import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.DataSourceStrategy - -/** - * Prune hive table partitions using partition filters on [[HiveTableRelation]]. The pruned - * partitions will be kept in [[HiveTableRelation.prunedPartitions]], and the statistics of - * the hive table relation will be updated based on pruned partitions. - * - * This rule is executed in optimization phase, so the statistics can be updated before physical - * planning, which is useful for some spark strategy, e.g. - * [[org.apache.spark.sql.execution.SparkStrategies.JoinSelection]]. - * - * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. - */ -private[sql] class PruneHiveTablePartitions(session: SparkSession) - extends Rule[LogicalPlan] with CastSupport with PredicateHelper { - - /** - * Extract the partition filters from the filters on the table. - */ - private def getPartitionKeyFilters( - filters: Seq[Expression], - relation: HiveTableRelation): ExpressionSet = { - val normalizedFilters = DataSourceStrategy.normalizeExprs( - filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output) - val partitionColumnSet = AttributeSet(relation.partitionCols) - ExpressionSet( - normalizedFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionColumnSet))) - } - - /** - * Prune the hive table using filters on the partitions of the table. - */ - private def prunePartitions( - relation: HiveTableRelation, - partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = { - if (conf.metastorePartitionPruning) { - session.sessionState.catalog.listPartitionsByFilter( - relation.tableMeta.identifier, partitionFilters.toSeq) - } else { - ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta, - session.sessionState.catalog.listPartitions(relation.tableMeta.identifier), - partitionFilters.toSeq, conf.sessionLocalTimeZone) - } - } - - /** - * Update the statistics of the table. 
- */ - private def updateTableMeta( - relation: HiveTableRelation, - prunedPartitions: Seq[CatalogTablePartition], - partitionKeyFilters: ExpressionSet): CatalogTable = { - val sizeOfPartitions = prunedPartitions.map { partition => - val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) - val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) - if (rawDataSize.isDefined && rawDataSize.get > 0) { - rawDataSize.get - } else if (totalSize.isDefined && totalSize.get > 0L) { - totalSize.get - } else { - 0L - } - } - // Fix spark issue SPARK-34119(row 95-106) - if (sizeOfPartitions.forall(_ > 0)) { - val filteredStats = - FilterEstimation(Filter(partitionKeyFilters.reduce(And), relation)).estimate - val colStats = filteredStats.map(_.attributeStats.map { case (attr, colStat) => - (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType)) - }) - relation.tableMeta.copy( - stats = Some(CatalogStatistics( - sizeInBytes = BigInt(sizeOfPartitions.sum), - rowCount = filteredStats.flatMap(_.rowCount), - colStats = colStats.getOrElse(Map.empty)))) - } else { - relation.tableMeta - } - } - - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation) - if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => - val partitionKeyFilters = getPartitionKeyFilters(filters, relation) - if (partitionKeyFilters.nonEmpty) { - val newPartitions = prunePartitions(relation, partitionKeyFilters) - // Fix spark issue SPARK-34119(row 117) - val newTableMeta = updateTableMeta(relation, newPartitions, partitionKeyFilters) - val newRelation = relation.copy( - tableMeta = newTableMeta, prunedPartitions = Some(newPartitions)) - // Keep partition filters so that they are visible in physical planning - Project(projections, Filter(filters.reduceLeft(And), newRelation)) - } else { - op - } - } -} diff --git a/omnioperator/omniop-spark-extension/pom.xml b/omnioperator/omniop-spark-extension/pom.xml index 026fc5997..c95b391b0 100644 --- a/omnioperator/omniop-spark-extension/pom.xml +++ b/omnioperator/omniop-spark-extension/pom.xml @@ -8,14 +8,14 @@ com.huawei.kunpeng boostkit-omniop-spark-parent pom - 3.1.1-1.1.0 + 3.3.1-1.1.0 BoostKit Spark Native Sql Engine Extension Parent Pom 2.12.10 2.12 - 3.1.1 + 3.3.1 3.2.2 UTF-8 UTF-8 @@ -55,6 +55,18 @@ org.apache.curator curator-recipes + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + @@ -101,6 +113,20 @@ ${omniruntime.version} aarch64 provided + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + + com.google.protobuf @@ -124,6 +150,18 @@ org.apache.curator curator-recipes + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-databind + -- Gitee From e96b2ab508da58f1900689cd296443ae5203487e Mon Sep 17 00:00:00 2001 From: chenyidao <979136761@qq.com> Date: Tue, 21 Mar 2023 17:55:49 +0800 Subject: [PATCH 2/2] solve assert fail question for aqe --- .../scala/org/apache/spark/sql/execution/ColumnarExec.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala 
b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala index d6ff2b40a..47a59336e 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala @@ -298,8 +298,6 @@ case class RowToOmniColumnarExec(child: SparkPlan) extends RowToColumnarTransiti case class OmniColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition { - assert(child.supportsColumnar) - override def nodeName: String = "OmniColumnarToRow" override def output: Seq[Attribute] = child.output -- Gitee
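
The change that recurs across nearly every operator touched in this patch is the Spark 3.3 TreeNode contract: plan nodes are immutable, so each one must rebuild itself through withNewChildInternal / withNewChildrenInternal rather than legacyWithNewChildren. The sketch below shows the shape those overrides take for a hypothetical unary columnar operator; MyColumnarExec and its pass-through doExecuteColumnar body are illustrative assumptions, not code from this patch.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical operator; mirrors the overrides added to ColumnarSortExec,
// ColumnarWindowExec, ColumnarTakeOrderedAndProjectExec and others above.
case class MyColumnarExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output

  override def supportsColumnar: Boolean = true

  // Spark 3.3 rewrites children through this hook when it copies the plan tree
  // (for example during AQE re-optimization), so every operator must provide it.
  override protected def withNewChildInternal(newChild: SparkPlan): MyColumnarExec =
    copy(child = newChild)

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException("row-based execution is not supported")

  // Pass-through body, only to keep the sketch compilable.
  override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
    child.executeColumnar()
}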
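
The partition specs introduced in ShuffledColumnarRDD are normally produced by Spark's AQE coalescing and skew-handling rules and then handed to the renamed read operator. A hedged usage sketch, assuming only the classes defined in this patch; the object and method names, the stage argument, and the byte sizes are invented for illustration.

import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.OmniAQEShuffleReadExec

object CoalescedReadExample {
  // Hypothetical helper: read reducers [0, 4) of a columnar shuffle stage as two
  // coalesced partitions, roughly what the AQE coalescing rule would produce.
  def coalesceIntoTwo(stage: SparkPlan): OmniAQEShuffleReadExec = {
    val specs: Seq[ShufflePartitionSpec] = Seq(
      CoalescedPartitionSpec(0, 2, 128L * 1024 * 1024), // reducers [0, 2), ~128 MiB
      CoalescedPartitionSpec(2, 4, 96L * 1024 * 1024))  // reducers [2, 4), ~96 MiB
    // For these specs, hasCoalescedPartition and isCoalescedRead both return true.
    OmniAQEShuffleReadExec(stage, specs)
  }
}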