From 801ef74774b6ce9f8196934353a2156f9925c3f3 Mon Sep 17 00:00:00 2001 From: s30074194 Date: Mon, 31 Mar 2025 17:51:40 +0800 Subject: [PATCH] br_feature_omnioperator_spark_Haas_Performance_330 adapts to spark3.3.3 --- omnioperator/omniop-native-reader/java/pom.xml | 2 +- omnioperator/omniop-spark-extension/pom.xml | 2 +- .../apache/spark/sql/catalyst/trees/TreePatterns.scala | 1 + .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 8 ++++++-- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/omnioperator/omniop-native-reader/java/pom.xml b/omnioperator/omniop-native-reader/java/pom.xml index 8a40c93ea..9b3b3299f 100644 --- a/omnioperator/omniop-native-reader/java/pom.xml +++ b/omnioperator/omniop-native-reader/java/pom.xml @@ -39,7 +39,7 @@ spark-3.3 - 3.3.1 + 3.3.3 diff --git a/omnioperator/omniop-spark-extension/pom.xml b/omnioperator/omniop-spark-extension/pom.xml index 4f3241582..4e51d6121 100644 --- a/omnioperator/omniop-spark-extension/pom.xml +++ b/omnioperator/omniop-spark-extension/pom.xml @@ -47,7 +47,7 @@ spark-3.3 - 3.3.1 + 3.3.3 spark33-shim spark33-modify spark33-ut diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index a6876ba80..e7aac8050 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -82,6 +82,7 @@ object TreePattern extends Enumeration { val SCALAR_SUBQUERY: Value = Value val SCALAR_SUBQUERY_REFERENCE: Value = Value val SCALA_UDF: Value = Value + val SESSION_WINDOW: Value = Value val SORT: Value = Value val SUBQUERY_ALIAS: Value = Value val SUBQUERY_WRAPPER: Value = Value diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 81bf07db7..c9c002f42 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -85,7 +85,9 @@ case class AdaptiveSparkPlanExec( @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]() // The logical plan optimizer for re-optimizing the current logical plan. - @transient private val optimizer = new AQEOptimizer(conf) + @transient private val optimizer = new AQEOptimizer(conf, + session.sessionState.adaptiveRulesHolder.runtimeOptimizerRules + ) // `EnsureRequirements` may remove user-specified repartition and assume the query plan won't // change its output partitioning. This assumption is not true in AQE. Here we check the @@ -124,7 +126,7 @@ case class AdaptiveSparkPlanExec( RemoveRedundantSorts, DisableUnnecessaryBucketedScan, OptimizeSkewedJoin(ensureRequirements) - ) ++ context.session.sessionState.queryStagePrepRules + ) ++ context.session.sessionState.adaptiveRulesHolder.queryStagePrepRules } // A list of physical optimizer rules to be applied to a new stage before its execution. These @@ -224,6 +226,8 @@ case class AdaptiveSparkPlanExec( .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe) } + def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity) + private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { if (isFinalPlan) return currentPhysicalPlan -- Gitee