diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarLimit.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarLimit.scala
index 410ce4127fe5038682e5195a7ec5e4f586439ab1..91ee64489731187e9edda7d56d42511f995c2f1d 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarLimit.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/org/apache/spark/sql/execution/ColumnarLimit.scala
@@ -70,7 +70,7 @@ trait ColumnarBaseLimitExec extends LimitExec {
     child.executeColumnar().mapPartitions { iter =>
 
       val startCodegen = System.nanoTime()
-      val limitOperatorFactory = new OmniLimitOperatorFactory(limit)
+      val limitOperatorFactory = new OmniLimitOperatorFactory(limit, 0)
       val limitOperator = limitOperatorFactory.createOperator
       omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
 
@@ -155,11 +155,76 @@ case class ColumnarGlobalLimitExec(limit: Int, child: SparkPlan, offset: Int = 0
     copy(child = newChild)
 
   def buildCheck(): Unit = {
-    if (offset > 0) {
-      throw new UnsupportedOperationException("ColumnarGlobalLimitExec doesn't support offset greater than 0.")
-    }
     child.output.foreach(attr => sparkTypeToOmniType(attr.dataType, attr.metadata))
   }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val addInputTime = longMetric("addInputTime")
+    val omniCodegenTime = longMetric("omniCodegenTime")
+    val getOutputTime = longMetric("getOutputTime")
+    val numOutputRows = longMetric("numOutputRows")
+    val numOutputVecBatches = longMetric("numOutputVecBatches")
+
+    child.executeColumnar().mapPartitions { iter =>
+
+      val startCodegen = System.nanoTime()
+      val limitOperatorFactory = new OmniLimitOperatorFactory(limit, offset)
+      val limitOperator = limitOperatorFactory.createOperator
+      omniCodegenTime += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
+
+      // close operator
+      SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
+        limitOperator.close()
+        limitOperatorFactory.close()
+      })
+
+      val localSchema = this.schema
+      new Iterator[ColumnarBatch] {
+        private var results: java.util.Iterator[VecBatch] = _
+
+        override def hasNext: Boolean = {
+          while ((results == null || !results.hasNext) && iter.hasNext) {
+            val batch = iter.next()
+            val input = transColBatchToOmniVecs(batch)
+            val vecBatch = new VecBatch(input, batch.numRows())
+            val startInput = System.nanoTime()
+            limitOperator.addInput(vecBatch)
+            addInputTime += NANOSECONDS.toMillis(System.nanoTime() - startInput)
+
+            val startGetOp = System.nanoTime()
+            results = limitOperator.getOutput
+            getOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startGetOp)
+          }
+          if (results == null) {
+            false
+          } else {
+            val startGetOp: Long = System.nanoTime()
+            val hasNext = results.hasNext
+            getOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startGetOp)
+            hasNext
+          }
+        }
+
+        override def next(): ColumnarBatch = {
+          val startGetOp = System.nanoTime()
+          val vecBatch = results.next()
+          getOutputTime += NANOSECONDS.toMillis(System.nanoTime() - startGetOp)
+          val vectors: Seq[OmniColumnVector] = OmniColumnVector.allocateColumns(
+            vecBatch.getRowCount, localSchema, false)
+          vectors.zipWithIndex.foreach { case (vector, i) =>
+            vector.reset()
+            vector.setVec(vecBatch.getVectors()(i))
+          }
+          // capture the row count before closing the native batch
+          val rowCount = vecBatch.getRowCount
+          numOutputRows += rowCount
+          numOutputVecBatches += 1
+          vecBatch.close()
+          new ColumnarBatch(vectors.toArray, rowCount)
+        }
+      }
+    }
+  }
 }
 
 case class ColumnarTakeOrderedAndProjectExec(
@@ -168,7 +233,7 @@ case class ColumnarTakeOrderedAndProjectExec(
     projectList: Seq[NamedExpression],
     child: SparkPlan,
     offset: Int = 0)
-  extends UnaryExecNode {
+  extends OrderPreservingUnaryExecNode {
 
   override def supportsColumnar: Boolean = true
 
@@ -218,9 +283,6 @@ case class ColumnarTakeOrderedAndProjectExec(
   }
 
   def buildCheck(): Unit = {
-    if (offset > 0) {
-      throw new UnsupportedOperationException("ColumnarTakeOrderedAndProjectExec doesn't support offset greater than 0.")
-    }
     genSortParam(child.output, sortOrder)
     val projectEqualChildOutput = projectList == child.output
     var omniInputTypes: Array[DataType] = null
@@ -245,9 +307,9 @@ case class ColumnarTakeOrderedAndProjectExec(
     } else {
       val (sourceTypes, ascending, nullFirsts, sortColsExp) = genSortParam(child.output, sortOrder)
 
-      def computeTopN(iter: Iterator[ColumnarBatch], schema: StructType): Iterator[ColumnarBatch] = {
+      def computeTopN(iter: Iterator[ColumnarBatch], schema: StructType, offset: Int): Iterator[ColumnarBatch] = {
        val startCodegen = System.nanoTime()
-        val topNOperatorFactory = new OmniTopNWithExprOperatorFactory(sourceTypes, limit, sortColsExp, ascending, nullFirsts,
+        val topNOperatorFactory = new OmniTopNWithExprOperatorFactory(sourceTypes, limit, offset, sortColsExp, ascending, nullFirsts,
           new OperatorConfig(SpillConfig.NONE, new OverflowConfig(OmniAdaptorUtil.overflowConf()), IS_SKIP_VERIFY_EXP))
         val topNOperator = topNOperatorFactory.createOperator
         longMetric("omniCodegenTime") += NANOSECONDS.toMillis(System.nanoTime() - startCodegen)
@@ -265,7 +327,7 @@ case class ColumnarTakeOrderedAndProjectExec(
       } else {
         val localTopK: RDD[ColumnarBatch] = {
           child.executeColumnar().mapPartitionsWithIndexInternal { (_, iter) =>
-            computeTopN(iter, this.child.schema)
+            computeTopN(iter, this.child.schema, 0)
           }
         }
 
@@ -302,7 +364,7 @@ case class ColumnarTakeOrderedAndProjectExec(
       }
       singlePartitionRDD.mapPartitions { iter =>
         // TopN = omni-top-n + omni-project
-        val topN: Iterator[ColumnarBatch] = computeTopN(iter, this.child.schema)
+        val topN: Iterator[ColumnarBatch] = computeTopN(iter, this.child.schema, offset)
         if (!projectEqualChildOutput) {
           dealPartitionData(null, null, addInputTime, omniCodegenTime, getOutputTime, omniInputTypes,
             omniExpressions, topN, this.schema)
@@ -313,7 +375,9 @@ case class ColumnarTakeOrderedAndProjectExec(
     }
   }
 
-  override def outputOrdering: Seq[SortOrder] = sortOrder
+  override def outputExpressions: Seq[NamedExpression] = projectList
+
+  override def orderingExpressions: Seq[SortOrder] = sortOrder
 
   override def outputPartitioning: Partitioning = SinglePartition