0003-Fix-correctness-issue-with-persist-using-StorageLevel.NONE-on-Dataset.patch 4.22 KB
From 6468f96ea42f6efe42033507c4e26600b751bfcc Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Date: Thu, 5 Oct 2023 09:41:08 +0900
Subject: [PATCH] [SPARK-45386][SQL][3.5] Fix correctness issue with persist
using StorageLevel.NONE on Dataset
### What changes were proposed in this pull request?
Support for InMemoryTableScanExec in AQE was added in #39624, but that change contained a bug that is triggered when a Dataset is persisted using `StorageLevel.NONE`. Before that change, a query like:
```scala
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```
would correctly return 2, but after that change it incorrectly returns 0. This is because AQE wrongly concludes, based on the runtime statistics collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294
that the input is empty. The problem is that the action that is supposed to ensure the statistics are collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291
never uses the iterator, and with `StorageLevel.NONE` the persisting does not use the iterator either, so the correct statistics are never gathered.
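The mechanism can be illustrated outside Spark with a minimal sketch (plain Scala, no Spark APIs; `rowCount` is a hypothetical stand-in for the accumulator updated in InMemoryRelation): side effects inside an iterator transformation only run when the iterator is actually consumed.

```scala
// Minimal sketch of why the row-count statistic stays at zero:
// Scala iterators are lazy, so the side effect in `map` runs only
// when something pulls elements from the iterator.
object IteratorStatsSketch {
  def main(args: Array[String]): Unit = {
    var rowCount = 0L
    // Stand-in for the statistics-collecting wrapper around the cached rows:
    val counted = Iterator(1, 2).map { r => rowCount += 1; r }

    // Nothing has consumed `counted` yet, so the "statistic" is still 0,
    // just as when neither the action nor StorageLevel.NONE persistence
    // pulls from the iterator.
    println(rowCount) // 0

    counted.foreach(_ => ()) // consuming the iterator runs the side effect
    println(rowCount) // 2
  }
}
```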
The fix proposed in this patch simply makes calling persist with `StorageLevel.NONE` a no-op. Changing the action so that it always exhausts the iterator would also work, but that would be unnecessary work in most normal circumstances.
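The resulting control flow can be sketched, Spark-free, as a guard at the top of the caching path (names and return values here are simplified placeholders, not the real CacheManager signatures):

```scala
// Simplified sketch of the fixed control flow in CacheManager.cacheQuery.
// The StorageLevel.NONE check comes first: persisting with NONE stores
// nothing, so registering a cache entry for it would only mislead AQE's
// runtime statistics later.
object CacheGuardSketch {
  sealed trait Level
  case object NONE extends Level
  case object MEMORY_AND_DISK extends Level

  def cacheQuery(level: Level, alreadyCached: Boolean): String =
    if (level == NONE) "no-op" // the fix: do nothing for StorageLevel.NONE
    else if (alreadyCached) "warn: already cached"
    else "build cached representation"
}
```

For example, `CacheGuardSketch.cacheQuery(CacheGuardSketch.NONE, alreadyCached = false)` now falls into the no-op branch instead of building a cache entry.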
### Why are the changes needed?
The current code has a correctness issue.
### Does this PR introduce _any_ user-facing change?
Yes, fixes the correctness issue.
### How was this patch tested?
New and existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.
Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
---
.../scala/org/apache/spark/sql/execution/CacheManager.scala | 4 +++-
.../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 064819275e004..e906c74f8a5ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -113,7 +113,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
planToCache: LogicalPlan,
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
- if (lookupCachedData(planToCache).nonEmpty) {
+ if (storageLevel == StorageLevel.NONE) {
+ // Do nothing for StorageLevel.NONE since it will not actually cache any data.
+ } else if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c967540541a5c..6d9c43f866a0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
case class TestDataPoint2(x: Int, s: String)
@@ -2535,6 +2536,11 @@ class DatasetSuite extends QueryTest
checkDataset(ds.filter(f(col("_1"))), Tuple1(ValueClass(2)))
}
+
+ test("SPARK-45386: persist with StorageLevel.NONE should give correct count") {
+ val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+ assert(ds.count() == 2)
+ }
}
class DatasetLargeResultCollectingSuite extends QueryTest