From 343835be4bc6daa96bb489c5e317c858b22fcbe3 Mon Sep 17 00:00:00 2001
From: chenyidao <979136761@qq.com>
Date: Thu, 18 Aug 2022 15:13:44 +0800
Subject: [PATCH] Spark: support Hive UDF framework
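
This patch lets the columnar plugin offload Hive simple UDFs (expressions
planned as HiveSimpleUDF, i.e. UDFs extending the old-style
org.apache.hadoop.hive.ql.exec.UDF class) to the omni runtime. When the new
switch spark.omni.sql.columnar.udf is true (the default),
OmniExpressionAdaptor rewrites such a UDF call into an omni FUNCTION JSON
expression; UDFs with unsupported argument or return types still fall back
to the row-based ProjectExec, as the new test suite verifies.

A minimal usage sketch (the function name, class, and jar path below are
illustrative, not part of this patch):

    spark.sql("CREATE TEMPORARY FUNCTION UrlDecoder AS " +
      "'com.example.udf.UrlDecoder' USING JAR 'hdfs:///tmp/example-udfs.jar'")
    val df = spark.sql("select UrlDecoder(string_normal) from test_table")
    // With the switch on, df's physical plan should contain
    // ColumnarProjectExec instead of ProjectExec.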
---
.../omniop-spark-extension/java/pom.xml | 2 +-
.../boostkit/spark/ColumnarPluginConfig.scala | 2 +
.../expression/OmniExpressionAdaptor.scala | 26 ++
.../spark/sql/hive/HiveUdfAdaptorUtil.scala | 31 +++
.../forsql/ColumnarUdfFuncSqlSuite.scala | 258 ++++++++++++++++++
5 files changed, 318 insertions(+), 1 deletion(-)
create mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/HiveUdfAdaptorUtil.scala
create mode 100644 omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarUdfFuncSqlSuite.scala
diff --git a/omnioperator/omniop-spark-extension/java/pom.xml b/omnioperator/omniop-spark-extension/java/pom.xml
index 3bf7ed7a5..c2a2c82f4 100644
--- a/omnioperator/omniop-spark-extension/java/pom.xml
+++ b/omnioperator/omniop-spark-extension/java/pom.xml
@@ -98,7 +98,7 @@
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>3.1.1</version>
-            <scope>test</scope>
+            <scope>provided</scope>
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
index f3d1334f3..c13aee942 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPluginConfig.scala
@@ -155,6 +155,8 @@ class ColumnarPluginConfig(conf: SQLConf) extends Logging {
val maxRowCount =
conf.getConfString("spark.sql.columnar.maxRowCount", "20000").toInt
+ val enableColumnarUdf: Boolean = conf.getConfString("spark.omni.sql.columnar.udf", "true").toBoolean
+
val enableOmniExpCheck : Boolean = conf.getConfString("spark.omni.sql.omniExp.check", "true").toBoolean
}
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
index 91421657b..1788852ba 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala
@@ -27,13 +27,16 @@ import nova.hetu.omniruntime.constants.FunctionType.{OMNI_AGGREGATION_TYPE_AVG,
import nova.hetu.omniruntime.constants.JoinType._
import nova.hetu.omniruntime.operator.OmniExprVerify
+import com.huawei.boostkit.spark.ColumnarPluginConfig
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils.getRawTypeString
+import org.apache.spark.sql.hive.HiveUdfAdaptorUtil
import org.apache.spark.sql.types.{BooleanType, DataType, DateType, Decimal, DecimalType, DoubleType, IntegerType, LongType, Metadata, ShortType, StringType}
+import java.util.Locale
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -540,10 +543,33 @@ object OmniExpressionAdaptor extends Logging {
getConcatJsonStr(concat, exprsIndexMap)
case attr: Attribute => toOmniJsonAttribute(attr, exprsIndexMap(attr.exprId))
case _ =>
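+ // When enabled, translate a Hive simple UDF into an omni FUNCTION JSON expression,
+ // e.g. {"exprType":"FUNCTION","returnType":...,"function_name":"myudf","arguments":[...]},
+ // instead of rejecting the whole expression tree.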
+ if (HiveUdfAdaptorUtil.isHiveUdf(expr) && ColumnarPluginConfig.getSessionConf.enableColumnarUdf) {
+ val hiveUdf = HiveUdfAdaptorUtil.asHiveSimpleUDF(expr)
+ val nameSplit = hiveUdf.name.split("\\.")
+ val udfName = if (nameSplit.size == 1) nameSplit(0).toLowerCase(Locale.ROOT) else nameSplit(1).toLowerCase(Locale.ROOT)
+ return ("{\"exprType\":\"FUNCTION\",\"returnType\":%s,\"function_name\":\"%s\"," +
+ "\"arguments\":[%s]}").format(sparkTypeToOmniExpJsonType(hiveUdf.dataType), udfName,
+ getJsonExprArgumentsByChildren(hiveUdf.children, exprsIndexMap))
+ }
throw new UnsupportedOperationException(s"Unsupported expression: $expr")
}
}
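+
+ // Serializes each child expression to its omni JSON literal and joins them
+ // with commas; an empty child list yields an empty arguments string.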
+ private def getJsonExprArgumentsByChildren(children: Seq[Expression],
+ exprsIndexMap: Map[ExprId, Int]): String = {
+ children.map(child => rewriteToOmniJsonExpressionLiteral(child, exprsIndexMap)).mkString(",")
+ }
+
private def checkInputDataTypes(children: Seq[Expression]): Unit = {
val childTypes = children.map(_.dataType)
for (dataType <- childTypes) {
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/HiveUdfAdaptorUtil.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/HiveUdfAdaptorUtil.scala
new file mode 100644
index 000000000..f9c416136
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/hive/HiveUdfAdaptorUtil.scala
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
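+/**
+ * HiveSimpleUDF is private[hive], so this adaptor lives in the
+ * org.apache.spark.sql.hive package to let callers outside it (such as
+ * OmniExpressionAdaptor) detect and cast Hive simple UDF expressions.
+ */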
+object HiveUdfAdaptorUtil {
+ def isHiveUdf(expr: Expression): Boolean = {
+ expr.isInstanceOf[HiveSimpleUDF]
+ }
+
+ def asHiveSimpleUDF(expr: Expression): HiveSimpleUDF = {
+ expr.asInstanceOf[HiveSimpleUDF]
+ }
+}
diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarUdfFuncSqlSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarUdfFuncSqlSuite.scala
new file mode 100644
index 000000000..0a0e258e1
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/execution/forsql/ColumnarUdfFuncSqlSuite.scala
@@ -0,0 +1,258 @@
+/*
+ * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.forsql
+
+import org.apache.spark.sql.execution.{ColumnarProjectExec, ColumnarSparkPlanTest, ProjectExec}
+import org.apache.spark.sql.Row
+
+class ColumnarUdfFuncSqlSuite extends ColumnarSparkPlanTest {
+ import testImplicits.{localSeqToDatasetHolder, newProductEncoder}
+
+ protected override def beforeAll(): Unit = {
+ super.beforeAll()
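+ // Most columns pair a fully populated "*_normal" variant with a variant
+ // containing nulls, so each UDF is exercised with and without null input.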
+ val dataFrame = Seq[(java.lang.Long, java.lang.Long, java.lang.Integer, java.lang.Integer, String, String, String, String, java.lang.Double, java.lang.Double)](
+ (1001L, 8001L, 1, null, "中文1", "string100_normal", "", null, 1.0D, null),
+ (1002L, null, 2, 20, "中文2", "string200_normal", " string_empty2", null, 2.0D, 20.0D),
+ (1003L, null, 3, 30, "中文3", "string300_normal", "string_empty3", "string_null3", 3.0D, 30.0D),
+ (1004L, 8004L, 4, null, "中文4", "string400_normal", "string_empty4", "string_null4", 4.0D, 40.0D)
+ ).toDF("long_normal", "long_null", "int_normal", "int_null",
+ "ch_col", "string_normal", "string_empty", "string_null", "double_normal", "double_null")
+ dataFrame.createOrReplaceTempView("test_table")
+ }
+
+ // literal string param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "RmvDupstrWithSeq(string_param, string_param)") {
+ val res = spark.sql("select RmvDupstrWithSeq('as,de,frd,s,de',',','&')")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(Row("as&de&fed&s"))
+ )
+ }
+
+ // literal string & int param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "SplitStr(string_param, string_param, int)") {
+ val res = spark.sql("select SplitStr('gf-dh|232|sdaASF','\\|',1)")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(Row("gf-dj"))
+ )
+ }
+
+ // literal long param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "LongToIp(long_param)") {
+ val res = spark.sql("select LongToIp(19216811) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row("1.37.57.171")
+ )
+ )
+ }
+
+ // literal double param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "LinkRelativeRatio(double_param, double_param, int_param)") {
+ val res = spark.sql("select LinkRelativeRatio(1.1, 2.3, 5)")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(Row("1.09091"))
+ )
+ }
+
+ // string param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "UrlDecoder(string_normal)") {
+ val res = spark.sql("select UrlDecoder(string_normal) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row("string100_normal"),
+ Row("string200_normal"),
+ Row("string300_normal"),
+ Row("string400_normal")
+ )
+ )
+ }
+
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "UrlDecoder(string_null)") {
+ val res = spark.sql("select UrlDecoder(string_null) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row(null),
+ Row(null),
+ Row("string_null3"),
+ Row("string_null4")
+ )
+ )
+ }
+
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "UrlDecoder(string_empty)") {
+ val res = spark.sql("select UrlDecoder(string_empty) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row(null),
+ Row(" string_empty2"),
+ Row("string_empty3"),
+ Row("string_empty4")
+ )
+ )
+ }
+
+ // int param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "DateUtil(int_normal)") {
+ val res = spark.sql("select DateUtil(int_normal) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row("00:00:01"),
+ Row("00:00:02"),
+ Row("00:00:03"),
+ Row("00:00:04")
+ )
+ )
+ }
+
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "DateUtil(int_null)") {
+ val res = spark.sql("select DateUtil(int_null) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row(null),
+ Row("00:00:20"),
+ Row("00:00:30"),
+ Row(null)
+ )
+ )
+ }
+
+ // long param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "LongToIp(long_normal)") {
+ val res = spark.sql("select LongToIp(long_normal) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row("0.0.3.233"),
+ Row("0.0.3.234"),
+ Row("0.0.3.235"),
+ Row("0.0.3.236")
+ )
+ )
+ }
+
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "LongToIp(long_null)") {
+ val res = spark.sql("select LongToIp(long_null) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row("0.0.3.65"),
+ Row("-999"),
+ Row("-999"),
+ Row("0.0.31.68")
+ )
+ )
+ }
+
+ // double param
+ test("Test ColumnarProjectExec happen and result is correct when execute " +
+ "FeeToUSDUDF(double_normal, double_normal)") {
+ val res = spark.sql("select FeeToUSDUDF(double_normal, double_normal) from test_table")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isEmpty, s"ProjectExec happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(
+ Row(1.0),
+ Row(4.0),
+ Row(9.0),
+ Row(16.0)
+ )
+ )
+ }
+
+ // unsupported param type
+ test("Test ColumnarProjectExec rollback and result is correct when execute " +
+ "ContainsAny(ArrayList)") {
+ val res = spark.sql("select ContainsAny(split('a1, b2, c1', ',')), array('c1', 'd1', 'e1')")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isEmpty, s"ColumnarProjectExec happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isDefined, s"ProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(Row(true))
+ )
+ }
+
+ // unsupported return type: Map
+ test("Test ColumnarProjectExec rollback and result is correct when execute " +
+ "GetHonorTimeAndVersions(string_param)") {
+ val res = spark.sql("select GetHonorTimeAndVersions('20151001')")
+ val executedPlan = res.queryExecution.executedPlan
+ assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isEmpty, s"ColumnarProjectExec happened, executedPlan as follows: \n$executedPlan")
+ assert(executedPlan.find(_.isInstanceOf[ProjectExec]).isDefined, s"ProjectExec not happened, executedPlan as follows: \n$executedPlan")
+ checkAnswer(
+ res,
+ Seq(Row("{\"date0\":\"2015-09-01\",\"date1\":\"2015-09-16\",\"new_version\":\"5.10.1\"}"))
+ )
+ }
+}
\ No newline at end of file
--
Gitee