diff --git a/omnioperator/omniop-native-reader/java/pom.xml b/omnioperator/omniop-native-reader/java/pom.xml
index 8a40c93eac15ec938a24759a532cdfeef401f894..9b3b3299f4df1170d272bb294873d779f0eed95c 100644
--- a/omnioperator/omniop-native-reader/java/pom.xml
+++ b/omnioperator/omniop-native-reader/java/pom.xml
@@ -39,7 +39,7 @@
spark-3.3
- 3.3.1
+ 3.3.3
diff --git a/omnioperator/omniop-spark-extension/pom.xml b/omnioperator/omniop-spark-extension/pom.xml
index 4f3241582730ff5b7ac2ce2b8acc6b3c02d3572d..4e51d61215550dedd97fd3622610fd27dbbee015 100644
--- a/omnioperator/omniop-spark-extension/pom.xml
+++ b/omnioperator/omniop-spark-extension/pom.xml
@@ -47,7 +47,7 @@
spark-3.3
- 3.3.1
+ 3.3.3
spark33-shim
spark33-modify
spark33-ut
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index a6876ba80f566b630d13b4853fc1820426850e6a..e7aac8050162900f85cb9df69b33ed21f3a39d68 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -82,6 +82,7 @@ object TreePattern extends Enumeration {
val SCALAR_SUBQUERY: Value = Value
val SCALAR_SUBQUERY_REFERENCE: Value = Value
val SCALA_UDF: Value = Value
+ val SESSION_WINDOW: Value = Value
val SORT: Value = Value
val SUBQUERY_ALIAS: Value = Value
val SUBQUERY_WRAPPER: Value = Value
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 81bf07db7e95fad634dfe48ad955e54c8a28b1d9..c9c002f42ed6c0c25d449dc4880d833b53972f3f 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -85,7 +85,9 @@ case class AdaptiveSparkPlanExec(
@transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
// The logical plan optimizer for re-optimizing the current logical plan.
- @transient private val optimizer = new AQEOptimizer(conf)
+ @transient private val optimizer = new AQEOptimizer(conf,
+ session.sessionState.adaptiveRulesHolder.runtimeOptimizerRules
+ )
// `EnsureRequirements` may remove user-specified repartition and assume the query plan won't
// change its output partitioning. This assumption is not true in AQE. Here we check the
@@ -124,7 +126,7 @@ case class AdaptiveSparkPlanExec(
RemoveRedundantSorts,
DisableUnnecessaryBucketedScan,
OptimizeSkewedJoin(ensureRequirements)
- ) ++ context.session.sessionState.queryStagePrepRules
+ ) ++ context.session.sessionState.adaptiveRulesHolder.queryStagePrepRules
}
// A list of physical optimizer rules to be applied to a new stage before its execution. These
@@ -224,6 +226,8 @@ case class AdaptiveSparkPlanExec(
.map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
}
+ def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
if (isFinalPlan) return currentPhysicalPlan