From f1e6fa3df0fb2d621396ef0fe8d2efecd757a53a Mon Sep 17 00:00:00 2001 From: panmingyi Date: Tue, 8 Jul 2025 16:46:18 +0800 Subject: [PATCH 1/5] support concat_ws --- .../expression/OmniExpressionAdaptor.scala | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index d19ee1471..09910bf78 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -397,6 +397,10 @@ object OmniExpressionAdaptor extends Logging { case concat: Concat => getConcatJsonStr(concat, exprsIndexMap) + + case concatWs: ConcatWs => + getConcatWsJsonStr(concatWs, exprsIndexMap) + case greatest: Greatest => getGreatestJsonStr(greatest, exprsIndexMap) @@ -582,6 +586,39 @@ object OmniExpressionAdaptor extends Logging { res } + private def getConcatWsJsonStr(concatWs: ConcatWs, exprsIndexMap: Map[ExprId, Int]): JsonObject = { + val children: Seq[Expression] = concatWs.children + checkInputDataTypes(children) + + val separator = rewriteToOmniJsonExpressionLiteralJsonObject(children.head, exprsIndexMap) + val res = new JsonObject().put("exprType", "FUNCTION") + .addOmniExpJsonType("returnType", concatWs.dataType) + .put("function_name", "concatWs") + if (children.length == 1) { + res.put("arguments", new JsonArray().put(separator) + .put(createNullLiteralJson(concatWs.dataType)) + .put(createNullLiteralJson(concatWs.dataType))) + } else if (children.length == 2) { + res.put("arguments", new JsonArray().put(separator) + .put(rewriteToOmniJsonExpressionLiteralJsonObject(children(1), exprsIndexMap)) + .put(createNullLiteralJson(concatWs.dataType))) + } else { + res.put("arguments", new JsonArray().put(separator) + .put(rewriteToOmniJsonExpressionLiteralJsonObject(children(1), exprsIndexMap)) + .put(rewriteToOmniJsonExpressionLiteralJsonObject(children(2), exprsIndexMap))) + for (i <- 3 until children.length) { + val preResJson = new JsonObject().addAll(res) + res.put("arguments", new JsonArray().put(separator).put(preResJson) + .put(rewriteToOmniJsonExpressionLiteralJsonObject(children(i), exprsIndexMap))) + } + } + res + } + + private def createNullLiteralJson(dataType: DataType): JsonObject = { + new JsonObject().put("exprType", "LITERAL").addOmniExpJsonType("dataType", dataType).put("isNull", true) + } + private def getGreatestJsonStr(greatest: Greatest, exprsIndexMap: Map[ExprId, Int]): JsonObject = { if (greatest.children.length != 2) { throw new UnsupportedOperationException(s"Number of parameters is ${greatest.children.length}. " + -- Gitee From e1d003ca80a4f127649c6930b83a80b940160c8f Mon Sep 17 00:00:00 2001 From: panmingyi Date: Tue, 8 Jul 2025 17:00:13 +0800 Subject: [PATCH 2/5] support regexp_replace --- .../spark/expression/OmniExpressionAdaptor.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index 09910bf78..86e5186fb 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -527,6 +527,9 @@ object OmniExpressionAdaptor extends Logging { } } + case regExpReplace: RegExpReplace => + getRegExpReplaceStr(regExpReplace, exprsIndexMap) + case _ => val jsonObj = ModifyUtilAdaptor.rewriteToOmniJsonExpression(expr, exprsIndexMap, returnDatatype, rewriteToOmniJsonExpressionLiteralJsonObject) if (jsonObj != null) { @@ -544,6 +547,18 @@ object OmniExpressionAdaptor extends Logging { } } + private def getRegExpReplaceStr(regExpReplace: RegExpReplace, exprsIndexMap: Map[ExprId, Int]): JsonObject = { + val children: Seq[Expression] = regExpReplace.children + val arguments = new JsonArray() + for (i <- children.indices) { + arguments.put(rewriteToOmniJsonExpressionLiteralJsonObject(children(i), exprsIndexMap)) + } + new JsonObject().put("exprType", "FUNCTION") + .addOmniExpJsonType("returnType", regExpReplace.dataType) + .put("function_name", "regexpReplace") + .put("arguments", arguments) + } + private def getJsonExprArgumentsByChildren(children: Seq[Expression], exprsIndexMap: Map[ExprId, Int]): JsonArray = { val size = children.size -- Gitee From ac12b4077cf5095b4fb0dd9fe68fc3107e4c2118 Mon Sep 17 00:00:00 2001 From: panmingyi Date: Tue, 8 Jul 2025 17:14:43 +0800 Subject: [PATCH 3/5] support trim --- .../expression/OmniExpressionAdaptor.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index 86e5186fb..f9bdf1317 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -530,6 +530,9 @@ object OmniExpressionAdaptor extends Logging { case regExpReplace: RegExpReplace => getRegExpReplaceStr(regExpReplace, exprsIndexMap) + case trim: StringTrim => + getTrimStr(trim, exprsIndexMap) + case _ => val jsonObj = ModifyUtilAdaptor.rewriteToOmniJsonExpression(expr, exprsIndexMap, returnDatatype, rewriteToOmniJsonExpressionLiteralJsonObject) if (jsonObj != null) { @@ -559,6 +562,22 @@ object OmniExpressionAdaptor extends Logging { .put("arguments", arguments) } + private def getTrimStr(trim: StringTrim, exprsIndexMap: Map[ExprId, Int]): JsonObject = { + val children: Seq[Expression] = trim.children + val arguments = new JsonArray() + arguments.put(rewriteToOmniJsonExpressionLiteralJsonObject(children.head, exprsIndexMap)) + if (children.size == 1) { + arguments.put(new JsonObject().put("exprType", "LITERAL").put("dataType", 15).put("isNull", false) + .put("value", " ").put("width", 1)) + } else { + arguments.put(rewriteToOmniJsonExpressionLiteralJsonObject(children(1), exprsIndexMap)) + } + new JsonObject().put("exprType", "FUNCTION") + .addOmniExpJsonType("returnType", trim.dataType) + .put("function_name", "trim") + .put("arguments", arguments) + } + private def getJsonExprArgumentsByChildren(children: Seq[Expression], exprsIndexMap: Map[ExprId, Int]): JsonArray = { val size = children.size -- Gitee From 1ce1612f725d587ec407fe98e4474f9224888d1f Mon Sep 17 00:00:00 2001 From: panmingyi Date: Fri, 11 Jul 2025 15:58:25 +0800 Subject: [PATCH 4/5] support floor --- .../boostkit/spark/expression/OmniExpressionAdaptor.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index f9bdf1317..3cd2fb382 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-core/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -533,6 +533,12 @@ object OmniExpressionAdaptor extends Logging { case trim: StringTrim => getTrimStr(trim, exprsIndexMap) + case floor: Floor => + new JsonObject().put("exprType", "FUNCTION") + .addOmniExpJsonType("returnType", floor.dataType) + .put("function_name", "floor") + .put("arguments", new JsonArray().put(rewriteToOmniJsonExpressionLiteralJsonObject(floor.child, exprsIndexMap))) + case _ => val jsonObj = ModifyUtilAdaptor.rewriteToOmniJsonExpression(expr, exprsIndexMap, returnDatatype, rewriteToOmniJsonExpressionLiteralJsonObject) if (jsonObj != null) { -- Gitee From cfa9c3cd43020c843d84f92bff54ca117732bff3 Mon Sep 17 00:00:00 2001 From: panmingyi Date: Tue, 15 Jul 2025 19:28:44 +0800 Subject: [PATCH 5/5] add concat_ws, regexp, regexp_replace, trim, floor UT --- .../expressions/ColumnarFuncSuite.scala | 55 +++++++++++++++++++ .../expressions/ColumnarFuncSuite.scala | 55 +++++++++++++++++++ .../expressions/ColumnarFuncSuite.scala | 55 +++++++++++++++++++ 3 files changed, 165 insertions(+) diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala index 20c861eea..ac2315444 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark33-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala @@ -150,6 +150,61 @@ class ColumnarFuncSuite extends ColumnarSparkPlanTest { checkAnswer(res3, Seq(Row("2086-08-10 05:05:05", "2086-08-10"))) } + test("Test concat_ws Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select concat_ws('--', 'aaa', 'bbb')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("aaa--bbb"))) + + val res2 = spark.sql("select concat_ws('一一', '哈哈哈', '啦啦啦')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("哈哈哈一一啦啦啦"))) + } + + test("Test regexp Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select 'hello' regexp 'hel.o' as test1") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row(true))) + + val res2 = spark.sql("select 'aaa' regexp 'a{2,4}' as test2") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row(true))) + } + + test("Test regexp_replace Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select regexp_replace('abcabc', 'a', 'x')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("xbcxbc"))) + + val res2 = spark.sql("select regexp_replace('你好世界', '好', '差')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("你差世界"))) + } + + test("Test trim Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select trim(' hello ')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("hello"))) + + val res2 = spark.sql("select trim(both '空' from '空稀少珍稀空')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("稀少珍稀"))) + } + + test("Test floor Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select floor(1.9)") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row(1L))) + + val res2 = spark.sql("select floor(-1.9)") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row(-2L))) + } + private def assertOmniProjectHappened(res: DataFrame) = { val executedPlan = res.queryExecution.executedPlan assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala index 20c861eea..ac2315444 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark34-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala @@ -150,6 +150,61 @@ class ColumnarFuncSuite extends ColumnarSparkPlanTest { checkAnswer(res3, Seq(Row("2086-08-10 05:05:05", "2086-08-10"))) } + test("Test concat_ws Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select concat_ws('--', 'aaa', 'bbb')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("aaa--bbb"))) + + val res2 = spark.sql("select concat_ws('一一', '哈哈哈', '啦啦啦')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("哈哈哈一一啦啦啦"))) + } + + test("Test regexp Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select 'hello' regexp 'hel.o' as test1") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row(true))) + + val res2 = spark.sql("select 'aaa' regexp 'a{2,4}' as test2") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row(true))) + } + + test("Test regexp_replace Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select regexp_replace('abcabc', 'a', 'x')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("xbcxbc"))) + + val res2 = spark.sql("select regexp_replace('你好世界', '好', '差')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("你差世界"))) + } + + test("Test trim Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select trim(' hello ')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("hello"))) + + val res2 = spark.sql("select trim(both '空' from '空稀少珍稀空')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("稀少珍稀"))) + } + + test("Test floor Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select floor(1.9)") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row(1L))) + + val res2 = spark.sql("select floor(-1.9)") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row(-2L))) + } + private def assertOmniProjectHappened(res: DataFrame) = { val executedPlan = res.queryExecution.executedPlan assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") diff --git a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala index 20c861eea..ac2315444 100644 --- a/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala +++ b/omnioperator/omniop-spark-extension/spark-extension-ut/spark35-ut/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala @@ -150,6 +150,61 @@ class ColumnarFuncSuite extends ColumnarSparkPlanTest { checkAnswer(res3, Seq(Row("2086-08-10 05:05:05", "2086-08-10"))) } + test("Test concat_ws Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select concat_ws('--', 'aaa', 'bbb')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("aaa--bbb"))) + + val res2 = spark.sql("select concat_ws('一一', '哈哈哈', '啦啦啦')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("哈哈哈一一啦啦啦"))) + } + + test("Test regexp Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select 'hello' regexp 'hel.o' as test1") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row(true))) + + val res2 = spark.sql("select 'aaa' regexp 'a{2,4}' as test2") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row(true))) + } + + test("Test regexp_replace Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select regexp_replace('abcabc', 'a', 'x')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("xbcxbc"))) + + val res2 = spark.sql("select regexp_replace('你好世界', '好', '差')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("你差世界"))) + } + + test("Test trim Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select trim(' hello ')") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row("hello"))) + + val res2 = spark.sql("select trim(both '空' from '空稀少珍稀空')") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row("稀少珍稀"))) + } + + test("Test floor Function") { + spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConstantFolding") + val res1 = spark.sql("select floor(1.9)") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row(1L))) + + val res2 = spark.sql("select floor(-1.9)") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row(-2L))) + } + private def assertOmniProjectHappened(res: DataFrame) = { val executedPlan = res.queryExecution.executedPlan assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan") -- Gitee