diff --git a/omnioperator/omniop-spark-extension/java/pom.xml b/omnioperator/omniop-spark-extension/java/pom.xml
index 3bf7ed7a51c7fe380a111a5fd581e12834456651..c2a2c82f470637c29712c22d8c27fbb3a954fb60 100644
--- a/omnioperator/omniop-spark-extension/java/pom.xml
+++ b/omnioperator/omniop-spark-extension/java/pom.xml
@@ -98,7 +98,7 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>3.1.1</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index f3d1334f39e6e87ada0ea699355383b7ba8cda80..c13aee942768a7e452813b254671e8b578344369 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -155,6 +155,8 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
 
   val maxRowCount = conf.getConfString("spark.sql.columnar.maxRowCount", "20000").toInt
 
+  val enableColumnarUdf: Boolean = conf.getConfString("spark.omni.sql.columnar.udf", "true").toBoolean
+
   val enableOmniExpCheck : Boolean = conf.getConfString("spark.omni.sql.omniExp.check", "true").toBoolean
 }
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
index 91421657bb494eaa6916e0450786c3dbe54ebeb4..1788852baa489c4de30287d5d21a4df107fe85f9 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
@@ -27,13 +27,16 @@ import nova.hetu.omniruntime.constants.FunctionType.{OMNI_AGGREGATION_TYPE_AVG,
 import nova.hetu.omniruntime.constants.JoinType._
 import nova.hetu.omniruntime.operator.OmniExprVerify
 
+import com.huawei.boostkit.spark.ColumnarPluginConfig
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils.getRawTypeString
+import org.apache.spark.sql.hive.HiveUdfAdaptorUtil
 import org.apache.spark.sql.types.{BooleanType, DataType, DateType, Decimal, DecimalType, DoubleType, IntegerType, LongType, Metadata, ShortType, StringType}
 
+import java.util.Locale
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -540,10 +543,33 @@ object OmniExpressionAdaptor extends Logging {
         getConcatJsonStr(concat, exprsIndexMap)
       case attr: Attribute => toOmniJsonAttribute(attr, exprsIndexMap(attr.exprId))
       case _ =>
+        if (HiveUdfAdaptorUtil.isHiveUdf(expr) && ColumnarPluginConfig.getSessionConf.enableColumnarUdf) {
+          val hiveUdf = HiveUdfAdaptorUtil.asHiveSimpleUDF(expr)
+          val nameSplit = hiveUdf.name.split("\\.")
+          val udfName = if (nameSplit.size == 1) nameSplit(0).toLowerCase(Locale.ROOT) else nameSplit(1).toLowerCase(Locale.ROOT)
+          return ("{\"exprType\":\"FUNCTION\",\"returnType\":%s,\"function_name\":\"%s\"," +
+            "\"arguments\":[%s]}").format(sparkTypeToOmniExpJsonType(hiveUdf.dataType), udfName,
+            getJsonExprArgumentsByChildren(hiveUdf.children, exprsIndexMap))
+        }
         throw new UnsupportedOperationException(s"Unsupported expression: $expr")
     }
   }
 
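+  // Serializes each child of the UDF to its Omni JSON literal form and joins the results with
+  // commas, producing the "arguments" array of the FUNCTION expression emitted above.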
+  private def getJsonExprArgumentsByChildren(children: Seq[Expression],
+                                             exprsIndexMap: Map[ExprId, Int]): String = {
+    val size = children.size
+    val stringBuild = new mutable.StringBuilder
+    if (size == 0) {
+      return stringBuild.toString()
+    }
+    for (i <- 0 until size - 1) {
+      stringBuild.append(rewriteToOmniJsonExpressionLiteral(children(i), exprsIndexMap))
+      stringBuild.append(",")
+    }
+    stringBuild.append(rewriteToOmniJsonExpressionLiteral(children(size - 1), exprsIndexMap))
+    stringBuild.toString()
+  }
+
   private def checkInputDataTypes(children: Seq[Expression]): Unit = {
     val childTypes = children.map(_.dataType)
     for (dataType <- childTypes) {
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/HiveUdfAdaptorUtil.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/HiveUdfAdaptorUtil.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f9c416136cd17f3b09324607304ee22c8f53ff9e
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/HiveUdfAdaptorUtil.scala
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
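+/**
+ * Lives in Spark's org.apache.spark.sql.hive package so that the columnar plugin can detect
+ * and unwrap HiveSimpleUDF expressions.
+ */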
+object HiveUdfAdaptorUtil {
+  def isHiveUdf(expr: Expression): Boolean = {
+    expr.isInstanceOf[HiveSimpleUDF]
+  }
+
+  def asHiveSimpleUDF(expr: Expression): HiveSimpleUDF = {
+    expr.asInstanceOf[HiveSimpleUDF]
+  }
+}
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarUdfFuncSqlSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarUdfFuncSqlSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..0a0e258e190e248b3291ac4e2a0db6731aab3973
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarUdfFuncSqlSuite.scala
@@ -0,0 +1,258 @@
+/*
+ * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.forsql
+
+import org.apache.spark.sql.execution.{ColumnarProjectExec, ColumnarSparkPlanTest, ProjectExec}
+import org.apache.spark.sql.Row
+
+class ColumnarUdfFuncSqlSuite extends ColumnarSparkPlanTest {
+  import testImplicits.{localSeqToDatasetHolder, newProductEncoder}
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    val dataFrame = Seq[(java.lang.Long, java.lang.Long, java.lang.Integer, java.lang.Integer, String, String, String, String, java.lang.Double, java.lang.Double)](
+      (1001L, 8001L, 1, null, "中文1", "string100_normal", "", null, 1.0D, null),
+      (1002L, null, 2, 20, "中文2", "string200_normal", " string_empty2", null, 2.0D, 20.0D),
+      (1003L, null, 3, 30, "中文3", "string300_normal", "string_empty3", "string_null3", 3.0D, 30.0D),
+      (1004L, 8004L, 4, null, "中文4", "string400_normal", "string_empty4", "string_null4", 4.0D, 40.0D)
+    ).toDF("long_normal", "long_null", "int_normal", "int_null",
+      "ch_col", "string_normal", "string_empty", "string_null", "double_normal", "double_null")
+    dataFrame.createOrReplaceTempView("test_table")
+  }
+
+  // literal string param
+  test("Test ColumnarProjectExec happen and result is correct when execute " +
+    "RmvDupstrWithSeq(string_param, string_param)") {
+    val res = spark.sql("select RmvDupstrWithSeq('as,de,frd,s,de',',','&')")
+    val executedPlan = res.queryExecution.executedPlan
+    assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+    assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+    checkAnswer(
+      res,
+      Seq(Row("as&de&frd&s"))
+    )
+  }
+
+  // literal string & int param
+  test("Test ColumnarProjectExec happen and result is correct when execute " +
+    "SplitStr(string_param, string_param, int)") {
+    val res = spark.sql("select SplitStr('gf-dh|232|sdaASF','\\|',1)")
+    val executedPlan = res.queryExecution.executedPlan
+    assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+    assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+    checkAnswer(
+      res,
+      Seq(Row("gf-dh"))
+    )
+  }
+
+  // literal long param
+  test("Test ColumnarProjectExec happen and result is correct when execute " +
+    "LongToIp(long_param)") {
+    val res = spark.sql("select LongToIp(19216811) from test_table")
+    val executedPlan = res.queryExecution.executedPlan
+    assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+    assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+    checkAnswer(
+      res,
+      Seq(
+        Row("1.37.57.171"),
+        Row("1.37.57.171"),
+        Row("1.37.57.171"),
+        Row("1.37.57.171")
+      )
+    )
+  }
+
int_param)") { + val res = spark.sql("select LinkRelativeRatio(1.1, 2.3, 5)") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq(Row("1.09091")) + ) + } + + // string param + test("Test ColumnarProjectExec happen and result is correct when execute " + + "UrlDecoder(string_normal)") { + val res = spark.sql("select UrlDecoder(string_normal) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row("string100_normal"), + Row("string200_normal"), + Row("string300_normal"), + Row("string400_normal") + ) + ) + } + + test("Test ColumnarProjectExec happen and result is correct when execute " + + "UrlDecoder(string_null)") { + val res = spark.sql("select UrlDecoder(string_null) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row(null), + Row(null), + Row("string_null3"), + Row("string_null4") + ) + ) + } + + test("Test ColumnarProjectExec happen and result is correct when execute " + + "UrlDecoder(string_empty)") { + val res = spark.sql("select UrlDecoder(string_empty) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row(null), + Row(" string_empty2"), + Row("string_empty3"), + Row("string_empty4") + ) + ) + } + + // int param + test("Test ColumnarProjectExec happen and result is correct when execute " + + "DateUtil(int_normal)") { + val res = spark.sql("select DateUtil(int_normal) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row("00:00:01"), + Row("00:00:02"), + Row("00:00:03"), + Row("00:00:04") + ) + ) + } + + test("Test ColumnarProjectExec happen and result is correct when execute " + + "DateUtil(int_null)") { + val res = spark.sql("select DateUtil(int_null) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, 
s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row(null), + Row("00:00:20"), + Row("00:00:30"), + Row(null) + ) + ) + } + + // long param + test("Test ColumnarProjectExec happen and result is correct when execute " + + "LongToIp(long_normal)") { + val res = spark.sql("select LongToIp(long_normal) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row("0.0.3.233"), + Row("0.0.3.234"), + Row("0.0.3.235"), + Row("0.0.3.236") + ) + ) + } + + test("Test ColumnarProjectExec happen and result is correct when execute " + + "LongToIp(long_null)") { + val res = spark.sql("select LongToIp(long_null) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row("0.0.3.65"), + Row("-999"), + Row("-999"), + Row("0.0.31.68") + ) + ) + } + + // double param + test("Test ColumnarProjectExec happen and result is correct when execute " + + "FeeToUSDUDF(double_normal, double_normal)") { + val res = spark.sql("select FeeToUSDUDF(double_normal, double_normal) from test_table") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq( + Row(1.0), + Row(4.0), + Row(9.0), + Row(16.0) + ) + ) + } + + // unsupported param type + test("Test ColumnarProjectExec rollback and result is correct when execute " + + "ContainsAny(ArrayList)") { + val res = spark.sql("select ContainsAny(split('a1, b2, c1', ',')), array('c1', 'd1', 'e1')") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isEmpty, s"ColumnarProjectExec happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isDefined, s"ProjectExec not happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq(Row(true)) + ) + } + + // unsupported return type: Map + test("Test ColumnarProjectExec rollback and result is correct when execute " + + "GetHonorTimeAndVersions(string_param)") { + val res = spark.sql("select GetHonorTimeAndVersions('20151001')") + val executedPlan = res.queryExecution.executedPlan + assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isEmpty, s"ColumnarProjectExec happened, executedPlan as follows: \n$executedPlan") + assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isDefined, s"ProjectExec not happened, executedPlan as follows: \n$executedPlan") + checkAnswer( + res, + Seq(Row("{\"date0\":\"2015-09-01\",\"date1\":\"2015-09-16\",\"new_version\":\"5.10.1\"}")) + ) + } +} \ No newline at end of file