From 0255c964a8f249fcbbbdc459f6d64a87d971b9c2 Mon Sep 17 00:00:00 2001
From: chenyidao <979136761@qq.com>
Date: Tue, 28 Feb 2023 16:16:52 +0800
Subject: [PATCH 1/2] omni: adapt to Spark 3.3.1
---
.../omniop-spark-extension/java/pom.xml | 10 +-
.../vectorized/OmniColumnVector.java | 17 ++
.../boostkit/spark/ColumnarGuardRule.scala | 20 +-
.../boostkit/spark/ColumnarPlugin.scala | 63 ++++-
.../boostkit/spark/ColumnarPluginConfig.scala | 2 +-
.../boostkit/spark/ShuffleJoinStrategy.scala | 2 +-
.../expression/OmniExpressionAdaptor.scala | 4 +-
.../spark/shuffle/ColumnarShuffleWriter.scala | 7 +-
.../sort/OmniColumnarShuffleManager.scala | 5 +-
.../ColumnarBasicPhysicalOperators.scala | 21 +-
...ColumnarBroadcastExchangeAdaptorExec.scala | 3 +
.../ColumnarBroadcastExchangeExec.scala | 5 +-
.../spark/sql/execution/ColumnarExec.scala | 19 +-
.../sql/execution/ColumnarExpandExec.scala | 19 ++
.../ColumnarFileSourceScanExec.scala | 103 ++++---
.../execution/ColumnarHashAggregateExec.scala | 42 +--
.../ColumnarShuffleExchangeExec.scala | 39 ++-
.../sql/execution/ColumnarSortExec.scala | 3 +
.../ColumnarTakeOrderedAndProjectExec.scala | 3 +
.../sql/execution/ColumnarWindowExec.scala | 22 +-
.../sql/execution/ShuffledColumnarRDD.scala | 59 +++-
.../adaptive/AQEPropagateEmptyRelation.scala | 100 +++++++
.../ColumnarCustomShuffleReaderExec.scala | 122 ++++++---
.../EliminateJoinToEmptyRelation.scala | 63 -----
.../PruneFileSourcePartitions.scala | 139 ----------
.../datasources/orc/OmniOrcFileFormat.scala | 15 +-
.../execution/datasources/orc/OrcUtils.scala | 256 ------------------
.../joins/ColumnarBroadcastHashJoinExec.scala | 7 +-
.../joins/ColumnarShuffledHashJoinExec.scala | 6 +-
.../joins/ColumnarSortMergeJoinExec.scala | 6 +
.../execution/PruneHiveTablePartitions.scala | 126 ---------
omnioperator/omniop-spark-extension/pom.xml | 42 ++-
32 files changed, 595 insertions(+), 755 deletions(-)
create mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
delete mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
diff --git a/omnioperator/omniop-spark-extension/java/pom.xml b/omnioperator/omniop-spark-extension/java/pom.xml
index caafa313f..3e3175bab 100644
--- a/omnioperator/omniop-spark-extension/java/pom.xml
+++ b/omnioperator/omniop-spark-extension/java/pom.xml
@@ -7,7 +7,7 @@
com.huawei.kunpeng
boostkit-omniop-spark-parent
- 3.1.1-1.1.0
+ 3.3.1-1.1.0
../pom.xml
@@ -103,20 +103,20 @@
spark-core_${scala.binary.version}
test-jar
test
- 3.1.1
+ 3.3.1
org.apache.spark
spark-catalyst_${scala.binary.version}
test-jar
test
- 3.1.1
+ 3.3.1
org.apache.spark
spark-sql_${scala.binary.version}
test-jar
- 3.1.1
+ 3.3.1
test
@@ -127,7 +127,7 @@
org.apache.spark
spark-hive_${scala.binary.version}
- 3.1.1
+ 3.3.1
provided
diff --git a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java
index 808f96e1f..3676d38dc 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java
+++ b/omnioperator/omniop-spark-extension/java/src/main/java/org/apache/spark/sql/execution/vectorized/OmniColumnVector.java
@@ -354,6 +354,18 @@ public class OmniColumnVector extends WritableColumnVector {
}
}
+ @Override
+ public void putBooleans(int rowId, byte src) {
+ booleanDataVec.set(rowId, (src & 1) == 1);
+ booleanDataVec.set(rowId + 1, (src >>> 1 & 1) == 1);
+ booleanDataVec.set(rowId + 2, (src >>> 2 & 1) == 1);
+ booleanDataVec.set(rowId + 3, (src >>> 3 & 1) == 1);
+ booleanDataVec.set(rowId + 4, (src >>> 4 & 1) == 1);
+ booleanDataVec.set(rowId + 5, (src >>> 5 & 1) == 1);
+ booleanDataVec.set(rowId + 6, (src >>> 6 & 1) == 1);
+ booleanDataVec.set(rowId + 7, (src >>> 7 & 1) == 1);
+ }
+
@Override
public boolean getBoolean(int rowId) {
if (dictionaryData != null) {
@@ -453,6 +465,11 @@ public class OmniColumnVector extends WritableColumnVector {
return UTF8String.fromBytes(getBytes(rowId, count), rowId, count);
}
+ @Override
+ public ByteBuffer getByteBuffer(int rowId, int count) {
+ throw new UnsupportedOperationException("getByteBuffer is not supported");
+ }
+
//
// APIs dealing with Shorts
//
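The `putBooleans(int rowId, byte src)` override added above for the Spark 3.3 `WritableColumnVector` API unpacks the eight bits of one byte into eight boolean slots, lowest bit first. A minimal standalone Scala sketch of the same bit-extraction logic (names here are illustrative, not part of the patch):

object PutBooleansSketch {
  // Unpack the eight bits of `src` into booleans, lowest bit first,
  // mirroring what the OmniColumnVector override writes at rowId..rowId+7.
  def unpackBooleans(src: Byte): Array[Boolean] =
    Array.tabulate(8)(i => ((src >>> i) & 1) == 1)

  def main(args: Array[String]): Unit = {
    // 0x2B = 0010 1011 -> true, true, false, true, false, true, false, false
    println(unpackBooleans(0x2B.toByte).mkString(", "))
  }
}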
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala
index a4e4eaa0a..46dd4b45a 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarGuardRule.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, CustomShuffleReaderExec}
+import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, OmniAQEShuffleReadExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
@@ -37,6 +37,9 @@ case class RowGuard(child: SparkPlan) extends SparkPlan {
}
def children: Seq[SparkPlan] = Seq(child)
+
+ override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan =
+ legacyWithNewChildren(newChildren)
}
case class ColumnarGuardRule() extends Rule[SparkPlan] {
@@ -92,6 +95,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
if (!enableColumnarHashAgg) return false
new ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
+ plan.isStreaming,
+ plan.numShufflePartitions,
plan.groupingExpressions,
plan.aggregateExpressions,
plan.aggregateAttributes,
@@ -127,9 +132,9 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
left match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
- case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
+ case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) =>
new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
- case BroadcastQueryStageExec(_, plan: ReusedExchangeExec) =>
+ case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) =>
plan match {
case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(b.mode, b.child)
@@ -141,9 +146,9 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
right match {
case exec: BroadcastExchangeExec =>
new ColumnarBroadcastExchangeExec(exec.mode, exec.child)
- case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec) =>
+ case BroadcastQueryStageExec(_, plan: BroadcastExchangeExec, _) =>
new ColumnarBroadcastExchangeExec(plan.mode, plan.child)
- case BroadcastQueryStageExec(_, plan: ReusedExchangeExec) =>
+ case BroadcastQueryStageExec(_, plan: ReusedExchangeExec, _) =>
plan match {
case ReusedExchangeExec(_, b: BroadcastExchangeExec) =>
new ColumnarBroadcastExchangeExec(b.mode, b.child)
@@ -182,7 +187,8 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
plan.buildSide,
plan.condition,
plan.left,
- plan.right).buildCheck()
+ plan.right,
+ plan.isSkewJoin).buildCheck()
case plan: BroadcastNestedLoopJoinExec => return false
case p =>
p
@@ -237,7 +243,7 @@ case class ColumnarGuardRule() extends Rule[SparkPlan] {
case p if !supportCodegen(p) =>
// insert row guard them recursively
p.withNewChildren(p.children.map(insertRowGuardOrNot))
- case p: CustomShuffleReaderExec =>
+ case p: OmniAQEShuffleReadExec =>
p.withNewChildren(p.children.map(insertRowGuardOrNot))
case p: BroadcastQueryStageExec =>
p
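The `withNewChildInternal` / `withNewChildrenInternal` overrides added throughout this patch follow the TreeNode contract introduced around Spark 3.2, where every plan node rebuilds itself with new children explicitly instead of relying on reflection-based copying. A toy, self-contained sketch of that contract (the trait and classes below are stand-ins, not Spark classes):

// Minimal stand-in for the TreeNode contract the patch implements.
trait PlanLike {
  def children: Seq[PlanLike]
  // Spark 3.2+ asks each node to rebuild itself with new children explicitly.
  def withNewChildren(newChildren: Seq[PlanLike]): PlanLike
}

case class UnaryNode(name: String, child: PlanLike) extends PlanLike {
  def children: Seq[PlanLike] = Seq(child)
  def withNewChildren(newChildren: Seq[PlanLike]): PlanLike =
    copy(child = newChildren.head) // same shape as `withNewChildInternal`
}

case class LeafNode(name: String) extends PlanLike {
  def children: Seq[PlanLike] = Nil
  def withNewChildren(newChildren: Seq[PlanLike]): PlanLike = this
}

object WithNewChildSketch extends App {
  val plan = UnaryNode("filter", LeafNode("scan"))
  println(plan.withNewChildren(Seq(LeafNode("columnarScan"))))
}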
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
index d3fcbaf53..a94eb5d67 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery
import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowToOmniColumnarExec, _}
-import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ColumnarCustomShuffleReaderExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, OmniAQEShuffleReadExec, AQEShuffleReadExec, QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
@@ -247,6 +247,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
case _ =>
new ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
+ plan.isStreaming,
+ plan.numShufflePartitions,
plan.groupingExpressions,
plan.aggregateExpressions,
plan.aggregateAttributes,
@@ -257,6 +259,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
} else {
new ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
+ plan.isStreaming,
+ plan.numShufflePartitions,
plan.groupingExpressions,
plan.aggregateExpressions,
plan.aggregateAttributes,
@@ -267,6 +271,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
} else {
new ColumnarHashAggregateExec(
plan.requiredChildDistributionExpressions,
+ plan.isStreaming,
+ plan.numShufflePartitions,
plan.groupingExpressions,
plan.aggregateExpressions,
plan.aggregateAttributes,
@@ -311,7 +317,8 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
plan.buildSide,
plan.condition,
left,
- right)
+ right,
+ plan.isSkewJoin)
case plan: SortMergeJoinExec if enableColumnarSortMergeJoin =>
logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
val left = replaceWithColumnarPlan(plan.left)
@@ -341,19 +348,19 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
val child = replaceWithColumnarPlan(plan.child)
logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
new ColumnarShuffleExchangeExec(plan.outputPartitioning, child, plan.shuffleOrigin)
- case plan: CustomShuffleReaderExec if columnarConf.enableColumnarShuffle =>
+ case plan: AQEShuffleReadExec if columnarConf.enableColumnarShuffle =>
plan.child match {
case shuffle: ColumnarShuffleExchangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs)
- case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec) =>
+ OmniAQEShuffleReadExec(plan.child, plan.partitionSpecs)
+ case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _) =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs)
- case ShuffleQueryStageExec(_, reused: ReusedExchangeExec) =>
+ OmniAQEShuffleReadExec(plan.child, plan.partitionSpecs)
+ case ShuffleQueryStageExec(_, reused: ReusedExchangeExec, _) =>
reused match {
case ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec) =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
- ColumnarCustomShuffleReaderExec(
+ OmniAQEShuffleReadExec(
plan.child,
plan.partitionSpecs)
case _ =>
@@ -375,13 +382,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
curPlan.id,
BroadcastExchangeExec(
originalBroadcastPlan.mode,
- ColumnarBroadcastExchangeAdaptorExec(originalBroadcastPlan, 1)))
+ ColumnarBroadcastExchangeAdaptorExec(originalBroadcastPlan, 1)),
+ curPlan._canonicalized)
case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeExec) =>
BroadcastQueryStageExec(
curPlan.id,
BroadcastExchangeExec(
originalBroadcastPlan.mode,
- ColumnarBroadcastExchangeAdaptorExec(curPlan.plan, 1)))
+ ColumnarBroadcastExchangeAdaptorExec(curPlan.plan, 1)),
+ curPlan._canonicalized)
case _ =>
curPlan
}
@@ -409,11 +418,26 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
case ColumnarToRowExec(child: ColumnarBroadcastExchangeExec) =>
replaceWithColumnarPlan(child)
case plan: ColumnarToRowExec =>
- val child = replaceWithColumnarPlan(plan.child)
- if (conf.getConfString("spark.omni.sql.columnar.columnarToRow", "true").toBoolean) {
- OmniColumnarToRowExec(child)
- } else {
- ColumnarToRowExec(child)
+ plan.child match {
+ case child: BroadcastQueryStageExec =>
+ child.plan match {
+ case originalBroadcastPlan: ColumnarBroadcastExchangeExec =>
+ BroadcastQueryStageExec(
+ child.id,
+ BroadcastExchangeExec(
+ originalBroadcastPlan.mode,
+ ColumnarBroadcastExchangeAdaptorExec(originalBroadcastPlan, 1)), child._canonicalized)
+ case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeExec) =>
+ BroadcastQueryStageExec(
+ child.id,
+ BroadcastExchangeExec(
+ originalBroadcastPlan.mode,
+ ColumnarBroadcastExchangeAdaptorExec(child.plan, 1)), child._canonicalized)
+ case _ =>
+ replaceColumnarToRow(plan, conf)
+ }
+ case _ =>
+ replaceColumnarToRow(plan, conf)
}
case r: SparkPlan
if !r.isInstanceOf[QueryStageExec] && !r.supportsColumnar && r.children.exists(c =>
@@ -430,6 +454,15 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
val children = p.children.map(replaceWithColumnarPlan)
p.withNewChildren(children)
}
+
+ def replaceColumnarToRow(plan: ColumnarToRowExec, conf: SQLConf) : SparkPlan = {
+ val child = replaceWithColumnarPlan(plan.child)
+ if (conf.getConfString("spark.omni.sql.columnar.columnarToRow", "true").toBoolean) {
+ OmniColumnarToRowExec(child)
+ } else {
+ ColumnarToRowExec(child)
+ }
+ }
}
case class ColumnarOverrideRules(session: SparkSession) extends ColumnarRule with Logging {
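The extracted `replaceColumnarToRow` helper above gates the columnar-to-row conversion on a session config. A minimal sketch of the same toggle pattern outside Spark (the config key is taken from the patch; the surrounding types are plain stand-ins):

object ColumnarToRowToggleSketch extends App {
  // Stand-in for SQLConf.getConfString(key, default): a plain map lookup.
  val sessionConf = Map("spark.omni.sql.columnar.columnarToRow" -> "true")

  def chooseConversion(conf: Map[String, String]): String =
    if (conf.getOrElse("spark.omni.sql.columnar.columnarToRow", "true").toBoolean)
      "OmniColumnarToRowExec"   // vectorized Omni conversion
    else
      "ColumnarToRowExec"       // vanilla Spark row conversion

  println(chooseConversion(sessionConf))
}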
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index 29776a07a..a698c8108 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -153,7 +153,7 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
.toBoolean
val enableFusion: Boolean = conf
- .getConfString("spark.omni.sql.columnar.fusion", "true")
+ .getConfString("spark.omni.sql.columnar.fusion", "false")
.toBoolean
// Pick columnar shuffle hash join if one side join count > = 0 to build local hash map, and is
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala
index 2071420c9..6b065552c 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ShuffleJoinStrategy.scala
@@ -37,7 +37,7 @@ object ShuffleJoinStrategy extends Strategy
ColumnarPluginConfig.getConf.columnarPreferShuffledHashJoinCBO
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint)
+ case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, _, left, right, hint)
if columnarPreferShuffledHashJoin =>
val enable = getBroadcastBuildSide(left, right, joinType, hint, true, conf).isEmpty &&
!hintToSortMergeJoin(hint) &&
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
index da1a5b747..c4307082a 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
@@ -668,9 +668,9 @@ object OmniExpressionAdaptor extends Logging {
def toOmniAggFunType(agg: AggregateExpression, isHashAgg: Boolean = false, isFinal: Boolean = false): FunctionType = {
agg.aggregateFunction match {
- case Sum(_) => OMNI_AGGREGATION_TYPE_SUM
+ case Sum(_, _) => OMNI_AGGREGATION_TYPE_SUM
case Max(_) => OMNI_AGGREGATION_TYPE_MAX
- case Average(_) => OMNI_AGGREGATION_TYPE_AVG
+ case Average(_, _) => OMNI_AGGREGATION_TYPE_AVG
case Min(_) => OMNI_AGGREGATION_TYPE_MIN
case Count(Literal(1, IntegerType) :: Nil) | Count(ArrayBuffer(Literal(1, IntegerType))) =>
if (isFinal) {
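In Spark 3.3, `Sum` and `Average` carry a second constructor field (the ANSI error-handling flag), so one-argument extractor patterns like `Sum(_)` no longer compile; the patch widens them to `Sum(_, _)`. A standalone sketch of the same extractor-arity issue, using toy case classes rather than the Spark expressions:

// Toy stand-ins: adding a field to a case class changes its extractor arity.
sealed trait AggFunc
case class Sum(child: String, failOnError: Boolean) extends AggFunc
case class Max(child: String) extends AggFunc

object AggPatternSketch extends App {
  def describe(f: AggFunc): String = f match {
    case Sum(_, _) => "OMNI_AGGREGATION_TYPE_SUM" // two fields now, so two wildcards
    case Max(_)    => "OMNI_AGGREGATION_TYPE_MAX"
  }
  println(describe(Sum("col_a", failOnError = false)))
}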
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 7eca3427e..615ddb6b7 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -71,7 +71,7 @@ class ColumnarShuffleWriter[K, V](
override def write(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
partitionLengths = new Array[Long](dep.partitioner.numPartitions)
- shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, null)
+ shuffleBlockResolver.writeMetadataFileAndCommit(dep.shuffleId, mapId, partitionLengths, Array[Long](), null)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
return
}
@@ -107,7 +107,7 @@ class ColumnarShuffleWriter[K, V](
jniWrapper.split(nativeSplitter, vb.getNativeVectorBatch)
dep.splitTime.add(System.nanoTime() - startTime)
dep.numInputRows.add(cb.numRows)
- writeMetrics.incRecordsWritten(1)
+ writeMetrics.incRecordsWritten(cb.numRows)
}
}
val startTime = System.nanoTime()
@@ -122,10 +122,11 @@ class ColumnarShuffleWriter[K, V](
partitionLengths = splitResult.getPartitionLengths
try {
- shuffleBlockResolver.writeIndexFileAndCommit(
+ shuffleBlockResolver.writeMetadataFileAndCommit(
dep.shuffleId,
mapId,
partitionLengths,
+ Array[Long](),
dataTmp)
} finally {
if (dataTmp.exists() && !dataTmp.delete()) {
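Spark 3.2+ replaced `writeIndexFileAndCommit` with `writeMetadataFileAndCommit`, which additionally accepts per-partition checksums; the writer above passes an empty array because the native splitter does not compute checksums. A hedged sketch of the call shape with a stub resolver (the stub mirrors only the argument order used above; the real `IndexShuffleBlockResolver` differs in detail):

import java.io.File

// Stub with the same parameter shape as the resolver call in the writer above.
class StubShuffleBlockResolver {
  def writeMetadataFileAndCommit(
      shuffleId: Int,
      mapId: Long,
      partitionLengths: Array[Long],
      checksums: Array[Long],   // new in Spark 3.2+; empty when checksums are not computed
      dataTmp: File): Unit =
    println(s"commit shuffle=$shuffleId map=$mapId parts=${partitionLengths.length} " +
      s"checksums=${checksums.length}")
}

object ShuffleCommitSketch extends App {
  val resolver = new StubShuffleBlockResolver
  resolver.writeMetadataFileAndCommit(0, 0L, Array(10L, 20L), Array.empty[Long], null)
}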
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala
index e7c66ee72..28427bba2 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/shuffle/sort/OmniColumnarShuffleManager.scala
@@ -99,7 +99,7 @@ class OmniColumnarShuffleManager(conf: SparkConf) extends ColumnarShuffleManager
env.conf,
metrics,
shuffleExecutorComponents)
- case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K@unchecked, V@unchecked] =>
+ case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
bypassMergeSortHandle,
@@ -107,9 +107,8 @@ class OmniColumnarShuffleManager(conf: SparkConf) extends ColumnarShuffleManager
env.conf,
metrics,
shuffleExecutorComponents)
- case other: BaseShuffleHandle[K@unchecked, V@unchecked, _] =>
+ case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(
- shuffleBlockResolver,
other,
mapId,
context,
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala
index cb23b68f0..86ac4fb1c 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBasicPhysicalOperators.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import java.util.concurrent.TimeUnit.NANOSECONDS
+
import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._
import com.huawei.boostkit.spark.util.OmniAdaptorUtil
@@ -101,6 +102,9 @@ case class ColumnarProjectExec(projectList: Seq[NamedExpression], child: SparkPl
|${ExplainUtils.generateFieldString("Input", child.output)}
|""".stripMargin
}
+
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarProjectExec =
+ copy(child = newChild)
}
case class ColumnarFilterExec(condition: Expression, child: SparkPlan)
@@ -109,6 +113,10 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan)
override def supportsColumnar: Boolean = true
override def nodeName: String = "OmniColumnarFilter"
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarFilterExec = {
+ copy(this.condition, newChild)
+ }
+
// Split out all the IsNotNulls from condition.
private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
case IsNotNull(a) => isNullIntolerant(a) && a.references.subsetOf(child.outputSet)
@@ -116,7 +124,7 @@ case class ColumnarFilterExec(condition: Expression, child: SparkPlan)
}
// If one expression and its children are null intolerant, it is null intolerant.
- private def isNullIntolerant(expr: Expression): Boolean = expr match {
+ override def isNullIntolerant(expr: Expression): Boolean = expr match {
case e: NullIntolerant => e.children.forall(isNullIntolerant)
case _ => false
}
@@ -267,6 +275,9 @@ case class ColumnarConditionProjectExec(projectList: Seq[NamedExpression],
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarConditionProjectExec =
+ copy(child = newChild)
+
override lazy val metrics = Map(
"addInputTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omni addInput"),
"numInputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of input vecBatchs"),
@@ -383,7 +394,7 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
children.map(_.output).transpose.map { attrs =>
val firstAttr = attrs.head
val nullable = attrs.exists(_.nullable)
- val newDt = attrs.map(_.dataType).reduce(StructType.merge)
+ val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
if (firstAttr.dataType == newDt) {
firstAttr.withNullability(nullable)
} else {
@@ -393,6 +404,10 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends SparkPlan {
}
}
+ override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
+ copy(children = newChildren)
+ }
+
def buildCheck(): Unit = {
val inputTypes = new Array[DataType](output.size)
output.zipWithIndex.foreach {
@@ -420,7 +435,7 @@ class ColumnarRangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
- sqlContext
+ session.sqlContext
.sparkContext
.parallelize(0 until numSlices, numSlices)
.mapPartitionsWithIndex { (i, _) =>
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala
index d137388ab..1d236c16d 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala
@@ -64,4 +64,7 @@ case class ColumnarBroadcastExchangeAdaptorExec(child: SparkPlan, numPartitions:
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
"processTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime_datatoarrowcolumnar"))
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
+ ColumnarBroadcastExchangeAdaptorExec = copy(child = newChild)
}
\ No newline at end of file
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 72d1aae05..8a29e0d2b 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -65,7 +65,7 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
@transient
override lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](
- sqlContext.sparkSession, ColumnarBroadcastExchangeExec.executionContext) {
+ session.sqlContext.sparkSession, ColumnarBroadcastExchangeExec.executionContext) {
try {
// Setup a job group here so later it may get cancelled by groupId if necessary.
sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)",
@@ -159,6 +159,9 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
}
}
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarBroadcastExchangeExec =
+ new ColumnarBroadcastExchangeExec(this.mode, newChild)
+
override protected def doPrepare(): Unit = {
// Materialize the future.
relationFuture
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
index b1fd51f48..d6ff2b40a 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.util.SparkMemoryUtils
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OmniColumnVector, WritableColumnVector}
-import org.apache.spark.sql.types.{BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DecimalType, DoubleType, IntegerType, LongType, ShortType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DecimalType, DoubleType, IntegerType, LongType, ShortType, StringType, StructType, TimestampType}
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
import nova.hetu.omniruntime.vector.Vec
@@ -101,6 +102,7 @@ private object RowToColumnConverter {
private def getConverterForType(dataType: DataType, nullable: Boolean): TypeConverter = {
val core = dataType match {
+ case BinaryType => BinaryConverter
case BooleanType => BooleanConverter
case ByteType => ByteConverter
case ShortType => ShortConverter
@@ -123,6 +125,13 @@ private object RowToColumnConverter {
}
}
+ private object BinaryConverter extends TypeConverter {
+ override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
+ val bytes = row.getBinary(column)
+ cv.appendByteArray(bytes, 0, bytes.length)
+ }
+ }
+
private object BooleanConverter extends TypeConverter {
override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit =
cv.appendBoolean(row.getBoolean(column))
@@ -232,8 +241,11 @@ case class RowToOmniColumnarExec(child: SparkPlan) extends RowToColumnarTransiti
"rowToOmniColumnarTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in row to OmniColumnar")
)
+ override protected def withNewChildInternal(newChild: SparkPlan): RowToOmniColumnarExec =
+ copy(child = newChild)
+
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val enableOffHeapColumnVector = sqlContext.conf.offHeapColumnVectorEnabled
+ val enableOffHeapColumnVector = session.sqlContext.conf.offHeapColumnVectorEnabled
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val rowToOmniColumnarTime = longMetric("rowToOmniColumnarTime")
@@ -313,6 +325,9 @@ case class OmniColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransiti
ColumnarBatchToInternalRow.convert(localOutput, batches, numOutputRows, numInputBatches, omniColumnarToRowTime)
}
}
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
+ OmniColumnarToRowExec = copy(child = newChild)
}
object ColumnarBatchToInternalRow {
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
index 27b05b16c..b25d97d60 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExpandExec.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.sql.execution
import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
@@ -161,4 +178,6 @@ case class ColumnarExpandExec(
throw new UnsupportedOperationException(s"ColumnarExpandExec operator doesn't support doExecute().")
}
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarExpandExec =
+ copy(child = newChild)
}
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
index 73091d069..90594d3eb 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarFileSourceScanExec.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.{OmniOrcFileFormat, OrcFileFormat}
@@ -54,6 +55,7 @@ import org.apache.spark.sql.execution.joins.ColumnarBroadcastHashJoinExec
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.util.SparkMemoryUtils
import org.apache.spark.sql.execution.util.SparkMemoryUtils.addLeakSafeTaskCompletionListener
+import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
import org.apache.spark.sql.execution.vectorized.OmniColumnVector
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, StructType}
@@ -74,13 +76,19 @@ abstract class BaseColumnarFileSourceScanExec(
disableBucketedScan: Boolean = false)
extends DataSourceScanExec {
+ lazy val metadataColumns: Seq[AttributeReference] =
+ output.collect { case FileSourceMetadataAttribute(attr) => attr }
+
override lazy val supportsColumnar: Boolean = true
override def vectorTypes: Option[Seq[String]] =
relation.fileFormat.vectorTypes(
requiredSchema = requiredSchema,
partitionSchema = relation.partitionSchema,
- relation.sparkSession.sessionState.conf)
+ relation.sparkSession.sessionState.conf).map { vectorTypes =>
+ // for column-based file format, append metadata column's vector type classes if any
+ vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
+ }
private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
@@ -96,7 +104,7 @@ abstract class BaseColumnarFileSourceScanExec(
}
private def isDynamicPruningFilter(e: Expression): Boolean =
- e.find(_.isInstanceOf[PlanExpression[_]]).isDefined
+ e.exists(_.isInstanceOf[PlanExpression[_]])
@transient lazy val selectedPartitions: Array[PartitionDirectory] = {
val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
@@ -223,7 +231,13 @@ abstract class BaseColumnarFileSourceScanExec(
@transient
private lazy val pushedDownFilters = {
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
- dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+ // `dataFilters` should not include any metadata col filters
+ // because the metadata struct has been flatted in FileSourceStrategy
+ // and thus metadata col filters are invalid to be pushed down
+ dataFilters.filterNot(_.references.exists {
+ case FileSourceMetadataAttribute(_) => true
+ case _ => false
+ }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
}
override protected def metadata: Map[String, String] = {
@@ -242,21 +256,26 @@ abstract class BaseColumnarFileSourceScanExec(
"DataFilters" -> seqToString(dataFilters),
"Location" -> locationDesc)
- // (SPARK-32986): Add bucketed scan info in explain output of FileSourceScanExec
- if (bucketedScan) {
- relation.bucketSpec.map { spec =>
+ relation.bucketSpec.map { spec =>
+ val bucketedKey = "Bucketed"
+ if (bucketedScan) {
val numSelectedBuckets = optionalBucketSet.map { b =>
b.cardinality()
} getOrElse {
spec.numBuckets
}
- metadata + ("SelectedBucketsCount" ->
- (s"$numSelectedBuckets out of ${spec.numBuckets}" +
+ metadata ++ Map(
+ bucketedKey -> "true",
+ "SelectedBucketsCount" -> (s"$numSelectedBuckets out of ${spec.numBuckets}" +
optionalNumCoalescedBuckets.map { b => s" (Coalesced to $b)" }.getOrElse("")))
- } getOrElse {
- metadata
+ } else if (!relation.sparkSession.sessionState.conf.bucketingEnabled) {
+ metadata + (bucketedKey -> "false (disabled by configuration)")
+ } else if (disableBucketedScan) {
+ metadata + (bucketedKey -> "false (disabled by query planner)")
+ } else {
+ metadata + (bucketedKey -> "false (bucket column(s) not read)")
}
- } else {
+ } getOrElse {
metadata
}
}
@@ -312,7 +331,7 @@ abstract class BaseColumnarFileSourceScanExec(
createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
relation)
} else {
- createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
+ createReadRDD(readFile, dynamicallySelectedPartitions, relation)
}
sendDriverMetrics()
readRDD
@@ -343,7 +362,7 @@ abstract class BaseColumnarFileSourceScanExec(
driverMetrics("staticFilesNum") = filesNum
driverMetrics("staticFilesSize") = filesSize
}
- if (relation.partitionSchemaOption.isDefined) {
+ if (relation.partitionSchema.nonEmpty) {
driverMetrics("numPartitions") = partitions.length
}
}
@@ -363,7 +382,7 @@ abstract class BaseColumnarFileSourceScanExec(
None
}
} ++ {
- if (relation.partitionSchemaOption.isDefined) {
+ if (relation.partitionSchema.nonEmpty) {
Map(
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
"pruningTime" ->
@@ -423,7 +442,7 @@ abstract class BaseColumnarFileSourceScanExec(
/**
* Create an RDD for bucketed reads.
- * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
+ * The non-bucketed variant of this function is [[createReadRDD]].
*
* The algorithm is pretty simple: each RDD partition being returned should include all the files
* with the same bucket id from all the given Hive partitions.
@@ -447,10 +466,9 @@ abstract class BaseColumnarFileSourceScanExec(
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
- .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+ .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
}
- // (SPARK-32985): Decouple bucket filter pruning and bucketed table scan
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
filesGroupedToBuckets.filter {
@@ -475,7 +493,8 @@ abstract class BaseColumnarFileSourceScanExec(
}
}
- new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+ new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
+ new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
}
/**
@@ -486,7 +505,7 @@ abstract class BaseColumnarFileSourceScanExec(
* @param selectedPartitions Hive-style partition that are part of the read.
* @param fsRelation [[HadoopFsRelation]] associated with the read.
*/
- private def createNonBucketedReadRDD(
+ private def createReadRDD(
readFile: (PartitionedFile) => Iterator[InternalRow],
selectedPartitions: Array[PartitionDirectory],
fsRelation: HadoopFsRelation): RDD[InternalRow] = {
@@ -496,27 +515,43 @@ abstract class BaseColumnarFileSourceScanExec(
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
+ // Filter files with bucket pruning if possible
+ val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
+ val shouldProcess: Path => Boolean = optionalBucketSet match {
+ case Some(bucketSet) if bucketingEnabled =>
+ // Do not prune the file if bucket file name is invalid
+ filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
+ case _ =>
+ _ => true
+ }
+
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
- val isSplitable = relation.fileFormat.isSplitable(
- relation.sparkSession, relation.options, filePath)
- PartitionedFileUtil.splitFiles(
- sparkSession = relation.sparkSession,
- file = file,
- filePath = filePath,
- isSplitable = isSplitable,
- maxSplitBytes = maxSplitBytes,
- partitionValues = partition.values
- )
+
+ if (shouldProcess(filePath)) {
+ val isSplitable = relation.fileFormat.isSplitable(
+ relation.sparkSession, relation.options, filePath)
+ PartitionedFileUtil.splitFiles(
+ sparkSession = relation.sparkSession,
+ file = file,
+ filePath = filePath,
+ isSplitable = isSplitable,
+ maxSplitBytes = maxSplitBytes,
+ partitionValues = partition.values
+ )
+ } else {
+ Seq.empty
+ }
}
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
val partitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
- new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+ new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
+ new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns)
}
// Filters unused DynamicPruningExpression expressions - one which has been replaced
@@ -551,7 +586,7 @@ abstract class BaseColumnarFileSourceScanExec(
throw new UnsupportedOperationException(s"Unsupported final aggregate expression in operator fusion, exp: $exp")
} else if (exp.mode == Partial) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Average(_) | Max(_) | Count(_) | First(_, _) =>
+ case Sum(_, _) | Min(_) | Average(_, _) | Max(_) | Count(_) | First(_, _) =>
val aggExp = exp.aggregateFunction.children.head
omniOutputExressionOrder += {
exp.aggregateFunction.inputAggBufferAttributes.head.exprId ->
@@ -569,7 +604,7 @@ abstract class BaseColumnarFileSourceScanExec(
}
} else if (exp.mode == PartialMerge) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Average(_) | Max(_) | Count(_) | First(_, _) =>
+ case Sum(_, _) | Min(_) | Average(_, _) | Max(_) | Count(_) | First(_, _) =>
val aggExp = exp.aggregateFunction.children.head
omniOutputExressionOrder += {
exp.aggregateFunction.inputAggBufferAttributes.head.exprId ->
@@ -815,7 +850,7 @@ case class ColumnarMultipleOperatorExec(
None
}
} ++ {
- if (relation.partitionSchemaOption.isDefined) {
+ if (relation.partitionSchema.nonEmpty) {
Map(
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
"pruningTime" ->
@@ -1162,7 +1197,7 @@ case class ColumnarMultipleOperatorExec1(
None
}
} ++ {
- if (relation.partitionSchemaOption.isDefined) {
+ if (relation.partitionSchema.nonEmpty) {
Map(
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
"pruningTime" ->
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala
index e2618842a..278bbdb55 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarHashAggregateExec.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
import java.util.concurrent.TimeUnit.NANOSECONDS
+
import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._
import com.huawei.boostkit.spark.util.OmniAdaptorUtil
@@ -32,8 +33,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.execution.ColumnarProjection.dealPartitionData
-import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
+import org.apache.spark.sql.execution.aggregate.{AggregateCodegenSupport, BaseAggregateExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.util.SparkMemoryUtils
import org.apache.spark.sql.execution.vectorized.OmniColumnVector
@@ -45,14 +47,18 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
*/
case class ColumnarHashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
+ isStreaming: Boolean,
+ numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
- extends BaseAggregateExec
- with AliasAwareOutputPartitioning {
+ extends AggregateCodegenSupport {
+
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarHashAggregateExec =
+ copy(child = newChild)
override def verboseStringWithOperatorId(): String = {
s"""
@@ -77,6 +83,15 @@ case class ColumnarHashAggregateExec(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatchs"))
+ protected override def needHashTable: Boolean = true
+
+ protected override def doConsumeWithKeys(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+ throw new UnsupportedOperationException("ColumnarHashAgg code-gen does not support grouping keys")
+ }
+
+ protected override def doProduceWithKeys(ctx: CodegenContext): String = {
+ throw new UnsupportedOperationException("ColumnarHashAgg code-gen does not support grouping keys")
+ }
override def supportsColumnar: Boolean = true
@@ -99,7 +114,7 @@ case class ColumnarHashAggregateExec(
}
if (exp.mode == Final) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) =>
+ case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_,_) =>
omniAggFunctionTypes(index) = toOmniAggFunType(exp, true, true)
omniAggOutputTypes(index) = toOmniAggInOutType(exp.aggregateFunction.dataType)
omniAggChannels(index) =
@@ -110,7 +125,7 @@ case class ColumnarHashAggregateExec(
}
} else if (exp.mode == PartialMerge) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) =>
+ case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_,_) =>
omniAggFunctionTypes(index) = toOmniAggFunType(exp, true)
omniAggOutputTypes(index) =
toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes)
@@ -125,7 +140,7 @@ case class ColumnarHashAggregateExec(
}
} else if (exp.mode == Partial) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) =>
+ case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_,_) =>
omniAggFunctionTypes(index) = toOmniAggFunType(exp, true)
omniAggOutputTypes(index) =
toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes)
@@ -150,7 +165,7 @@ case class ColumnarHashAggregateExec(
omniSourceTypes(i) = sparkTypeToOmniType(attr.dataType, attr.metadata)
}
- for (aggChannel <-omniAggChannels) {
+ for (aggChannel <- omniAggChannels) {
if (!isSimpleColumnForAll(aggChannel)) {
checkOmniJsonWhiteList("", aggChannel.toArray)
}
@@ -202,7 +217,7 @@ case class ColumnarHashAggregateExec(
}
if (exp.mode == Final) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) =>
+ case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_, _) =>
omniAggFunctionTypes(index) = toOmniAggFunType(exp, true, true)
omniAggOutputTypes(index) =
toOmniAggInOutType(exp.aggregateFunction.dataType)
@@ -214,7 +229,7 @@ case class ColumnarHashAggregateExec(
}
} else if (exp.mode == PartialMerge) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) =>
+ case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_, _) =>
omniAggFunctionTypes(index) = toOmniAggFunType(exp, true)
omniAggOutputTypes(index) =
toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes)
@@ -229,7 +244,7 @@ case class ColumnarHashAggregateExec(
}
} else if (exp.mode == Partial) {
exp.aggregateFunction match {
- case Sum(_) | Min(_) | Max(_) | Count(_) | Average(_) | First(_,_) =>
+ case Sum(_, _) | Min(_) | Max(_) | Count(_) | Average(_, _) | First(_, _) =>
omniAggFunctionTypes(index) = toOmniAggFunType(exp, true)
omniAggOutputTypes(index) =
toOmniAggInOutType(exp.aggregateFunction.inputAggBufferAttributes)
@@ -338,10 +353,3 @@ case class ColumnarHashAggregateExec(
throw new UnsupportedOperationException("This operator doesn't support doExecute().")
}
}
-
-object ColumnarHashAggregateExec {
- def supportsAggregate(aggregateBufferAttributes: Seq[Attribute]): Boolean = {
- val aggregationBufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
- UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(aggregationBufferSchema)
- }
-}
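Because the aggregate now extends the Spark 3.3 `AggregateCodegenSupport` trait, it must supply the whole-stage-codegen hooks even though execution is columnar-only, so the patch stubs them with `UnsupportedOperationException`. A toy sketch of that "inherit the interface, disable the path" pattern (trait and names below are illustrative, not the Spark trait):

// Toy stand-in for a codegen-capable base trait.
trait CodegenSupportLike {
  def needHashTable: Boolean
  def doProduceWithKeys(): String
  def doConsumeWithKeys(input: Seq[String]): String
}

// Columnar operator: satisfies the trait but rejects the row-codegen path.
class ColumnarAggSketch extends CodegenSupportLike {
  override def needHashTable: Boolean = true
  override def doProduceWithKeys(): String =
    throw new UnsupportedOperationException("columnar agg does not generate row code")
  override def doConsumeWithKeys(input: Seq[String]): String =
    throw new UnsupportedOperationException("columnar agg does not generate row code")
}

object CodegenStubSketch extends App {
  val agg = new ColumnarAggSketch
  println(agg.needHashTable)
}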
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index cea0a1438..fc662128e 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -18,8 +18,6 @@
package org.apache.spark.sql.execution
import com.huawei.boostkit.spark.ColumnarPluginConfig
-
-import java.util.Random
import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP
import scala.collection.JavaConverters._
@@ -41,8 +39,9 @@ import org.apache.spark.shuffle.ColumnarShuffleDependency
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.createShuffleWriteProcessor
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter}
@@ -53,16 +52,17 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.MutablePair
+import org.apache.spark.util.random.XORShiftRandom
-class ColumnarShuffleExchangeExec(
- override val outputPartitioning: Partitioning,
- child: SparkPlan,
- shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)
- extends ShuffleExchangeExec(outputPartitioning, child, shuffleOrigin) with ShuffleExchangeLike{
+case class ColumnarShuffleExchangeExec(
+ override val outputPartitioning: Partitioning,
+ child: SparkPlan,
+ shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)
+ extends ShuffleExchangeLike {
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
- override lazy val readMetrics =
+ private[sql] lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics: Map[String, SQLMetric] = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
@@ -100,9 +100,19 @@ class ColumnarShuffleExchangeExec(
override def numPartitions: Int = columnarShuffleDependency.partitioner.numPartitions
+ override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[ColumnarBatch] = {
+ new ShuffledColumnarRDD(columnarShuffleDependency, readMetrics, partitionSpecs)
+ }
+
+ override def runtimeStatistics: Statistics = {
+ val dataSize = metrics("dataSize").value
+ val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value
+ Statistics(dataSize, Some(rowCount))
+ }
+
@transient
lazy val columnarShuffleDependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
- ColumnarShuffleExchangeExec.prepareShuffleDependency(
+ val dep = ColumnarShuffleExchangeExec.prepareShuffleDependency(
inputColumnarRDD,
child.output,
outputPartitioning,
@@ -113,8 +123,8 @@ class ColumnarShuffleExchangeExec(
longMetric("numInputRows"),
longMetric("splitTime"),
longMetric("spillTime"))
+ dep
}
-
var cachedShuffleRDD: ShuffledColumnarRDD = _
override def doExecute(): RDD[InternalRow] = {
@@ -155,6 +165,8 @@ class ColumnarShuffleExchangeExec(
cachedShuffleRDD
}
}
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarShuffleExchangeExec =
+ copy(child = newChild)
}
object ColumnarShuffleExchangeExec extends Logging {
@@ -229,7 +241,8 @@ object ColumnarShuffleExchangeExec extends Logging {
(columnarBatch: ColumnarBatch, numPartitions: Int) => {
val pidArr = new Array[Int](columnarBatch.numRows())
for (i <- 0 until columnarBatch.numRows()) {
- val position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
+ val partitionId = TaskContext.get().partitionId()
+ val position = new XORShiftRandom(partitionId).nextInt(numPartitions)
pidArr(i) = position + 1
}
val vec = new IntVec(columnarBatch.numRows())
@@ -324,6 +337,7 @@ object ColumnarShuffleExchangeExec extends Logging {
rdd.mapPartitionsWithIndexInternal((_, cbIter) => {
cbIter.map { cb => (0, cb) }
}, isOrderSensitive = isOrderSensitive)
+ case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
}
val numCols = outputAttributes.size
@@ -341,6 +355,7 @@ object ColumnarShuffleExchangeExec extends Logging {
new PartitionInfo("hash", numPartitions, numCols, intputTypes)
case RangePartitioning(ordering, numPartitions) =>
new PartitionInfo("range", numPartitions, numCols, intputTypes)
+ case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
}
new ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch](
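Implementing `ShuffleExchangeLike` directly, instead of subclassing `ShuffleExchangeExec`, means the columnar exchange must expose the AQE hooks itself: `getShuffleRDD` re-reads the shuffle with a given partition spec, and `runtimeStatistics` reports written size and row count from its metrics (which is why the writer now increments `recordsWritten` by `cb.numRows`). A minimal sketch of the statistics part (metric names follow the patch; the `Statistics` type is a local stand-in):

object RuntimeStatsSketch extends App {
  // Stand-in for Spark's logical Statistics(sizeInBytes, rowCount).
  case class Statistics(sizeInBytes: Long, rowCount: Option[Long])

  // Metrics accumulated while the columnar shuffle was written.
  val metrics = Map(
    "dataSize" -> 64L * 1024 * 1024, // bytes written by the native splitter
    "recordsWritten" -> 1000000L)    // incremented per row (cb.numRows) in the writer

  def runtimeStatistics(m: Map[String, Long]): Statistics =
    Statistics(m("dataSize"), Some(m("recordsWritten")))

  println(runtimeStatistics(metrics))
}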
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala
index 7c7001dbc..49f245111 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarSortExec.scala
@@ -56,6 +56,9 @@ case class ColumnarSortExec(
override def outputPartitioning: Partitioning = child.outputPartitioning
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarSortExec =
+ copy(child = newChild)
+
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala
index 6fec9f9a0..92efd4d53 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarTakeOrderedAndProjectExec.scala
@@ -49,6 +49,9 @@ case class ColumnarTakeOrderedAndProjectExec(
override def nodeName: String = "OmniColumnarTakeOrderedAndProject"
+ override protected def withNewChildInternal(newChild: SparkPlan):
+ ColumnarTakeOrderedAndProjectExec = copy(child = newChild)
+
val serializer: Serializer = new ColumnarBatchSerializer(
longMetric("avgReadBatchNumRows"),
longMetric("numOutputRows"))
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala
index e5534d3c6..63414c781 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala
@@ -50,6 +50,9 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
override def supportsColumnar: Boolean = true
+ override protected def withNewChildInternal(newChild: SparkPlan): ColumnarWindowExec =
+ copy(child = newChild)
+
override lazy val metrics = Map(
"addInputTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omni addInput"),
"numInputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of input vecBatchs"),
@@ -59,25 +62,6 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression],
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatchs"))
- override def output: Seq[Attribute] =
- child.output ++ windowExpression.map(_.toAttribute)
-
- override def requiredChildDistribution: Seq[Distribution] = {
- if (partitionSpec.isEmpty) {
- // Only show warning when the number of bytes is larger than 100 MiB?
- logWarning("No Partition Defined for Window operation! Moving all data to a single "
- + "partition, this can cause serious performance degradation.")
- AllTuples :: Nil
- } else ClusteredDistribution(partitionSpec) :: Nil
- }
-
- override def requiredChildOrdering: Seq[Seq[SortOrder]] =
- Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
-
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
- override def outputPartitioning: Partitioning = child.outputPartitioning
-
override protected def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala
index 1e728239b..7f664121b 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ShuffledColumnarRDD.scala
@@ -24,6 +24,43 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsRe
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
+sealed trait ShufflePartitionSpec
+
+// A partition that reads data of one or more reducers, from `startReducerIndex` (inclusive) to
+// `endReducerIndex` (exclusive).
+case class CoalescedPartitionSpec(
+ startReducerIndex: Int,
+ endReducerIndex: Int,
+ @transient dataSize: Option[Long] = None) extends ShufflePartitionSpec
+
+object CoalescedPartitionSpec {
+ def apply(startReducerIndex: Int,
+ endReducerIndex: Int,
+ dataSize: Long): CoalescedPartitionSpec = {
+ CoalescedPartitionSpec(startReducerIndex, endReducerIndex, Some(dataSize))
+ }
+}
+
+// A partition that reads partial data of one reducer, from `startMapIndex` (inclusive) to
+// `endMapIndex` (exclusive).
+case class PartialReducerPartitionSpec(
+ reducerIndex: Int,
+ startMapIndex: Int,
+ endMapIndex: Int,
+ @transient dataSize: Long) extends ShufflePartitionSpec
+
+// A partition that reads partial data of one mapper, from `startReducerIndex` (inclusive) to
+// `endReducerIndex` (exclusive).
+case class PartialMapperPartitionSpec(
+ mapIndex: Int,
+ startReducerIndex: Int,
+ endReducerIndex: Int) extends ShufflePartitionSpec
+
+case class CoalescedMapperPartitionSpec(
+ startMapIndex: Int,
+ endMapIndex: Int,
+ numReducers: Int) extends ShufflePartitionSpec
+
/**
* The [[Partition]] used by [[ShuffledRowRDD]].
*/
@@ -70,7 +107,7 @@ class ShuffledColumnarRDD(
override def getPreferredLocations(partition: Partition): Seq[String] = {
val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
partition.asInstanceOf[ShuffledColumnarRDDPartition].spec match {
- case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+ case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
}
@@ -80,6 +117,9 @@ class ShuffledColumnarRDD(
case PartialMapperPartitionSpec(mapIndex, _, _) =>
tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
+
+ case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
+ tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
}
}
@@ -89,7 +129,7 @@ class ShuffledColumnarRDD(
// as well as the `tempMetrics` for basic shuffle metrics.
val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
val reader = split.asInstanceOf[ShuffledColumnarRDDPartition].spec match {
- case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+ case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
SparkEnv.get.shuffleManager.getReader(
dependency.shuffleHandle,
startReducerIndex,
@@ -116,7 +156,22 @@ class ShuffledColumnarRDD(
endReducerIndex,
context,
sqlMetricsReporter)
+
+ case CoalescedMapperPartitionSpec(startMapIndex, endMapIndex, numReducers) =>
+ SparkEnv.get.shuffleManager.getReader(
+ dependency.shuffleHandle,
+ startMapIndex,
+ endMapIndex,
+ 0,
+ numReducers,
+ context,
+ sqlMetricsReporter)
}
reader.read().asInstanceOf[Iterator[Product2[Int, ColumnarBatch]]].map(_._2)
}
+
+ override def clearDependencies(): Unit = {
+ super.clearDependencies()
+ dependency = null
+ }
}
\ No newline at end of file
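
Each of the four spec types added above describes which slice of the shuffle output one RDD partition should read. A standalone sketch of that mapping, where the *Sketch case classes merely mirror the specs above and ReaderRange is a hypothetical summary of the roughly equivalent map/reducer ranges handed to ShuffleManager.getReader (None meaning all map outputs):

    sealed trait SpecSketch
    case class CoalescedSketch(startReducer: Int, endReducer: Int) extends SpecSketch
    case class PartialReducerSketch(reducer: Int, startMap: Int, endMap: Int) extends SpecSketch
    case class PartialMapperSketch(map: Int, startReducer: Int, endReducer: Int) extends SpecSketch
    case class CoalescedMapperSketch(startMap: Int, endMap: Int, numReducers: Int) extends SpecSketch

    object SpecToReaderRange {
      final case class ReaderRange(startMap: Option[Int], endMap: Option[Int],
                                   startReducer: Int, endReducer: Int)

      def apply(spec: SpecSketch): ReaderRange = spec match {
        case CoalescedSketch(sr, er)          => ReaderRange(None, None, sr, er)           // all maps, reducers [sr, er)
        case PartialReducerSketch(r, sm, em)  => ReaderRange(Some(sm), Some(em), r, r + 1) // maps [sm, em), one reducer
        case PartialMapperSketch(m, sr, er)   => ReaderRange(Some(m), Some(m + 1), sr, er) // one map, reducers [sr, er)
        case CoalescedMapperSketch(sm, em, n) => ReaderRange(Some(sm), Some(em), 0, n)     // maps [sm, em), every reducer
      }

      def main(args: Array[String]): Unit = {
        val examples = Seq(CoalescedSketch(0, 3), PartialReducerSketch(2, 0, 4),
          PartialMapperSketch(1, 0, 8), CoalescedMapperSketch(0, 4, 8))
        examples.foreach(s => println(s"$s -> ${apply(s)}"))
      }
    }
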
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
new file mode 100644
index 000000000..004296200
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase
+import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, LOGICAL_QUERY_STAGE, TRUE_OR_FALSE_LITERAL}
+import org.apache.spark.sql.execution.ColumnarHashedRelation
+import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
+import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeLike}
+import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
+
+/**
+ * This rule runs in the AQE optimizer and optimizes more cases
+ * compared to [[PropagateEmptyRelationBase]]:
+ * 1. Join is single column NULL-aware anti join (NAAJ)
+ * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
+ * empty [[LocalRelation]].
+ */
+object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
+ override protected def isEmpty(plan: LogicalPlan): Boolean =
+ super.isEmpty(plan) || (!isRootRepartition(plan) && getEstimatedRowCount(plan).contains(0))
+
+ override protected def nonEmpty(plan: LogicalPlan): Boolean =
+ super.nonEmpty(plan) || getEstimatedRowCount(plan).exists(_ > 0)
+
+ private def isRootRepartition(plan: LogicalPlan): Boolean = plan match {
+ case l: LogicalQueryStage if l.getTagValue(ROOT_REPARTITION).isDefined => true
+ case _ => false
+ }
+
+ // The returned value follows:
+ // - 0 means the plan must produce 0 rows
+ // - a positive value means an estimated row count which can be over-estimated
+ // - none means the plan has not materialized or its row count cannot be estimated
+ private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
+ case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized =>
+ stage.getRuntimeStatistics.rowCount
+
+ case LogicalQueryStage(_, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty &&
+ agg.child.isInstanceOf[QueryStageExec] =>
+ val stage = agg.child.asInstanceOf[QueryStageExec]
+ if (stage.isMaterialized) {
+ stage.getRuntimeStatistics.rowCount
+ } else {
+ None
+ }
+
+ case _ => None
+ }
+
+ private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match {
+ case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.isMaterialized =>
+ if (stage.broadcast.supportsColumnar) {
+ val colRelation = stage.broadcast.relationFuture.get().value.asInstanceOf[ColumnarHashedRelation]
+ colRelation.relation == HashedRelationWithAllNullKeys
+ } else {
+ stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys
+ }
+ case _ => false
+ }
+
+ private def eliminateSingleColumnNullAwareAntiJoin: PartialFunction[LogicalPlan, LogicalPlan] = {
+ case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) =>
+ empty(j)
+ }
+
+ override protected def userSpecifiedRepartition(p: LogicalPlan): Boolean = p match {
+ case LogicalQueryStage(_, ShuffleQueryStageExec(_, shuffle: ShuffleExchangeLike, _))
+ if shuffle.shuffleOrigin == REPARTITION_BY_COL ||
+ shuffle.shuffleOrigin == REPARTITION_BY_NUM => true
+ case _ => false
+ }
+
+ override protected def applyInternal(p: LogicalPlan): LogicalPlan = p.transformUpWithPruning(
+ // LOCAL_RELATION and TRUE_OR_FALSE_LITERAL pattern are matched at
+ // `PropagateEmptyRelationBase.commonApplyFunc`
+ // LOGICAL_QUERY_STAGE pattern is matched at `PropagateEmptyRelationBase.commonApplyFunc`
+ // and `AQEPropagateEmptyRelation.eliminateSingleColumnNullAwareAntiJoin`
+ // Note that we cannot specify ruleId here since the LogicalQueryStage is not immutable.
+ _.containsAnyPattern(LOGICAL_QUERY_STAGE, LOCAL_RELATION, TRUE_OR_FALSE_LITERAL)) {
+ eliminateSingleColumnNullAwareAntiJoin.orElse(commonApplyFunc)
+ }
+}
\ No newline at end of file
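
A toy, self-contained sketch of the row-count reasoning this rule layers on top of PropagateEmptyRelationBase, with StageStats and every name below hypothetical: a materialized stage reporting zero rows is treated as provably empty, a positive (possibly over-estimated) count as non-empty, and an unmaterialized stage as unknown, in which case the plan is left untouched:

    final case class StageStats(materialized: Boolean, rowCount: Option[BigInt])

    object EmptyRelationSketch {
      private def estimatedRowCount(stats: StageStats): Option[BigInt] =
        if (stats.materialized) stats.rowCount else None  // unknown until runtime stats exist

      def isEmpty(stats: StageStats): Boolean = estimatedRowCount(stats).contains(0)
      def nonEmpty(stats: StageStats): Boolean = estimatedRowCount(stats).exists(_ > 0)

      def main(args: Array[String]): Unit = {
        println(isEmpty(StageStats(materialized = true, rowCount = Some(BigInt(0)))))    // true: can fold into an empty relation
        println(nonEmpty(StageStats(materialized = true, rowCount = Some(BigInt(42)))))  // true
        println(isEmpty(StageStats(materialized = false, rowCount = None)))              // false: not materialized, leave it alone
      }
    }
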
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
index d34b93e5b..be4efd90c 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.adaptive
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -36,7 +37,7 @@ import scala.collection.mutable.ArrayBuffer
* node during canonicalization.
* @param partitionSpecs The partition specs that defines the arrangement.
*/
-case class ColumnarCustomShuffleReaderExec(
+case class OmniAQEShuffleReadExec(
child: SparkPlan,
partitionSpecs: Seq[ShufflePartitionSpec])
extends UnaryExecNode {
@@ -57,9 +58,9 @@ case class ColumnarCustomShuffleReaderExec(
partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size ==
partitionSpecs.length) {
child match {
- case ShuffleQueryStageExec(_, s: ShuffleExchangeLike) =>
+ case ShuffleQueryStageExec(_, s: ShuffleExchangeLike, _) =>
s.child.outputPartitioning
- case ShuffleQueryStageExec(_, r @ ReusedExchangeExec(_, s: ShuffleExchangeLike)) =>
+ case ShuffleQueryStageExec(_, r @ ReusedExchangeExec(_, s: ShuffleExchangeLike), _) =>
s.child.outputPartitioning match {
case e: Expression => r.updateAttr(e).asInstanceOf[Partitioning]
case other => other
@@ -67,13 +68,34 @@ case class ColumnarCustomShuffleReaderExec(
case _ =>
throw new IllegalStateException("operating on canonicalization plan")
}
+ } else if (isCoalescedRead) {
+ // For coalesced shuffle read, the data distribution is not changed, only the number of
+ // partitions is changed.
+ child.outputPartitioning match {
+ case h: HashPartitioning =>
+ CurrentOrigin.withOrigin(h.origin)(h.copy(numPartitions = partitionSpecs.length))
+ case r: RangePartitioning =>
+ CurrentOrigin.withOrigin(r.origin)(r.copy(numPartitions = partitionSpecs.length))
+ // This can only happen for `REBALANCE_PARTITIONS_BY_NONE`, which uses
+ // `RoundRobinPartitioning` but we don't need to retain the number of partitions.
+ case r: RoundRobinPartitioning =>
+ r.copy(numPartitions = partitionSpecs.length)
+ case other @ SinglePartition =>
+ throw new IllegalStateException(
+ "Unexpected partitioning for coalesced shuffle read: " + other)
+ case _ =>
+ // Spark plugins may have custom partitioning and may replace this operator
+ // during the postStageOptimization phase, so return UnknownPartitioning here
+ // rather than throw an exception
+ UnknownPartitioning(partitionSpecs.length)
+ }
} else {
UnknownPartitioning(partitionSpecs.length)
}
}
override def stringArgs: Iterator[Any] = {
- val desc = if (isLocalReader) {
+ val desc = if (isLocalRead) {
"local"
} else if (hasCoalescedPartition && hasSkewedPartition) {
"coalesced and skewed"
@@ -87,14 +109,38 @@ case class ColumnarCustomShuffleReaderExec(
Iterator(desc)
}
- def hasCoalescedPartition: Boolean =
- partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+ /**
+ * Returns true iff some partitions were actually combined
+ */
+ private def isCoalescedSpec(spec: ShufflePartitionSpec) = spec match {
+ case CoalescedPartitionSpec(0, 0, _) => true
+ case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
+ case _ => false
+ }
+
+ /**
+ * Returns true iff some non-empty partitions were combined
+ */
+ def hasCoalescedPartition: Boolean = {
+ partitionSpecs.exists(isCoalescedSpec)
+ }
def hasSkewedPartition: Boolean =
partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
- def isLocalReader: Boolean =
- partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])
+ def isLocalRead: Boolean =
+ partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec]) ||
+ partitionSpecs.exists(_.isInstanceOf[CoalescedMapperPartitionSpec])
+
+ def isCoalescedRead: Boolean = {
+ partitionSpecs.sliding(2).forall {
+ // A single partition spec which is `CoalescedPartitionSpec` also means coalesced read.
+ case Seq(_: CoalescedPartitionSpec) => true
+ case Seq(l: CoalescedPartitionSpec, r: CoalescedPartitionSpec) =>
+ l.endReducerIndex <= r.startReducerIndex
+ case _ => false
+ }
+ }
private def shuffleStage = child match {
case stage: ShuffleQueryStageExec => Some(stage)
@@ -102,13 +148,13 @@ case class ColumnarCustomShuffleReaderExec(
}
@transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
- if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) {
- val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
+ if (!isLocalRead && shuffleStage.get.mapStats.isDefined) {
Some(partitionSpecs.map {
- case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
- startReducerIndex.until(endReducerIndex).map(bytesByPartitionId).sum
+ case p: CoalescedPartitionSpec =>
+ assert(p.dataSize.isDefined)
+ p.dataSize.get
case p: PartialReducerPartitionSpec => p.dataSize
- case p => throw new IllegalStateException("unexpected " + p)
+ case p => throw new IllegalStateException(s"unexpected $p")
})
} else {
None
@@ -141,6 +187,13 @@ case class ColumnarCustomShuffleReaderExec(
driverAccumUpdates += (skewedSplits.id -> numSplits)
}
+ if (hasCoalescedPartition) {
+ val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
+ val x = partitionSpecs.count(isCoalescedSpec)
+ numCoalescedPartitionsMetric.set(x)
+ driverAccumUpdates += numCoalescedPartitionsMetric.id -> x
+ }
+
partitionDataSizes.foreach { dataSizes =>
val partitionDataSizeMetrics = metrics("partitionDataSize")
driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
@@ -154,8 +207,8 @@ case class ColumnarCustomShuffleReaderExec(
@transient override lazy val metrics: Map[String, SQLMetric] = {
if (shuffleStage.isDefined) {
Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ {
- if (isLocalReader) {
- // We split the mapper partition evenly when creating local shuffle reader, so no
+ if (isLocalRead) {
+ // We split the mapper partition evenly when creating local shuffle read, so no
// data size info is available.
Map.empty
} else {
@@ -171,6 +224,13 @@ case class ColumnarCustomShuffleReaderExec(
} else {
Map.empty
}
+ } ++ {
+ if (hasCoalescedPartition) {
+ Map("numCoalescedPartitions" ->
+ SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"))
+ } else {
+ Map.empty
+ }
}
} else {
// It's a canonicalized plan, no need to report metrics.
@@ -178,24 +238,19 @@ case class ColumnarCustomShuffleReaderExec(
}
}
- private var cachedShuffleRDD: RDD[ColumnarBatch] = null
-
private lazy val shuffleRDD: RDD[_] = {
- sendDriverMetrics()
- if (cachedShuffleRDD == null) {
- cachedShuffleRDD = child match {
- case stage: ShuffleQueryStageExec =>
- new ShuffledColumnarRDD(
- stage.shuffle
- .asInstanceOf[ColumnarShuffleExchangeExec]
- .columnarShuffleDependency,
- stage.shuffle.asInstanceOf[ColumnarShuffleExchangeExec].readMetrics,
- partitionSpecs.toArray)
- case _ =>
- throw new IllegalStateException("operating on canonicalized plan")
- }
+ shuffleStage match {
+ case Some(stage) =>
+ sendDriverMetrics()
+ new ShuffledColumnarRDD(
+ stage.shuffle
+ .asInstanceOf[ColumnarShuffleExchangeExec]
+ .columnarShuffleDependency,
+ stage.shuffle.asInstanceOf[ColumnarShuffleExchangeExec].readMetrics,
+ partitionSpecs.toArray)
+ case _ =>
+ throw new IllegalStateException("operating on canonicalized plan")
}
- cachedShuffleRDD
}
override protected def doExecute(): RDD[InternalRow] = {
@@ -205,4 +260,7 @@ case class ColumnarCustomShuffleReaderExec(
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
shuffleRDD.asInstanceOf[RDD[ColumnarBatch]]
}
+
+ override protected def withNewChildInternal(newChild: SparkPlan): OmniAQEShuffleReadExec =
+ new OmniAQEShuffleReadExec(newChild, this.partitionSpecs)
}
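
A standalone sketch of the two checks introduced above, with plain (startReducerIndex, endReducerIndex) pairs standing in for CoalescedPartitionSpec and every name hypothetical: isCoalescedSpec asks whether a spec actually combined anything, and isCoalescedRead asks whether the specs form an ordered, non-overlapping sequence of reducer ranges, which is what lets outputPartitioning keep the child's distribution and only shrink numPartitions:

    object CoalescedReadSketch {
      type ReducerRange = (Int, Int)  // [startReducerIndex, endReducerIndex)

      def isCoalescedSpec(range: ReducerRange): Boolean = range match {
        case (0, 0)       => true              // an empty shuffle output still counts as coalesced
        case (start, end) => end - start > 1   // more than one reducer was combined
      }

      def isCoalescedRead(specs: Seq[ReducerRange]): Boolean =
        specs.sliding(2).forall {
          case Seq(_)    => true               // a single coalesced spec is still a coalesced read
          case Seq(l, r) => l._2 <= r._1       // ranges must be ordered and non-overlapping
          case _         => false
        }

      def main(args: Array[String]): Unit = {
        println(isCoalescedRead(Seq((0, 2), (2, 5), (5, 6))))  // true
        println(isCoalescedRead(Seq((0, 3), (2, 5))))          // false: the ranges overlap
        println(Seq((0, 2), (2, 3)).count(isCoalescedSpec))    // 1, i.e. the numCoalescedPartitions metric value
      }
    }
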
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
deleted file mode 100644
index 4edf0f4f8..000000000
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.ColumnarHashedRelation
-import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys}
-
-/**
- * This optimization rule detects and converts a Join to an empty [[LocalRelation]]:
- * 1. Join is single column NULL-aware anti join (NAAJ), and broadcasted [[HashedRelation]]
- * is [[HashedRelationWithAllNullKeys]].
- *
- * 2. Join is inner or left semi join, and broadcasted [[HashedRelation]]
- * is [[EmptyHashedRelation]].
- * This applies to all Joins (sort merge join, shuffled hash join, and broadcast hash join),
- * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE
- * at the first place.
- */
-object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
-
- private def canEliminate(plan: LogicalPlan, relation: HashedRelation): Boolean = plan match {
- case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
- && stage.broadcast.relationFuture.get().value == relation => true
- case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
- && stage.broadcast.supportsColumnar => {
- val cr = stage.broadcast.relationFuture.get().value.asInstanceOf[ColumnarHashedRelation]
- cr.relation == relation
- }
- case _ => false
- }
-
- def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
- case j @ ExtractSingleColumnNullAwareAntiJoin(_, _)
- if canEliminate(j.right, HashedRelationWithAllNullKeys) =>
- LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
-
- case j @ Join(_, _, Inner, _, _) if canEliminate(j.left, EmptyHashedRelation) ||
- canEliminate(j.right, EmptyHashedRelation) =>
- LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
-
- case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, EmptyHashedRelation) =>
- LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
- }
-}
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
deleted file mode 100644
index c9a0dcbbf..000000000
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan}
-import org.apache.spark.sql.types.StructType
-
-/**
- * Prune the partitions of file source based table using partition filters. Currently, this rule
- * is applied to [[HadoopFsRelation]] with [[CatalogFileIndex]] and [[DataSourceV2ScanRelation]]
- * with [[FileScan]].
- *
- * For [[HadoopFsRelation]], the location will be replaced by pruned file index, and corresponding
- * statistics will be updated. And the partition filters will be kept in the filters of returned
- * logical plan.
- *
- * For [[DataSourceV2ScanRelation]], both partition filters and data filters will be added to
- * its underlying [[FileScan]]. And the partition filters will be removed in the filters of
- * returned logical plan.
- */
-private[sql] object PruneFileSourcePartitions
- extends Rule[LogicalPlan] with PredicateHelper {
-
- private def getPartitionKeyFiltersAndDataFilters(
- sparkSession: SparkSession,
- relation: LeafNode,
- partitionSchema: StructType,
- filters: Seq[Expression],
- output: Seq[AttributeReference]): (ExpressionSet, Seq[Expression]) = {
- val normalizedFilters = DataSourceStrategy.normalizeExprs(
- filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), output)
- val partitionColumns =
- relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver)
- val partitionSet = AttributeSet(partitionColumns)
- val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
- f.references.subsetOf(partitionSet)
- )
- val extraPartitionFilter =
- dataFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionSet))
-
- (ExpressionSet(partitionFilters ++ extraPartitionFilter), dataFilters)
- }
-
- private def rebuildPhysicalOperation(
- projects: Seq[NamedExpression],
- filters: Seq[Expression],
- relation: LeafNode): Project = {
- val withFilter = if (filters.nonEmpty) {
- val filterExpression = filters.reduceLeft(And)
- Filter(filterExpression, relation)
- } else {
- relation
- }
- Project(projects, withFilter)
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
- case op @ PhysicalOperation(projects, filters,
- logicalRelation @
- LogicalRelation(fsRelation @
- HadoopFsRelation(
- catalogFileIndex: CatalogFileIndex,
- partitionSchema,
- _,
- _,
- _,
- _),
- _,
- _,
- _))
- if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
- val (partitionKeyFilters, _) = getPartitionKeyFiltersAndDataFilters(
- fsRelation.sparkSession, logicalRelation, partitionSchema, filters,
- logicalRelation.output)
- // Fix spark issue SPARK-34119(row 104-113)
- if (partitionKeyFilters.nonEmpty) {
- val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
- val prunedFsRelation =
- fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession)
- // Change table stats based on the sizeInBytes of pruned files
- val filteredStats =
- FilterEstimation(Filter(partitionKeyFilters.reduce(And), logicalRelation)).estimate
- val colStats = filteredStats.map(_.attributeStats.map { case (attr, colStat) =>
- (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType))
- })
- val withStats = logicalRelation.catalogTable.map(_.copy(
- stats = Some(CatalogStatistics(
- sizeInBytes = BigInt(prunedFileIndex.sizeInBytes),
- rowCount = filteredStats.flatMap(_.rowCount),
- colStats = colStats.getOrElse(Map.empty)))))
- val prunedLogicalRelation = logicalRelation.copy(
- relation = prunedFsRelation, catalogTable = withStats)
- // Keep partition-pruning predicates so that they are visible in physical planning
- rebuildPhysicalOperation(projects, filters, prunedLogicalRelation)
- } else {
- op
- }
-
- case op @ PhysicalOperation(projects, filters,
- v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output))
- if filters.nonEmpty && scan.readDataSchema.nonEmpty =>
- val (partitionKeyFilters, dataFilters) =
- getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation,
- scan.readPartitionSchema, filters, output)
- // The dataFilters are pushed down only once
- if (partitionKeyFilters.nonEmpty || (dataFilters.nonEmpty && scan.dataFilters.isEmpty)) {
- val prunedV2Relation =
- v2Relation.copy(scan = scan.withFilters(partitionKeyFilters.toSeq, dataFilters))
- // The pushed down partition filters don't need to be reevaluated.
- val afterScanFilters =
- ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty)
- rebuildPhysicalOperation(projects, afterScanFilters.toSeq, prunedV2Relation)
- } else {
- op
- }
- }
-}
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
index 0e5a7eae6..7325635ff 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OmniOrcFileFormat.scala
@@ -82,18 +82,17 @@ class OmniOrcFileFormat extends FileFormat with DataSourceRegister with Serializ
val fs = filePath.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val resultedColPruneInfo =
- Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader =>
- OrcUtils.requestedColumnIds(
- isCaseSensitive, dataSchema, requiredSchema, reader, conf)
- }
+ val orcSchema =
+ Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions))(_.getSchema)
+ val resultedColPruneInfo = OrcUtils.requestedColumnIds(
+ isCaseSensitive, dataSchema, requiredSchema, orcSchema, conf)
if (resultedColPruneInfo.isEmpty) {
Iterator.empty
} else {
// ORC predicate pushdown
- if (orcFilterPushDown) {
- OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).foreach {
+ if (orcFilterPushDown && filters.nonEmpty) {
+ OrcUtils.readCatalystSchema(filePath, conf, ignoreCorruptFiles).foreach {
fileSchema => OrcFilters.createFilter(fileSchema, filters).foreach { f =>
OrcInputFormat.setSearchArgument(conf, f, fileSchema.fieldNames)
}
@@ -107,6 +106,8 @@ class OmniOrcFileFormat extends FileFormat with DataSourceRegister with Serializ
"[BUG] requested column IDs do not match required schema")
val taskConf = new Configuration(conf)
+ val includeColumns = requestedColIds.filter(_ != -1).sorted.mkString(",")
+ taskConf.set(OrcConf.INCLUDE_COLUMNS.getAttribute, includeColumns)
val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)
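
A minimal sketch of how the requested column ids become the comma-separated value written into OrcConf.INCLUDE_COLUMNS above, with hypothetical field names and no ORC reader involved: required columns are looked up by position in the file schema, -1 marks a column missing from the file, and missing columns are dropped before building the sorted include list:

    object OrcIncludeColumnsSketch {
      // indexOf returns -1 when a required field is absent from the file schema.
      def requestedColumnIds(fileFields: Seq[String], requiredFields: Seq[String]): Array[Int] =
        requiredFields.map(fileFields.indexOf(_)).toArray

      // Mirrors requestedColIds.filter(_ != -1).sorted.mkString(",") from the hunk above.
      def includeColumns(colIds: Array[Int]): String =
        colIds.filter(_ != -1).sorted.mkString(",")

      def main(args: Array[String]): Unit = {
        val fileFields = Seq("id", "name", "price", "ts")
        val required   = Seq("price", "id", "discount")   // "discount" does not exist in the file
        val ids = requestedColumnIds(fileFields, required)
        println(ids.mkString(","))       // 2,0,-1
        println(includeColumns(ids))     // 0,2
      }
    }
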
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
deleted file mode 100644
index 3392caa54..000000000
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.orc
-
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.Locale
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription, Writer}
-
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils}
-import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-object OrcUtils extends Logging {
-
- // The extensions for ORC compression codecs
- val extensionsForCompressionCodecNames = Map(
- "NONE" -> "",
- "SNAPPY" -> ".snappy",
- "ZLIB" -> ".zlib",
- "LZO" -> ".lzo",
- "ZSTD" -> ".zstd",
- "ZSTD_JNI" -> ".zstd_jni")
-
- def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
- val origPath = new Path(pathStr)
- val fs = origPath.getFileSystem(conf)
- val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
- .filterNot(_.isDirectory)
- .map(_.getPath)
- .filterNot(_.getName.startsWith("_"))
- .filterNot(_.getName.startsWith("."))
- paths
- }
-
- def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean)
- : Option[TypeDescription] = {
- val fs = file.getFileSystem(conf)
- val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- try {
- val schema = Utils.tryWithResource(OrcFile.createReader(file, readerOptions)) { reader =>
- reader.getSchema
- }
- if (schema.getFieldNames.isEmpty) {
- None
- } else {
- Some(schema)
- }
- } catch {
- case e: org.apache.orc.FileFormatException =>
- if (ignoreCorruptFiles) {
- logWarning(s"Skipped the footer in the corrupted file", e)
- None
- } else {
- throw new SparkException(s"Could not read footer for file", e)
- }
- }
- }
-
- private def toCatalystSchema(schema: TypeDescription): StructType = {
- // The Spark query engine has not completely supported CHAR/VARCHAR type yet, and here we
- // replace the orc CHAR/VARCHAR with STRING type.
- CharVarcharUtils.replaceCharVarcharWithStringInSchema(
- CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType])
- }
-
- def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
- : Option[StructType] = {
- val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
- val conf = sparkSession.sessionState.newHadoopConfWithOptions(options)
- files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
- case Some(schema) =>
- logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
- toCatalystSchema(schema)
- }
- }
-
- def readCatalystSchema(
- file: Path,
- conf: Configuration,
- ignoreCorruptFiles: Boolean): Option[StructType] = {
- readSchema(file, conf, ignoreCorruptFiles) match {
- case Some(schema) => Some(toCatalystSchema(schema))
-
- case None =>
- // Field names is empty or `FileFormatException` was thrown but ignoreCorruptFiles is true.
- None
- }
- }
-
- /**
- * Reads ORC file schemas in multi-threaded manner, using native version of ORC.
- * This is visible for testing.
- */
- def readOrcSchemasInParallel(
- files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = {
- ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile =>
- OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles).map(toCatalystSchema)
- }.flatten
- }
-
- def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
- : Option[StructType] = {
- val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
- if (orcOptions.mergeSchema) {
- SchemaMergeUtils.mergeSchemasInParallel(
- sparkSession, options, files, OrcUtils.readOrcSchemasInParallel)
- } else {
- OrcUtils.readSchema(sparkSession, files, options)
- }
- }
-
- /**
- * @return Returns the combination of requested column ids from the given ORC file and
- * boolean flag to find if the pruneCols is allowed or not. Requested Column id can be
- * -1, which means the requested column doesn't exist in the ORC file. Returns None
- * if the given ORC file is empty.
- */
- def requestedColumnIds(
- isCaseSensitive: Boolean,
- dataSchema: StructType,
- requiredSchema: StructType,
- reader: Reader,
- conf: Configuration): Option[(Array[Int], Boolean)] = {
- val orcFieldNames = reader.getSchema.getFieldNames.asScala
- if (orcFieldNames.isEmpty) {
- // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
- None
- } else {
- if (orcFieldNames.forall(_.startsWith("_col"))) {
- // This is a ORC file written by Hive, no field names in the physical schema, assume the
- // physical schema maps to the data scheme by index.
- assert(orcFieldNames.length <= dataSchema.length, "The given data schema " +
- s"${dataSchema.catalogString} has less fields than the actual ORC physical schema, " +
- "no idea which columns were dropped, fail to read.")
- // for ORC file written by Hive, no field names
- // in the physical schema, there is a need to send the
- // entire dataSchema instead of required schema.
- // So pruneCols is not done in this case
- Some(requiredSchema.fieldNames.map { name =>
- val index = dataSchema.fieldIndex(name)
- if (index < orcFieldNames.length) {
- index
- } else {
- -1
- }
- }, false)
- } else {
- if (isCaseSensitive) {
- Some(requiredSchema.fieldNames.zipWithIndex.map { case (name, idx) =>
- if (orcFieldNames.indexWhere(caseSensitiveResolution(_, name)) != -1) {
- idx
- } else {
- -1
- }
- }, true)
- } else {
- // Do case-insensitive resolution only if in case-insensitive mode
- val caseInsensitiveOrcFieldMap = orcFieldNames.groupBy(_.toLowerCase(Locale.ROOT))
- Some(requiredSchema.fieldNames.zipWithIndex.map { case (requiredFieldName, idx) =>
- caseInsensitiveOrcFieldMap
- .get(requiredFieldName.toLowerCase(Locale.ROOT))
- .map { matchedOrcFields =>
- if (matchedOrcFields.size > 1) {
- // Need to fail if there is ambiguity, i.e. more than one field is matched.
- val matchedOrcFieldsString = matchedOrcFields.mkString("[", ", ", "]")
- reader.close()
- throw new RuntimeException(s"""Found duplicate field(s) "$requiredFieldName": """
- + s"$matchedOrcFieldsString in case-insensitive mode")
- } else {
- idx
- }
- }.getOrElse(-1)
- }, true)
- }
- }
- }
- }
-
- /**
- * Add a metadata specifying Spark version.
- */
- def addSparkVersionMetadata(writer: Writer): Unit = {
- writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, UTF_8.encode(SPARK_VERSION_SHORT))
- }
-
- /**
- * Given a `StructType` object, this methods converts it to corresponding string representation
- * in ORC.
- */
- def orcTypeDescriptionString(dt: DataType): String = dt match {
- case s: StructType =>
- val fieldTypes = s.fields.map { f =>
- s"${quoteIdentifier(f.name)}:${orcTypeDescriptionString(f.dataType)}"
- }
- s"struct<${fieldTypes.mkString(",")}>"
- case a: ArrayType =>
- s"array<${orcTypeDescriptionString(a.elementType)}>"
- case m: MapType =>
- s"map<${orcTypeDescriptionString(m.keyType)},${orcTypeDescriptionString(m.valueType)}>"
- case _ => dt.catalogString
- }
-
- /**
- * Returns the result schema to read from ORC file. In addition, It sets
- * the schema string to 'orc.mapred.input.schema' so ORC reader can use later.
- *
- * @param canPruneCols Flag to decide whether pruned cols schema is send to resultSchema
- * or to send the entire dataSchema to resultSchema.
- * @param dataSchema Schema of the orc files.
- * @param resultSchema Result data schema created after pruning cols.
- * @param partitionSchema Schema of partitions.
- * @param conf Hadoop Configuration.
- * @return Returns the result schema as string.
- */
- def orcResultSchemaString(
- canPruneCols: Boolean,
- dataSchema: StructType,
- resultSchema: StructType,
- partitionSchema: StructType,
- conf: Configuration): String = {
- val resultSchemaString = if (canPruneCols) {
- OrcUtils.orcTypeDescriptionString(resultSchema)
- } else {
- OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
- }
- OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
- resultSchemaString
- }
-}
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
index a2ee977f9..2c1271fb0 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarBroadcastHashJoinExec.scala
@@ -97,6 +97,9 @@ case class ColumnarBroadcastHashJoinExec(
override def nodeName: String = "OmniColumnarBroadcastHashJoin"
+ override protected def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan):
+ ColumnarBroadcastHashJoinExec = copy(left = newLeft, right = newRight)
+
override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildBoundKeys, isNullAwareAntiJoin)
buildSide match {
@@ -109,7 +112,7 @@ case class ColumnarBroadcastHashJoinExec(
override lazy val outputPartitioning: Partitioning = {
joinType match {
- case _: InnerLike if sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit > 0 =>
+ case _: InnerLike if session.sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit > 0 =>
streamedPlan.outputPartitioning match {
case h: HashPartitioning => expandOutputPartitioning(h)
case c: PartitioningCollection => expandOutputPartitioning(c)
@@ -150,7 +153,7 @@ case class ColumnarBroadcastHashJoinExec(
// Seq("a", "b", "c"), Seq("a", "b", "y"), Seq("a", "x", "c"), Seq("a", "x", "y").
// The expanded expressions are returned as PartitioningCollection.
private def expandOutputPartitioning(partitioning: HashPartitioning): PartitioningCollection = {
- val maxNumCombinations = sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit
+ val maxNumCombinations = session.sqlContext.conf.broadcastHashJoinOutputPartitioningExpandLimit
var currentNumCombinations = 0
def generateExprCombinations(
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
index 9eb666fcc..263af0ddb 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarShuffledHashJoinExec.scala
@@ -50,7 +50,8 @@ case class ColumnarShuffledHashJoinExec(
buildSide: BuildSide,
condition: Option[Expression],
left: SparkPlan,
- right: SparkPlan)
+ right: SparkPlan,
+ isSkewJoin: Boolean)
extends HashJoin with ShuffledJoin {
override lazy val metrics = Map(
@@ -81,6 +82,9 @@ case class ColumnarShuffledHashJoinExec(
override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning
+ override protected def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan):
+ ColumnarShuffledHashJoinExec = copy(left = newLeft, right = newRight)
+
override def outputOrdering: Seq[SortOrder] = joinType match {
case FullOuter => Nil
case _ => super.outputOrdering
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala
index 59b763428..d55af2d9d 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala
@@ -68,6 +68,12 @@ class ColumnarSortMergeJoinExec(
if (isSkewJoin) "OmniColumnarSortMergeJoin(skew=true)" else "OmniColumnarSortMergeJoin"
}
+ override protected def withNewChildrenInternal(newLeft: SparkPlan,
+ newRight: SparkPlan): ColumnarSortMergeJoinExec = {
+ new ColumnarSortMergeJoinExec(this.leftKeys, this.rightKeys, this.joinType,
+ this.condition, newLeft, newRight, this.isSkewJoin)
+ }
+
val SMJ_NEED_ADD_STREAM_TBL_DATA = 2
val SMJ_NEED_ADD_BUFFERED_TBL_DATA = 3
val SCAN_FINISH = 4
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
deleted file mode 100644
index 0503b2b7b..000000000
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.hadoop.hive.common.StatsSetupConst
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.CastSupport
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, ExpressionSet, PredicateHelper, SubqueryExpression}
-import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-
-/**
- * Prune hive table partitions using partition filters on [[HiveTableRelation]]. The pruned
- * partitions will be kept in [[HiveTableRelation.prunedPartitions]], and the statistics of
- * the hive table relation will be updated based on pruned partitions.
- *
- * This rule is executed in optimization phase, so the statistics can be updated before physical
- * planning, which is useful for some spark strategy, e.g.
- * [[org.apache.spark.sql.execution.SparkStrategies.JoinSelection]].
- *
- * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source.
- */
-private[sql] class PruneHiveTablePartitions(session: SparkSession)
- extends Rule[LogicalPlan] with CastSupport with PredicateHelper {
-
- /**
- * Extract the partition filters from the filters on the table.
- */
- private def getPartitionKeyFilters(
- filters: Seq[Expression],
- relation: HiveTableRelation): ExpressionSet = {
- val normalizedFilters = DataSourceStrategy.normalizeExprs(
- filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), relation.output)
- val partitionColumnSet = AttributeSet(relation.partitionCols)
- ExpressionSet(
- normalizedFilters.flatMap(extractPredicatesWithinOutputSet(_, partitionColumnSet)))
- }
-
- /**
- * Prune the hive table using filters on the partitions of the table.
- */
- private def prunePartitions(
- relation: HiveTableRelation,
- partitionFilters: ExpressionSet): Seq[CatalogTablePartition] = {
- if (conf.metastorePartitionPruning) {
- session.sessionState.catalog.listPartitionsByFilter(
- relation.tableMeta.identifier, partitionFilters.toSeq)
- } else {
- ExternalCatalogUtils.prunePartitionsByFilter(relation.tableMeta,
- session.sessionState.catalog.listPartitions(relation.tableMeta.identifier),
- partitionFilters.toSeq, conf.sessionLocalTimeZone)
- }
- }
-
- /**
- * Update the statistics of the table.
- */
- private def updateTableMeta(
- relation: HiveTableRelation,
- prunedPartitions: Seq[CatalogTablePartition],
- partitionKeyFilters: ExpressionSet): CatalogTable = {
- val sizeOfPartitions = prunedPartitions.map { partition =>
- val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
- val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
- if (rawDataSize.isDefined && rawDataSize.get > 0) {
- rawDataSize.get
- } else if (totalSize.isDefined && totalSize.get > 0L) {
- totalSize.get
- } else {
- 0L
- }
- }
- // Fix spark issue SPARK-34119(row 95-106)
- if (sizeOfPartitions.forall(_ > 0)) {
- val filteredStats =
- FilterEstimation(Filter(partitionKeyFilters.reduce(And), relation)).estimate
- val colStats = filteredStats.map(_.attributeStats.map { case (attr, colStat) =>
- (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType))
- })
- relation.tableMeta.copy(
- stats = Some(CatalogStatistics(
- sizeInBytes = BigInt(sizeOfPartitions.sum),
- rowCount = filteredStats.flatMap(_.rowCount),
- colStats = colStats.getOrElse(Map.empty))))
- } else {
- relation.tableMeta
- }
- }
-
- override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case op @ PhysicalOperation(projections, filters, relation: HiveTableRelation)
- if filters.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty =>
- val partitionKeyFilters = getPartitionKeyFilters(filters, relation)
- if (partitionKeyFilters.nonEmpty) {
- val newPartitions = prunePartitions(relation, partitionKeyFilters)
- // Fix spark issue SPARK-34119(row 117)
- val newTableMeta = updateTableMeta(relation, newPartitions, partitionKeyFilters)
- val newRelation = relation.copy(
- tableMeta = newTableMeta, prunedPartitions = Some(newPartitions))
- // Keep partition filters so that they are visible in physical planning
- Project(projections, Filter(filters.reduceLeft(And), newRelation))
- } else {
- op
- }
- }
-}
diff --git a/omnioperator/omniop-spark-extension/pom.xml b/omnioperator/omniop-spark-extension/pom.xml
index 026fc5997..c95b391b0 100644
--- a/omnioperator/omniop-spark-extension/pom.xml
+++ b/omnioperator/omniop-spark-extension/pom.xml
@@ -8,14 +8,14 @@
com.huawei.kunpeng
boostkit-omniop-spark-parent
pom
- 3.1.1-1.1.0
+ 3.3.1-1.1.0
BoostKit Spark Native Sql Engine Extension Parent Pom
2.12.10
2.12
- 3.1.1
+ 3.3.1
3.2.2
UTF-8
UTF-8
@@ -55,6 +55,18 @@
org.apache.curator
curator-recipes
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
@@ -101,6 +113,20 @@
${omniruntime.version}
aarch64
provided
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
com.google.protobuf
@@ -124,6 +150,18 @@
org.apache.curator
curator-recipes
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
--
Gitee
From e96b2ab508da58f1900689cd296443ae5203487e Mon Sep 17 00:00:00 2001
From: chenyidao <979136761@qq.com>
Date: Tue, 21 Mar 2023 17:55:49 +0800
Subject: [PATCH 2/2] fix assertion failure under AQE
---
.../scala/org/apache/spark/sql/execution/ColumnarExec.scala | 2 --
1 file changed, 2 deletions(-)
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
index d6ff2b40a..47a59336e 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
@@ -298,8 +298,6 @@ case class RowToOmniColumnarExec(child: SparkPlan) extends RowToColumnarTransiti
case class OmniColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition {
- assert(child.supportsColumnar)
-
override def nodeName: String = "OmniColumnarToRow"
override def output: Seq[Attribute] = child.output
--
Gitee