diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index a5f5b599dc4b8f77daad1daafa0147f638494315..a2cf44c0e67fdd97c557a0cef2133f1d4905a7f1 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -200,6 +200,8 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
 
   def catalogCacheExpireTime: Int = conf.getConf(CATALOG_CACHE_EXPIRE_TIME)
 
   def joinOutputStringTypeCost: Int = conf.getConf(JOIN_OUTPUT_STRING_COST_ESTIMATE)
+
+  def enableOmniStaticInvoke: Boolean = conf.getConf(ENABLED_OMNI_STATICINVOKE)
 
 }
@@ -719,4 +721,10 @@ object ColumnarPluginConfig {
     .internal()
     .stringConf
     .createOptional
+
+  val ENABLED_OMNI_STATICINVOKE = buildConf("spark.omni.sql.columnar.staticInvoke")
+    .internal()
+    .doc("Enable Omni StaticInvoke expression conversion.")
+    .booleanConf
+    .createWithDefault(false)
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 36c9ca1e143863e3960b3c20e14ee0ff090e2fe0..e8f7730a987ebbf2a5559b8c7e6181389fc56505 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.util
 
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{JsonObjectExtension, JsonArrayExtension}
 import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
+import com.huawei.boostkit.spark.ColumnarPluginConfig
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{ExprId, Expression, PromotePrecision}
@@ -40,7 +41,7 @@ object ModifyUtil extends Logging {
       case promotePrecision: PromotePrecision =>
         func(promotePrecision.child, exprsIndexMap, promotePrecision.child.dataType)
 
-      case staticInvoke: StaticInvoke =>
+      case staticInvoke: StaticInvoke if ColumnarPluginConfig.getSessionConf.enableOmniStaticInvoke =>
         {
           val funcName = staticInvoke.functionName
           funcName match {
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 2b94582c4138266aebe84a204a39f268216fcad8..e2565de231ec37307f0f990ad2d5714503bd7891 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.util
 
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{JsonObjectExtension, JsonArrayExtension}
 import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
+import com.huawei.boostkit.spark.ColumnarPluginConfig
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, PromotePrecision}
@@ -69,7 +70,7 @@ object ModifyUtil extends Logging {
           .put("dataType", 2)
           .put("value", bfAddress)
 
-      case staticInvoke: StaticInvoke =>
+      case staticInvoke: StaticInvoke if ColumnarPluginConfig.getSessionConf.enableOmniStaticInvoke =>
         {
           val funcName = staticInvoke.functionName
           funcName match {
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 995555e52869c696e575591aa0c549151c3a8612..9e66c26e41f35f5069c53440c0f1064f18368566 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.util
 
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{JsonObjectExtension, JsonArrayExtension}
 import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
+import com.huawei.boostkit.spark.ColumnarPluginConfig
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, TryEval}
@@ -65,7 +66,7 @@ object ModifyUtil extends Logging {
       case tryEval: TryEval =>
         func(tryEval.child, exprsIndexMap, returnDatatype)
 
-      case staticInvoke: StaticInvoke =>
+      case staticInvoke: StaticInvoke if ColumnarPluginConfig.getSessionConf.enableOmniStaticInvoke =>
         {
           val funcName = staticInvoke.functionName
           funcName match {
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 6a06bca012ead69bdb7f757477b458739ad02e16..198b991b68ba51652ec27908587ee2f2de6817fe 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.util
 
 import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{JsonObjectExtension, JsonArrayExtension}
 import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
+import com.huawei.boostkit.spark.ColumnarPluginConfig
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, TryEval}
@@ -67,7 +68,7 @@ object ModifyUtil extends Logging {
       case tryEval: TryEval =>
         func(tryEval.child, exprsIndexMap, returnDatatype)
 
-      case staticInvoke: StaticInvoke =>
+      case staticInvoke: StaticInvoke if ColumnarPluginConfig.getSessionConf.enableOmniStaticInvoke =>
        {
           val funcName = staticInvoke.functionName
           funcName match {
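All four shims now share the same guard. A minimal, self-contained sketch of the pattern follows (SessionConf, Expr, and convert are hypothetical stand-ins, not the plugin's classes): when the flag is off, the guarded case no longer matches, so the StaticInvoke expression falls through to the default handling.

// Hypothetical stand-ins; illustrative only, not the plugin's real classes.
object StaticInvokeGateSketch {
  final case class SessionConf(enableOmniStaticInvoke: Boolean)

  sealed trait Expr
  final case class StaticInvoke(functionName: String) extends Expr
  final case class Other(name: String) extends Expr

  // Same shape as the shims' guarded case: with the flag disabled, the
  // StaticInvoke case does not match and the expression takes the fallback path.
  def convert(expr: Expr, conf: SessionConf): String = expr match {
    case StaticInvoke(funcName) if conf.enableOmniStaticInvoke => s"omni:$funcName"
    case other => s"fallback:$other"
  }

  def main(args: Array[String]): Unit = {
    println(convert(StaticInvoke("upper"), SessionConf(enableOmniStaticInvoke = true)))  // omni:upper
    println(convert(StaticInvoke("upper"), SessionConf(enableOmniStaticInvoke = false))) // fallback:StaticInvoke(upper)
  }
}

In the patch itself the guard reads ColumnarPluginConfig.getSessionConf.enableOmniStaticInvoke, so it is evaluated against the active session's SQL conf rather than a fixed value.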
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index eba3d4d47f3b90fb003f98db9b311fa09b0a53c5..92d3af91d5fe5b357eab3d43555dfca264c3815a 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -303,33 +303,35 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    withSQLConf(("spark.omni.sql.columnar.staticInvoke", "true")) {
+      val drop = spark.sql("drop table if exists source_table")
+      drop.collect()
+      val createTable = spark.sql("create table source_table" +
+        "(id int, name string, amount int) using parquet")
+      createTable.collect()
+      val dropNP = spark.sql("drop table if exists target_table")
+      dropNP.collect()
+      val createTableNP = spark.sql("create table target_table" +
+        "(name varchar(5), total_amount long) using parquet")
+      createTableNP.collect()
+      var insert = spark.sql("insert into table source_table values" +
+        "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
+      insert.collect()
+      insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
+        "source_table where amount >= 10 GROUP BY UPPER(name)")
+      insert.collect()
+      assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
+      val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.find({
+          case _: HashAggregateExec => true
+          case _ => false
+        })
+      assert(columnarDataWrite.isEmpty, "use columnar data writing command")
+      val select = spark.sql("select * from target_table order by name")
+      val runRows = select.collect()
+      val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
+      assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    }
   }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index 085b7ded4d0fbc7dfdc226c6a4d22668482f9df9..abe610c05a6e6bb252f77cb3ca2b5957d7ba3173 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -304,33 +304,35 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    withSQLConf(("spark.omni.sql.columnar.staticInvoke", "true")) {
+      val drop = spark.sql("drop table if exists source_table")
+      drop.collect()
+      val createTable = spark.sql("create table source_table" +
+        "(id int, name string, amount int) using parquet")
+      createTable.collect()
+      val dropNP = spark.sql("drop table if exists target_table")
+      dropNP.collect()
+      val createTableNP = spark.sql("create table target_table" +
+        "(name varchar(5), total_amount long) using parquet")
+      createTableNP.collect()
+      var insert = spark.sql("insert into table source_table values" +
+        "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
+      insert.collect()
+      insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
+        "source_table where amount >= 10 GROUP BY UPPER(name)")
+      insert.collect()
+      assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
+      val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.find({
+          case _: HashAggregateExec => true
+          case _ => false
+        })
+      assert(columnarDataWrite.isEmpty, "use columnar data writing command")
+      val select = spark.sql("select * from target_table order by name")
+      val runRows = select.collect()
+      val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
+      assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    }
   }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index e8af6bb75760fba0a066aad1f8ded977152435bb..3af48624c771bb6844b612b23da513074b4bfe86 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -206,56 +206,60 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    withSQLConf(("spark.omni.sql.columnar.staticInvoke", "true")) {
+      val drop = spark.sql("drop table if exists source_table")
+      drop.collect()
+      val createTable = spark.sql("create table source_table" +
+        "(id int, name string, amount int) using parquet")
+      createTable.collect()
+      val dropNP = spark.sql("drop table if exists target_table")
+      dropNP.collect()
+      val createTableNP = spark.sql("create table target_table" +
+        "(name varchar(5), total_amount long) using parquet")
+      createTableNP.collect()
+      var insert = spark.sql("insert into table source_table values" +
+        "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
+      insert.collect()
+      insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
+        "source_table where amount >= 10 GROUP BY UPPER(name)")
+      insert.collect()
+      assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
+      val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.find({
+          case _: HashAggregateExec => true
+          case _ => false
+        })
+      assert(columnarDataWrite.isEmpty, "use columnar data writing command")
+      val select = spark.sql("select * from target_table order by name")
+      val runRows = select.collect()
+      val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
+      assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    }
   }
 
   test("Unsupported StaticInvoke function readSidePadding") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name char(5)) using parquet")
-    createTable.collect()
-    val insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
-    insert.collect()
-    val select = spark.sql("select * from source_table order by id")
-    val runRows = select.collect()
-    assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
-      "use columnar data writing command")
-    val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-      .find({
-        case _: ProjectExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    withSQLConf(("spark.omni.sql.columnar.staticInvoke", "true")) {
+      val drop = spark.sql("drop table if exists source_table")
+      drop.collect()
+      val createTable = spark.sql("create table source_table" +
+        "(id int, name char(5)) using parquet")
+      createTable.collect()
+      val insert = spark.sql("insert into table source_table values" +
+        "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
+      insert.collect()
+      val select = spark.sql("select * from source_table order by id")
+      val runRows = select.collect()
+      assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
+        "use columnar data writing command")
+      val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+        .find({
+          case _: ProjectExec => true
+          case _ => false
+        })
+      assert(columnarDataWrite.isEmpty, "use columnar data writing command")
+      val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
+      assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    }
   }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index e8af6bb75760fba0a066aad1f8ded977152435bb..3af48624c771bb6844b612b23da513074b4bfe86 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -206,56 +206,60 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
   }
 
   test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    withSQLConf(("spark.omni.sql.columnar.staticInvoke", "true")) {
+      val drop = spark.sql("drop table if exists source_table")
+      drop.collect()
+      val createTable = spark.sql("create table source_table" +
+        "(id int, name string, amount int) using parquet")
+      createTable.collect()
+      val dropNP = spark.sql("drop table if exists target_table")
+      dropNP.collect()
+      val createTableNP = spark.sql("create table target_table" +
+        "(name varchar(5), total_amount long) using parquet")
+      createTableNP.collect()
+      var insert = spark.sql("insert into table source_table values" +
+        "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
+      insert.collect()
+      insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
+        "source_table where amount >= 10 GROUP BY UPPER(name)")
+      insert.collect()
+      assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
+      val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
+        .commandPhysicalPlan.find({
+          case _: HashAggregateExec => true
+          case _ => false
+        })
+      assert(columnarDataWrite.isEmpty, "use columnar data writing command")
+      val select = spark.sql("select * from target_table order by name")
+      val runRows = select.collect()
+      val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
+      assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    }
   }
 
   test("Unsupported StaticInvoke function readSidePadding") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name char(5)) using parquet")
-    createTable.collect()
-    val insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
-    insert.collect()
-    val select = spark.sql("select * from source_table order by id")
-    val runRows = select.collect()
-    assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
-      "use columnar data writing command")
-    val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-      .find({
-        case _: ProjectExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    withSQLConf(("spark.omni.sql.columnar.staticInvoke", "true")) {
+      val drop = spark.sql("drop table if exists source_table")
+      drop.collect()
+      val createTable = spark.sql("create table source_table" +
+        "(id int, name char(5)) using parquet")
+      createTable.collect()
+      val insert = spark.sql("insert into table source_table values" +
+        "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
+      insert.collect()
+      val select = spark.sql("select * from source_table order by id")
+      val runRows = select.collect()
+      assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
+        "use columnar data writing command")
+      val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
+        .find({
+          case _: ProjectExec => true
+          case _ => false
+        })
+      assert(columnarDataWrite.isEmpty, "use columnar data writing command")
+      val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
+      assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
+    }
   }
 }