From d470ace96a73f89d143e6a0958353786d3b2cd8d Mon Sep 17 00:00:00 2001 From: guojunfei <970763131@qq.com> Date: Thu, 1 Feb 2024 16:08:22 +0800 Subject: [PATCH] add support for window spill --- .../sql/execution/ColumnarWindowExec.scala | 55 ++++++++++++++++++- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala index 9f20f5cb6..5825d6355 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarWindowExec.scala @@ -17,17 +17,21 @@ package org.apache.spark.sql.execution +import java.io.{File, IOException} +import java.util.UUID import java.util.concurrent.TimeUnit.NANOSECONDS +import com.huawei.boostkit.spark.ColumnarPluginConfig import com.huawei.boostkit.spark.Constant.IS_SKIP_VERIFY_EXP import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor._ import com.huawei.boostkit.spark.util.OmniAdaptorUtil import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs import nova.hetu.omniruntime.`type`.DataType import nova.hetu.omniruntime.constants.{FunctionType, OmniWindowFrameBoundType, OmniWindowFrameType} -import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SpillConfig} +import nova.hetu.omniruntime.operator.config.{OperatorConfig, OverflowConfig, SparkSpillConfig} import nova.hetu.omniruntime.operator.window.OmniWindowWithExprOperatorFactory import nova.hetu.omniruntime.vector.VecBatch +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -40,6 +44,7 @@ import org.apache.spark.sql.execution.vectorized.OmniColumnVector import org.apache.spark.sql.execution.window.WindowExecBase import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], @@ -50,6 +55,8 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], override def supportsColumnar: Boolean = true + private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 + override lazy val metrics = Map( "addInputTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omni addInput"), "numInputVecBatches" -> SQLMetrics.createMetric(sparkContext, "number of input vecBatches"), @@ -57,7 +64,8 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], "omniCodegenTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omni codegen"), "getOutputTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omni getOutput"), "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numOutputVecBatches" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatches")) + "numOutputVecBatches" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatches"), + "bytesSpilled" -> SQLMetrics.createSizeMetric(sparkContext, "window bytes spilled")) override def output: Seq[Attribute] = child.output ++ windowExpression.map(_.toAttribute) @@ -82,6 +90,33 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], throw new UnsupportedOperationException(s"This operator doesn't support doExecute().") } + val sparkConfTmp: SparkConf = sparkContext.conf + + private def generateLocalDirs(conf: SparkConf): Array[File] = { + Utils.getConfiguredLocalDirs(conf).flatMap { rootDir => + val localDir = generateDirs(rootDir, "columnarWindowSpill") + Some(localDir) + } + } + + def generateDirs(root: String, namePrefix: String = "spark"): File = { + var attempts = 0 + val maxAttempts = MAX_DIR_CREATION_ATTEMPTS + var dir: File = null + while (dir == null) { + attempts += 1 + if (attempts > maxAttempts) { + throw new IOException("Directory conflict: failed to generate a temp directory for" + + "columnarWindowSpill (under " + root + ") after " + maxAttempts + " attempts!") + } + dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString) + if (dir.exists()) { + dir = null + } + } + dir.getCanonicalFile + } + def getWindowFrameParam(frame: SpecifiedWindowFrame): (OmniWindowFrameType, OmniWindowFrameBoundType, OmniWindowFrameBoundType, Int, Int) = { var windowFrameStartChannel = -1 @@ -222,6 +257,7 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], val numOutputRows = longMetric("numOutputRows") val numOutputVecBatches = longMetric("numOutputVecBatches") val getOutputTime = longMetric("getOutputTime") + val bytesSpilled = longMetric("bytesSpilled") val sourceTypes = new Array[DataType](child.output.size) val sortCols = new Array[Int](orderSpec.size) @@ -351,12 +387,24 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], val windowExpressionWithProjectConstant = windowExpressionWithProject child.executeColumnar().mapPartitionsWithIndexInternal { (index, iter) => + val columnarConf = ColumnarPluginConfig.getSessionConf + val windowSpillEnable = columnarConf.enableWindowSpill + val windowLocalDirs: Array[File] = generateLocalDirs(sparkConfTmp) + val hash = Utils.nonNegativeHash(SparkEnv.get.executorId) + val dirId = hash % windowLocalDirs.length + val spillPathDir = windowLocalDirs(dirId).getCanonicalPath + val spillDirDiskReserveSize = columnarConf.columnarSpillDirDiskReserveSize + val windowSpillRowThreshold = columnarConf.columnarWindowSpillRowThreshold + val spillMemPctThreshold = columnarConf.columnarSpillMemPctThreshold + val sparkSpillConfig = new SparkSpillConfig(windowSpillEnable, spillPathDir, + spillDirDiskReserveSize, windowSpillRowThreshold, spillMemPctThreshold) + val startCodegen = System.nanoTime() val windowOperatorFactory = new OmniWindowWithExprOperatorFactory(sourceTypes, outputCols, windowFunType, omminPartitionChannels, preGroupedChannels, sortCols, ascendings, nullFirsts, 0, 10000, windowArgKeys, windowFunRetType, windowFrameTypes, windowFrameStartTypes, windowFrameStartChannels, windowFrameEndTypes, windowFrameEndChannels, - new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) + new OperatorConfig(sparkSpillConfig, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP)) val windowOperator = windowOperatorFactory.createOperator omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen) @@ -379,6 +427,7 @@ case class ColumnarWindowExec(windowExpression: Seq[NamedExpression], val startGetOp = System.nanoTime() val results = windowOperator.getOutput getOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startGetOp) + bytesSpilled += windowOperator.getSpilledBytes var windowResultSchema = this.schema if (windowExpressionWithProjectConstant) { -- Gitee