From 6468f96ea42f6efe42033507c4e26600b751bfcc Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Date: Thu, 5 Oct 2023 09:41:08 +0900
Subject: [PATCH] [SPARK-45386][SQL][3.5] Fix correctness issue with persist
using StorageLevel.NONE on Dataset
### What changes were proposed in this pull request?
Support for InMemoryTableScanExec in AQE was added in #39624, but that patch introduced a bug when a Dataset is persisted using `StorageLevel.NONE`. Before that patch, a query like:
```scala
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```
would correctly return 2, but after that patch it incorrectly returns 0. This is because AQE wrongly concludes, based on the runtime statistics collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294
that the input is empty. The problem is that the action that is supposed to ensure the statistics are collected here
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291
never uses the iterator, and with `StorageLevel.NONE` the persisting step does not consume the iterator either, so the correct statistics are never gathered.
The fix proposed in this patch simply makes calling persist with `StorageLevel.NONE` a no-op. Changing the action so that it always drains the iterator would also work, but that would be unnecessary work in many common cases.
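The failure mode can be modeled without Spark: when statistics are gathered as a side effect of rows flowing through an iterator, they are only collected if something actually consumes that iterator. A minimal sketch in plain Scala (the `CountingIterator` name and structure are illustrative, not Spark's actual accumulator machinery):

```scala
object IteratorStatsDemo {
  // Models a row-count statistic collected as a side effect: the counter
  // only advances when next() is actually called.
  final class CountingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    var rowsSeen = 0L
    def hasNext: Boolean = underlying.hasNext
    def next(): T = { rowsSeen += 1; underlying.next() }
  }

  def main(args: Array[String]): Unit = {
    // A real storage level materializes the data, draining the iterator,
    // so the recorded count matches the input.
    val cached = new CountingIterator(Iterator(1, 2))
    cached.foreach(_ => ())
    println(cached.rowsSeen) // 2

    // StorageLevel.NONE never touches the iterator, so the "statistics"
    // stay at 0 and a consumer would wrongly conclude the input is empty.
    val notCached = new CountingIterator(Iterator(1, 2))
    println(notCached.rowsSeen) // 0
  }
}
```

Under this model, making persist with `StorageLevel.NONE` a no-op avoids ever putting the never-consumed iterator on the statistics-collecting path.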
### Why are the changes needed?
The current code has a correctness issue.
### Does this PR introduce _any_ user-facing change?
Yes, fixes the correctness issue.
### How was this patch tested?
New and existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.
Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
---
.../scala/org/apache/spark/sql/execution/CacheManager.scala | 4 +++-
.../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e004..e906c74f8a5ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
planToCache: LogicalPlan,
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
- if (lookupCachedData(planToCache).nonEmpty) {
+ if (storageLevel == StorageLevel.NONE) {
+ // Do nothing for StorageLevel.NONE since it will not actually cache any data.
+ } else if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c967540541a5c..6d9c43f866a0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
case class TestDataPoint2(x: Int, s: String)
@@ -2535,6 +2536,11 @@ class DatasetSuite extends QueryTest
checkDataset(ds.filter(f(col("_1"))), Tuple1(ValueClass(2)))
}
+
+ test("SPARK-45386: persist with StorageLevel.NONE should give correct count") {
+ val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+ assert(ds.count() == 2)
+ }
}
class DatasetLargeResultCollectingSuite extends QueryTest