diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index 170393144eddc51db16bf41980cd1d7377c0cca9..26555cc23b21a78180403ada2e9a3c3921543c6b 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -305,6 +305,11 @@ object OmniExpressionAdaptor extends Logging { (!isDecimalOrStringType(cast.dataType) && cast.child.dataType.isInstanceOf[StringType])) { throw new UnsupportedOperationException(s"Unsupported expression: $expr") } + + // not support Cast(double as decimal) + if (cast.dataType.isInstanceOf[DecimalType] && cast.child.dataType.isInstanceOf[DoubleType]) { + throw new UnsupportedOperationException(s"Unsupported expression: $expr") + } } def toOmniLiteral(literal: Literal): String = { @@ -500,6 +505,11 @@ object OmniExpressionAdaptor extends Logging { .format(sparkTypeToOmniExpJsonType(lower.dataType), rewriteToOmniJsonExpressionLiteral(lower.child, exprsIndexMap)) + case upper: Upper => + "{\"exprType\":\"FUNCTION\",\"returnType\":%s,\"function_name\":\"upper\", \"arguments\":[%s]}" + .format(sparkTypeToOmniExpJsonType(upper.dataType), + rewriteToOmniJsonExpressionLiteral(upper.child, exprsIndexMap)) + case length: Length => "{\"exprType\":\"FUNCTION\",\"returnType\":%s,\"function_name\":\"length\", \"arguments\":[%s]}" .format(sparkTypeToOmniExpJsonType(length.dataType), diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala index 
3769441cf7b315638a12ef3dbc8916bb7b96420c..d137388ab3c41c3ee103ac974cb594990379d394 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeAdaptorExec.scala @@ -42,6 +42,7 @@ case class ColumnarBroadcastExchangeAdaptorExec(child: SparkPlan, numPartitions: override def doExecute(): RDD[InternalRow] = { val numOutputRows: SQLMetric = longMetric("numOutputRows") val numOutputBatches: SQLMetric = longMetric("numOutputBatches") + val processTime: SQLMetric = longMetric("processTime") val inputRdd: BroadcastColumnarRDD = BroadcastColumnarRDD( sparkContext, metrics, @@ -49,7 +50,7 @@ case class ColumnarBroadcastExchangeAdaptorExec(child: SparkPlan, numPartitions: child.executeBroadcast(), StructType.fromAttributes(child.output)) inputRdd.mapPartitions { batches => - ColumnarBatchToInternalRow.convert(output, batches, numOutputRows, numOutputBatches) + ColumnarBatchToInternalRow.convert(output, batches, numOutputRows, numOutputBatches, processTime) } } diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala index 92c6b6145c0c63c0ebd1821f087ede91f2e3d8e2..b1fd51f4867cf2c435c8ddd7036bf6f8b6818212 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import nova.hetu.omniruntime.vector.Vec +import java.util.concurrent.TimeUnit.NANOSECONDS import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -34,6 +34,8 @@ import 
org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OmniColum import org.apache.spark.sql.types.{BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DecimalType, DoubleType, IntegerType, LongType, ShortType, StringType, StructType, TimestampType} import org.apache.spark.sql.vectorized.ColumnarBatch +import nova.hetu.omniruntime.vector.Vec + /** * Holds a user defined rule that can be used to inject columnar implementations of various * operators in the plan. The [[preColumnarTransitions]] [[Rule]] can be used to replace @@ -226,13 +228,15 @@ case class RowToOmniColumnarExec(child: SparkPlan) extends RowToColumnarTransiti override lazy val metrics: Map[String, SQLMetric] = Map( "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), - "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches") + "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"), + "rowToOmniColumnarTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in row to OmniColumnar") ) override def doExecuteColumnar(): RDD[ColumnarBatch] = { val enableOffHeapColumnVector = sqlContext.conf.offHeapColumnVectorEnabled val numInputRows = longMetric("numInputRows") val numOutputBatches = longMetric("numOutputBatches") + val rowToOmniColumnarTime = longMetric("rowToOmniColumnarTime") // Instead of creating a new config we are reusing columnBatchSize. In the future if we do // combine with some of the Arrow conversion tools we will need to unify some of the configs. 
val numRows = conf.columnBatchSize @@ -249,6 +253,7 @@ case class RowToOmniColumnarExec(child: SparkPlan) extends RowToColumnarTransiti } override def next(): ColumnarBatch = { + val startTime = System.nanoTime() val vectors: Seq[WritableColumnVector] = OmniColumnVector.allocateColumns(numRows, localSchema, true) val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray) @@ -268,6 +273,7 @@ case class RowToOmniColumnarExec(child: SparkPlan) extends RowToColumnarTransiti cb.setNumRows(rowCount) numInputRows += rowCount numOutputBatches += 1 + rowToOmniColumnarTime += NANOSECONDS.toMillis(System.nanoTime() - startTime) cb } } @@ -292,17 +298,19 @@ case class OmniColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransiti override lazy val metrics: Map[String, SQLMetric] = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), - "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches") + "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"), + "omniColumnarToRowTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in omniColumnar to row") ) override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val numInputBatches = longMetric("numInputBatches") + val omniColumnarToRowTime = longMetric("omniColumnarToRowTime") // This avoids calling `output` in the RDD closure, so that we don't need to include the entire // plan (this) in the closure. 
val localOutput = this.output child.executeColumnar().mapPartitionsInternal { batches => - ColumnarBatchToInternalRow.convert(localOutput, batches, numOutputRows, numInputBatches) + ColumnarBatchToInternalRow.convert(localOutput, batches, numOutputRows, numInputBatches, omniColumnarToRowTime) } } } @@ -310,10 +318,11 @@ case class OmniColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransiti object ColumnarBatchToInternalRow { def convert(output: Seq[Attribute], batches: Iterator[ColumnarBatch], - numOutputRows: SQLMetric, numInputBatches: SQLMetric ): Iterator[InternalRow] = { + numOutputRows: SQLMetric, numInputBatches: SQLMetric, + rowToOmniColumnarTime: SQLMetric): Iterator[InternalRow] = { val toUnsafe = UnsafeProjection.create(output, output) val vecsTmp = new ListBuffer[Vec] - val batchIter = batches.flatMap { batch => + val startTime = System.nanoTime() // store vec since tablescan reuse batch for (i <- 0 until batch.numCols()) { @@ -325,7 +334,9 @@ object ColumnarBatchToInternalRow { } numInputBatches += 1 numOutputRows += batch.numRows() - batch.rowIterator().asScala.map(toUnsafe) + val iter = batch.rowIterator().asScala.map(toUnsafe) + rowToOmniColumnarTime += NANOSECONDS.toMillis(System.nanoTime() - startTime) + iter } SparkMemoryUtils.addLeakSafeTaskCompletionListener { _ => diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala index b538a8613549b4986b968e2973434e79a6b4d38c..632f718a1c1daac5ca9996174a4c8533870ba16f 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/joins/ColumnarSortMergeJoinExec.scala @@ -76,15 +76,15 @@ class ColumnarSortMergeJoinExec( 
override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "streamedAddInputTime" -> - SQLMetrics.createMetric(sparkContext, "time in omni streamed addInput"), + SQLMetrics.createTimingMetric(sparkContext, "time in omni streamed addInput"), "streamedCodegenTime" -> - SQLMetrics.createMetric(sparkContext, "time in omni streamed codegen"), + SQLMetrics.createTimingMetric(sparkContext, "time in omni streamed codegen"), "bufferedAddInputTime" -> - SQLMetrics.createMetric(sparkContext, "time in omni buffered addInput"), + SQLMetrics.createTimingMetric(sparkContext, "time in omni buffered addInput"), "bufferedCodegenTime" -> - SQLMetrics.createMetric(sparkContext, "time in omni buffered codegen"), + SQLMetrics.createTimingMetric(sparkContext, "time in omni buffered codegen"), "getOutputTime" -> - SQLMetrics.createMetric(sparkContext, "time in omni buffered getOutput"), + SQLMetrics.createTimingMetric(sparkContext, "time in omni buffered getOutput"), "numOutputVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of output vecBatchs"), "numMergedVecBatchs" -> SQLMetrics.createMetric(sparkContext, "number of merged vecBatchs"), diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarBuiltInFuncSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarBuiltInFuncSuite.scala index ce3e7ab8576a47a40e7a434847547b0978824043..89b77707730db485e21efaacf2ea5e6d8a63fd60 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarBuiltInFuncSuite.scala +++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarBuiltInFuncSuite.scala @@ -29,434 +29,410 @@ class ColumnarBuiltInFuncSuite extends ColumnarSparkPlanTest{ protected override def beforeAll(): Unit = { super.beforeAll() buildInDf = Seq[(String, 
String, String, String, Long, Int, String, String)]( - (null, "ChaR1 R", null, " varchar100 ", 1001L, 1, "中文1", "varchar100_normal"), - ("char200 ", "char2 ", "varchar2", "", 1002L, 2, "中文2", "varchar200_normal"), - ("char300 ", "char3 ", "varchar3", "varchar300", 1003L, 3, "中文3", "varchar300_normal"), - (null, "char4 ", "varchar4", "varchar400", 1004L, 4, "中文4", "varchar400_normal") + (null, "ChaR1 R", null, " varchar100 ", 1001L, 1, " 中文1aA ", "varchar100_normal"), + ("char200 ", "char2 ", "varchar2", "", 1002L, 2, "中文2bB", "varchar200_normal"), + ("char300 ", "char3 ", "varchar3", "varchar300", 1003L, 3, "中文3cC", "varchar300_normal"), + (null, "char4 ", "varchar4", "varchar400", 1004L, 4, null, "varchar400_normal") ).toDF("char_null", "char_normal", "varchar_null", "varchar_empty", "long_col", "int_col", "ch_col", "varchar_normal") buildInDf.createOrReplaceTempView("builtin_table") } test("Test ColumnarProjectExec happen and result is same as native " + "when execute lower with normal") { - val res = spark.sql("select lower(char_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("char1 r"), - Row("char2 "), - Row("char3 "), - Row("char4 ") - ) + val sql = "select lower(char_normal) from builtin_table" + val expected = Seq( + Row("char1 r"), + Row("char2 "), + Row("char3 "), + Row("char4 ") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute lower with null") { - val res = spark.sql("select lower(char_null) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, 
s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row("char200 "), - Row("char300 "), - Row(null) - ) + val sql = "select lower(char_null) from builtin_table" + val expected = Seq( + Row(null), + Row("char200 "), + Row("char300 "), + Row(null) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute lower with space/empty string") { - val res = spark.sql("select lower(varchar_empty) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(" varchar100 "), - Row(""), - Row("varchar300"), - Row("varchar400") - ) + val sql = "select lower(varchar_empty) from builtin_table" + val expected = Seq( + Row(" varchar100 "), + Row(""), + Row("varchar300"), + Row("varchar400") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute lower-lower") { - val res = spark.sql("select lower(char_null), lower(varchar_null) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null, null), - Row("char200 ", "varchar2"), - Row("char300 ", "varchar3"), - Row(null, "varchar4"), - ) + val sql = "select 
lower(char_null), lower(varchar_null) from builtin_table" + val expected = Seq( + Row(null, null), + Row("char200 ", "varchar2"), + Row("char300 ", "varchar3"), + Row(null, "varchar4"), ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute lower(lower())") { - val res = spark.sql("select lower(lower(char_null)) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row("char200 "), - Row("char300 "), - Row(null) - ) + val sql = "select lower(lower(char_null)) from builtin_table" + val expected = Seq( + Row(null), + Row("char200 "), + Row("char300 "), + Row(null) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute lower with subQuery") { - val res = spark.sql("select lower(l) from (select lower(char_normal) as l from builtin_table)") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("char1 r"), - Row("char2 "), - Row("char3 "), - Row("char4 ") - ) + val sql = "select lower(l) from (select lower(char_normal) as l from builtin_table)" + val expected = Seq( + Row("char1 r"), + Row("char2 "), + Row("char3 "), + Row("char4 ") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute lower with ch") { - val res = 
spark.sql("select lower(ch_col) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("中文1"), - Row("中文2"), - Row("中文3"), - Row("中文4") - ) + val sql = "select lower(ch_col) from builtin_table" + val expected = Seq( + Row(" 中文1aa "), + Row("中文2bb"), + Row("中文3cc"), + Row(null) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute length with normal") { - val res = spark.sql("select length(char_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(10), - Row(10), - Row(10), - Row(10) - ) + val sql = "select length(char_normal) from builtin_table" + val expected = Seq( + Row(10), + Row(10), + Row(10), + Row(10) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute length with null") { - val res = spark.sql("select length(char_null) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row(10), - Row(10), - Row(null) - ) + 
val sql = "select length(char_null) from builtin_table" + val expected = Seq( + Row(null), + Row(10), + Row(10), + Row(null) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute length with space/empty string") { - val res = spark.sql("select length(varchar_empty) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(13), - Row(0), - Row(10), - Row(10) - ) + val sql = "select length(varchar_empty) from builtin_table" + val expected = Seq( + Row(13), + Row(0), + Row(10), + Row(10) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute length with expr") { - val res = spark.sql("select length(char_null) / 2 from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row(5.0), - Row(5.0), - Row(null) - ) + val sql = "select length(char_null) / 2 from builtin_table" + val expected = Seq( + Row(null), + Row(5.0), + Row(5.0), + Row(null) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute length-length") { - val res = spark.sql("select length(char_null),length(varchar_null) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - 
assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null, null), - Row(10, 8), - Row(10, 8), - Row(null, 8) - ) + val sql = "select length(char_null),length(varchar_null) from builtin_table" + val expected = Seq( + Row(null, null), + Row(10, 8), + Row(10, 8), + Row(null, 8) ) + checkResult(sql, expected) } // replace(str, search, replaceStr) test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with matched and replace str") { - val res = spark.sql("select replace(varchar_normal,varchar_empty,char_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("varchar100_normal"), - Row("varchar200_normal"), - Row("char3 _normal"), - Row("char4 _normal") - ) + val sql = "select replace(varchar_normal,varchar_empty,char_normal) from builtin_table" + val expected = Seq( + Row("varchar100_normal"), + Row("varchar200_normal"), + Row("char3 _normal"), + Row("char4 _normal") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with not matched") { - val res = spark.sql("select replace(char_normal,varchar_normal,char_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - 
assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("ChaR1 R"), - Row("char2 "), - Row("char3 "), - Row("char4 ") - ) + val sql = "select replace(char_normal,varchar_normal,char_normal) from builtin_table" + val expected = Seq( + Row("ChaR1 R"), + Row("char2 "), + Row("char3 "), + Row("char4 ") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with str null") { - val res = spark.sql("select replace(varchar_null,char_normal,varchar_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row("varchar2"), - Row("varchar3"), - Row("varchar4") - ) + val sql = "select replace(varchar_null,char_normal,varchar_normal) from builtin_table" + val expected = Seq( + Row(null), + Row("varchar2"), + Row("varchar3"), + Row("varchar4") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with str space/empty") { - val res = spark.sql("select replace(varchar_empty,varchar_empty,varchar_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("varchar100_normal"), - Row(""), - Row("varchar300_normal"), - Row("varchar400_normal") - ) + val 
sql = "select replace(varchar_empty,varchar_empty,varchar_normal) from builtin_table" + val expected = Seq( + Row("varchar100_normal"), + Row(""), + Row("varchar300_normal"), + Row("varchar400_normal") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with search null") { - val res = spark.sql("select replace(varchar_normal,varchar_null,char_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row("char2 00_normal"), - Row("char3 00_normal"), - Row("char4 00_normal") - ) + val sql = "select replace(varchar_normal,varchar_null,char_normal) from builtin_table" + val expected = Seq( + Row(null), + Row("char2 00_normal"), + Row("char3 00_normal"), + Row("char4 00_normal") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with search space/empty") { - val res = spark.sql("select replace(varchar_normal,varchar_empty,char_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("varchar100_normal"), - Row("varchar200_normal"), - Row("char3 _normal"), - Row("char4 _normal") - ) + val sql = "select replace(varchar_normal,varchar_empty,char_normal) from builtin_table" + val expected = Seq( + Row("varchar100_normal"), + 
Row("varchar200_normal"), + Row("char3 _normal"), + Row("char4 _normal") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with replaceStr null") { - val res = spark.sql("select replace(varchar_normal,varchar_empty,varchar_null) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row("varchar200_normal"), - Row("varchar3_normal"), - Row("varchar4_normal") - ) + val sql = "select replace(varchar_normal,varchar_empty,varchar_null) from builtin_table" + val expected = Seq( + Row(null), + Row("varchar200_normal"), + Row("varchar3_normal"), + Row("varchar4_normal") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with replaceStr space/empty") { - val res = spark.sql("select replace(varchar_normal,varchar_normal,varchar_empty) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(" varchar100 "), - Row(""), - Row("varchar300"), - Row("varchar400") - ) + val sql = "select replace(varchar_normal,varchar_normal,varchar_empty) from builtin_table" + val expected = Seq( + Row(" varchar100 "), + Row(""), + Row("varchar300"), + Row("varchar400") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + 
"when execute replace with str/search/replace all null") { - val res = spark.sql("select replace(varchar_null,varchar_null,char_null) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row("char200 "), - Row("char300 "), - Row(null) - ) + val sql = "select replace(varchar_null,varchar_null,char_null) from builtin_table" + val expected = Seq( + Row(null), + Row("char200 "), + Row("char300 "), + Row(null) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with replaceStr default") { - val res = spark.sql("select replace(varchar_normal,varchar_normal) from builtin_table") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(""), - Row(""), - Row(""), - Row("") - ) + val sql = "select replace(varchar_normal,varchar_normal) from builtin_table" + val expected = Seq( + Row(""), + Row(""), + Row(""), + Row("") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with subReplace(normal,normal,normal)") { - val res = spark.sql("select replace(res,'c','ccc') from (select replace(varchar_normal,varchar_empty,char_normal) as res from builtin_table)") - val executedPlan = res.queryExecution.executedPlan - 
assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("varccchar100_normal"), - Row("varccchar200_normal"), - Row("ccchar3 _normal"), - Row("ccchar4 _normal") - ) + val sql = "select replace(res,'c','ccc') from (select replace(varchar_normal,varchar_empty,char_normal) as res from builtin_table)" + val expected = Seq( + Row("varccchar100_normal"), + Row("varccchar200_normal"), + Row("ccchar3 _normal"), + Row("ccchar4 _normal") ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace with subReplace(null,null,null)") { - val res = spark.sql("select replace(res,'c','ccc') from (select replace(varchar_null,varchar_null,char_null) as res from builtin_table)") - val executedPlan = res.queryExecution.executedPlan - assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row(null), - Row("ccchar200 "), - Row("ccchar300 "), - Row(null) - ) + val sql = "select replace(res,'c','ccc') from (select replace(varchar_null,varchar_null,char_null) as res from builtin_table)" + val expected = Seq( + Row(null), + Row("ccchar200 "), + Row("ccchar300 "), + Row(null) ) + checkResult(sql, expected) } test("Test ColumnarProjectExec happen and result is same as native " + "when execute replace(replace)") { - val res = spark.sql("select replace(replace('ABCabc','AB','abc'),'abc','DEF')") - val executedPlan = res.queryExecution.executedPlan - 
assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") - assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") - checkAnswer( - res, - Seq( - Row("DEFCDEF") - ) + val sql = "select replace(replace('ABCabc','AB','abc'),'abc','DEF')" + val expected = Seq( + Row("DEFCDEF") ) + checkResult(sql, expected) + } + + // upper + test("Test ColumnarProjectExec happen and result is same as native " + + "when execute upper with normal") { + val sql = "select upper(char_normal) from builtin_table" + val expected = Seq( + Row("CHAR1 R"), + Row("CHAR2 "), + Row("CHAR3 "), + Row("CHAR4 ") + ) + checkResult(sql, expected) + } + + test("Test ColumnarProjectExec happen and result is same as native " + + "when execute upper with null") { + val sql = "select upper(char_null) from builtin_table" + val expected = Seq( + Row(null), + Row("CHAR200 "), + Row("CHAR300 "), + Row(null) + ) + checkResult(sql, expected) + } + + test("Test ColumnarProjectExec happen and result is same as native " + + "when execute upper with space/empty string") { + val sql = "select upper(varchar_empty) from builtin_table" + val expected = Seq( + Row(" VARCHAR100 "), + Row(""), + Row("VARCHAR300"), + Row("VARCHAR400") + ) + checkResult(sql, expected) + } + + test("Test ColumnarProjectExec happen and result is same as native " + + "when execute upper-upper") { + val sql = "select upper(char_null), upper(varchar_null) from builtin_table" + val expected = Seq( + Row(null, null), + Row("CHAR200 ", "VARCHAR2"), + Row("CHAR300 ", "VARCHAR3"), + Row(null, "VARCHAR4"), + ) + checkResult(sql, expected) + } + + test("Test ColumnarProjectExec happen and result is same as native " + + "when execute upper(upper())") { + val sql = "select upper(upper(char_null)) from builtin_table" + val expected = Seq( + Row(null), + Row("CHAR200 "), + Row("CHAR300 "), + 
Row(null) + ) + checkResult(sql, expected) + } + + test("Test ColumnarProjectExec happen and result is same as native " + + "when execute upper with subQuery") { + val sql = "select upper(l) from (select upper(char_normal) as l from builtin_table)" + val expected = Seq( + Row("CHAR1 R"), + Row("CHAR2 "), + Row("CHAR3 "), + Row("CHAR4 ") + ) + checkResult(sql, expected) + } + + test("Test ColumnarProjectExec happen and result is same as native " + + "when execute upper with ch") { + val sql = "select upper(ch_col) from builtin_table" + val expected = Seq( + Row(" 中文1AA "), + Row("中文2BB"), + Row("中文3CC"), + Row(null) + ) + checkResult(sql, expected) + } + + def checkResult(sql: String, expected: Seq[Row], isUseOmni: Boolean = true): Unit = { + def assertOmniProjectHappen(res: DataFrame): Unit = { + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + } + def assertOmniProjectNotHappen(res: DataFrame): Unit = { + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isEmpty, s"ColumnarProjectExec happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isDefined, s"ProjectExec not happened, executedPlan as follows: \n$executedPlan") + } + val res = spark.sql(sql) + if (isUseOmni) assertOmniProjectHappen(res) else assertOmniProjectNotHappen(res) + checkAnswer(res, expected) } }