diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 36c9ca1e143863e3960b3c20e14ee0ff090e2fe0..9d642404370456478c040269cc69febc6c45e957 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -18,17 +18,15 @@

 package org.apache.spark.sql.util

-import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{JsonObjectExtension, JsonArrayExtension}
 import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{ExprId, Expression, PromotePrecision}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.types.DataType
-import com.google.gson.{JsonArray, JsonObject}
+import com.google.gson.JsonObject

 object ModifyUtil extends Logging {

@@ -39,26 +37,6 @@ object ModifyUtil extends Logging {
     expr match {
       case promotePrecision: PromotePrecision =>
         func(promotePrecision.child, exprsIndexMap, promotePrecision.child.dataType)
-
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently");
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 2b94582c4138266aebe84a204a39f268216fcad8..1e31047933b7d8daf3569c5cdba32055201ac574 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -23,7 +23,6 @@ import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, PromotePrecision}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{BroadcastExchangeExecProxy, ColumnarBloomFilterSubquery, ColumnarDataWritingCommandExec, ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
@@ -69,25 +68,6 @@ object ModifyUtil extends Logging {
         .put("dataType", 2)
         .put("value", bfAddress)
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently");
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 995555e52869c696e575591aa0c549151c3a8612..90875c2c2902bb78e52980f3d4f9e0e84b514508 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -23,7 +23,6 @@ import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, TryEval}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{BroadcastExchangeExecProxy, ColumnarBloomFilterSubquery, ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
@@ -61,40 +60,8 @@ object ModifyUtil extends Logging {
         .put("isNull", bfAddress == 0L)
         .put("dataType", 2)
         .put("value", bfAddress)
-
       case tryEval: TryEval => func(tryEval.child, exprsIndexMap, returnDatatype)
-
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "readSidePadding" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .put("function_name", "StaticInvokeCharReadPadding")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently")
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 6a06bca012ead69bdb7f757477b458739ad02e16..3ba2d82773a84ed93b95fdad5dcaff9751a9541b 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -23,7 +23,6 @@ import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, TryEval}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{BroadcastExchangeExecProxy, ColumnarBloomFilterSubquery, ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
@@ -66,37 +65,6 @@ object ModifyUtil extends Logging {
         .put("value", bfAddress)

       case tryEval: TryEval => func(tryEval.child, exprsIndexMap, returnDatatype)
-
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "readSidePadding" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .put("function_name", "StaticInvokeCharReadPadding")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently")
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index eba3d4d47f3b90fb003f98db9b311fa09b0a53c5..1d26660aa33c4f58f47480a9fd7e05a7122af3a2 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFi
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.execution.ColumnarDataWritingCommandExec
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec

 import scala.concurrent.Future

@@ -301,35 +300,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index 085b7ded4d0fbc7dfdc226c6a4d22668482f9df9..03b019e6cc33770743219e7cced38c5ea28b1b5d 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFi
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.execution.ColumnarDataWritingCommandExec
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec

 import scala.concurrent.Future

@@ -302,35 +301,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index e8af6bb75760fba0a066aad1f8ded977152435bb..e1d9064d3cfa9a02b8d70090e164486107b7f99a 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ColumnarBroa
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFilterExec, ColumnarProjectExec, ColumnarTakeOrderedAndProjectExec, CommandResultExec, LeafExecNode, OmniColumnarToRowExec, ProjectExec, RowToOmniColumnarExec, SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec

 import scala.concurrent.Future

@@ -204,58 +203,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
-
-  test("Unsupported StaticInvoke function readSidePadding") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name char(5)) using parquet")
-    createTable.collect()
-    val insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
-    insert.collect()
-    val select = spark.sql("select * from source_table order by id")
-    val runRows = select.collect()
-    assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
-      "use columnar data writing command")
-    val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-      .find({
-        case _: ProjectExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index e8af6bb75760fba0a066aad1f8ded977152435bb..e1d9064d3cfa9a02b8d70090e164486107b7f99a 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ColumnarBroa
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFilterExec, ColumnarProjectExec, ColumnarTakeOrderedAndProjectExec, CommandResultExec, LeafExecNode, OmniColumnarToRowExec, ProjectExec, RowToOmniColumnarExec, SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec

 import scala.concurrent.Future

@@ -204,58 +203,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
-
-  test("Unsupported StaticInvoke function readSidePadding") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name char(5)) using parquet")
-    createTable.collect()
-    val insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
-    insert.collect()
-    val select = spark.sql("select * from source_table order by id")
-    val runRows = select.collect()
-    assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
-      "use columnar data writing command")
-    val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-      .find({
-        case _: ProjectExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }