From d42d9f51817ca4f008e3ad1cb4c40124f8880be2 Mon Sep 17 00:00:00 2001
From: hyy_cyan
Date: Fri, 9 May 2025 06:27:27 +0000
Subject: [PATCH] Revert 'Pull Request !1167 : support staticInvoke'
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../apache/spark/sql/util/ModifyUtil.scala    | 24 +-------
 .../apache/spark/sql/util/ModifyUtil.scala    | 20 -------
 .../apache/spark/sql/util/ModifyUtil.scala    | 33 -----------
 .../apache/spark/sql/util/ModifyUtil.scala    | 32 ----------
 .../spark/TableWriteBasicFunctionSuite.scala  | 32 ----------
 .../spark/TableWriteBasicFunctionSuite.scala  | 32 ----------
 .../spark/TableWriteBasicFunctionSuite.scala  | 55 -------------------
 .../spark/TableWriteBasicFunctionSuite.scala  | 55 -------------------
 8 files changed, 1 insertion(+), 282 deletions(-)

diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 36c9ca1e1..9d6424043 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark32-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -18,17 +18,15 @@
 
 package org.apache.spark.sql.util
 
-import com.huawei.boostkit.spark.expression.OmniExpressionAdaptor.{JsonObjectExtension, JsonArrayExtension}
 import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{ExprId, Expression, PromotePrecision}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.types.DataType
-import com.google.gson.{JsonArray, JsonObject}
+import com.google.gson.JsonObject
 
 object ModifyUtil extends Logging {
@@ -39,26 +37,6 @@ object ModifyUtil extends Logging {
     expr match {
       case promotePrecision: PromotePrecision =>
         func(promotePrecision.child, exprsIndexMap, promotePrecision.child.dataType)
-
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently");
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 2b94582c4..1e3104793 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark33-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -23,7 +23,6 @@ import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, PromotePrecision}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{BroadcastExchangeExecProxy, ColumnarBloomFilterSubquery, ColumnarDataWritingCommandExec, ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
@@ -69,25 +68,6 @@ object ModifyUtil extends Logging {
         .put("dataType", 2)
         .put("value", bfAddress)
 
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently");
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 995555e52..90875c2c2 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark34-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -23,7 +23,6 @@ import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, TryEval}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{BroadcastExchangeExecProxy, ColumnarBloomFilterSubquery, ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
@@ -61,40 +60,8 @@ object ModifyUtil extends Logging {
         .put("isNull", bfAddress == 0L)
         .put("dataType", 2)
         .put("value", bfAddress)
-
       case tryEval: TryEval => func(tryEval.child, exprsIndexMap, returnDatatype)
-
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "readSidePadding" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .put("function_name", "StaticInvokeCharReadPadding")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently")
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala b/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
index 6a06bca01..3ba2d8277 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-shims/spark35-modify/src/main/scala/org/apache/spark/sql/util/ModifyUtil.scala
@@ -23,7 +23,6 @@ import com.huawei.boostkit.spark.util.ModifyUtilAdaptor
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, ExprId, Expression, TryEval}
-import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.optimizer.{CombineJoinedAggregates, MergeSubqueryFilters}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.{BroadcastExchangeExecProxy, ColumnarBloomFilterSubquery, ColumnarToRowExec, OmniColumnarToRowExec, SparkPlan}
@@ -66,37 +65,6 @@ object ModifyUtil extends Logging {
         .put("value", bfAddress)
 
       case tryEval: TryEval => func(tryEval.child, exprsIndexMap, returnDatatype)
-
-      case staticInvoke: StaticInvoke =>
-      {
-        val funcName = staticInvoke.functionName
-        funcName match {
-          case "readSidePadding" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .put("function_name", "StaticInvokeCharReadPadding")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case "varcharTypeWriteSideCheck" => {
-            val arg0 = staticInvoke.arguments(0)
-            val arg1 = staticInvoke.arguments(1)
-            new JsonObject().put("exprType", "FUNCTION")
-              .addOmniExpJsonType("returnType", staticInvoke.dataType)
-              .put("function_name", "StaticInvokeVarcharTypeWriteSideCheck")
-              .put("arguments", new JsonArray()
-                .put(func(arg0, exprsIndexMap, arg0.dataType))
-                .put(func(arg1, exprsIndexMap, arg1.dataType)))
-          }
-
-          case _ => throw new UnsupportedOperationException(s"StaticInvoke function: $funcName is not supported currently")
-        }
-      }
-
       case _ => null
     }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index eba3d4d47..1d26660aa 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark32-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFi
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.execution.ColumnarDataWritingCommandExec
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 
 import scala.concurrent.Future
 
@@ -301,35 +300,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index 085b7ded4..03b019e6c 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFi
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.execution.ColumnarDataWritingCommandExec
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 
 import scala.concurrent.Future
 
@@ -302,35 +301,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index e8af6bb75..e1d9064d3 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ColumnarBroa
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFilterExec, ColumnarProjectExec, ColumnarTakeOrderedAndProjectExec, CommandResultExec, LeafExecNode, OmniColumnarToRowExec, ProjectExec, RowToOmniColumnarExec, SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 
 import scala.concurrent.Future
 
@@ -204,58 +203,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
-
-  test("Unsupported StaticInvoke function readSidePadding") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name char(5)) using parquet")
-    createTable.collect()
-    val insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
-    insert.collect()
-    val select = spark.sql("select * from source_table order by id")
-    val runRows = select.collect()
-    assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
-      "use columnar data writing command")
-    val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-      .find({
-        case _: ProjectExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }
diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
index e8af6bb75..e1d9064d3 100644
--- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
+++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/com/huawei/boostkit/spark/TableWriteBasicFunctionSuite.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ColumnarBroa
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarFilterExec, ColumnarProjectExec, ColumnarTakeOrderedAndProjectExec, CommandResultExec, LeafExecNode, OmniColumnarToRowExec, ProjectExec, RowToOmniColumnarExec, SparkPlan, TakeOrderedAndProjectExec, UnaryExecNode}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 
 import scala.concurrent.Future
 
@@ -204,58 +203,4 @@ class TableWriteBasicFunctionSuite extends QueryTest with SharedSparkSession {
       Row(13, "6884578", 6, null, null))
     assert(QueryTest.sameRows(runRowsNP, expectedRowsNP).isEmpty, "the run value is error")
   }
-
-  test("Unsupported StaticInvoke function varcharTypeWriteSideCheck") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name string, amount int) using parquet")
-    createTable.collect()
-    val dropNP = spark.sql("drop table if exists target_table")
-    dropNP.collect()
-    val createTableNP = spark.sql("create table target_table" +
-      "(name varchar(5), total_amount long) using parquet")
-    createTableNP.collect()
-    var insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob', 250), (2, '測試中文', 250), (3, NULL, 250), (4, 'abide', 250)")
-    insert.collect()
-    insert = spark.sql("insert into table target_table select UPPER(name) as name, SUM(amount) as total_amount from " +
-      "source_table where amount >= 10 GROUP BY UPPER(name)")
-    insert.collect()
-    assert(insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.toString().contains("OmniColumnarHashAggregate"), "use columnar data writing command")
-    val columnarDataWrite = insert.queryExecution.executedPlan.asInstanceOf[CommandResultExec]
-      .commandPhysicalPlan.find({
-        case _: HashAggregateExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val select = spark.sql("select * from target_table order by name")
-    val runRows = select.collect()
-    val expectedRows = Seq(Row(null, 250), Row("ABIDE", 250), Row("BOB", 250), Row("測試中文", 250))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
-
-  test("Unsupported StaticInvoke function readSidePadding") {
-    val drop = spark.sql("drop table if exists source_table")
-    drop.collect()
-    val createTable = spark.sql("create table source_table" +
-      "(id int, name char(5)) using parquet")
-    createTable.collect()
-    val insert = spark.sql("insert into table source_table values" +
-      "(1, 'Bob'), (2, '測試中文'), (3, NULL), (4, 'abide')")
-    insert.collect()
-    val select = spark.sql("select * from source_table order by id")
-    val runRows = select.collect()
-    assert(select.queryExecution.executedPlan.toString().contains("OmniColumnarProject"),
-      "use columnar data writing command")
-    val columnarDataWrite = select.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-      .find({
-        case _: ProjectExec => true
-        case _ => false
-      })
-    assert(columnarDataWrite.isEmpty, "use columnar data writing command")
-    val expectedRows = Seq(Row(1, "Bob  "), Row(2, "測試中文 "), Row(3, null), Row(4, "abide"))
-    assert(QueryTest.sameRows(runRows, expectedRows).isEmpty, "the run value is error")
-  }
 }
-- 
Gitee