diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala index d1f911a8cde2a7361c3d23a3877ba7940f27a883..477e0752aa6878829a45cf69516a45ff022f8031 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/expression/OmniExpressionAdaptor.scala @@ -100,6 +100,31 @@ object OmniExpressionAdaptor extends Logging { } + private val timeFormatSet: Set[String] = Set("yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd") + + private def unsupportedTimeFormatCheck(timeFormat: String): Unit = { + if (!timeFormatSet.contains(timeFormat)) { + throw new UnsupportedOperationException(s"Unsupported Time Format: $timeFormat") + } + } + + private val timeZoneSet: Set[String] = Set("GMT+08:00", "Asia/Beijing", "Asia/Shanghai") + + private def unsupportedTimeZoneCheck(timeZone: String): Unit = { + if (!timeZoneSet.contains(timeZone)) { + throw new UnsupportedOperationException(s"Unsupported Time Zone: $timeZone") + } + } + + def toOmniTimeFormat(format: String): String = { + format.replace("yyyy", "%Y") + .replace("MM", "%m") + .replace("dd", "%d") + .replace("HH", "%H") + .replace("mm", "%M") + .replace("ss", "%S") + } + def rewriteToOmniJsonExpressionLiteral(expr: Expression, exprsIndexMap: Map[ExprId, Int]): String = { rewriteToOmniJsonExpressionLiteral(expr, exprsIndexMap, expr.dataType) @@ -450,6 +475,29 @@ object OmniExpressionAdaptor extends Logging { case knownFloatingPointNormalized: KnownFloatingPointNormalized => rewriteToOmniJsonExpressionLiteralJsonObject(knownFloatingPointNormalized.child, exprsIndexMap) + // for date time functions + case unixTimestamp: UnixTimestamp => + val timeZone = unixTimestamp.timeZoneId.getOrElse("None") + 
unsupportedTimeZoneCheck(timeZone) + unsupportedTimeFormatCheck(unixTimestamp.format.toString) + new JSONObject().put("exprType", "FUNCTION") + .addOmniExpJsonType("returnType", unixTimestamp.dataType) + .put("function_name", "unix_timestamp") + .put("arguments", new JSONArray().put(rewriteToOmniJsonExpressionLiteralJsonObject(unixTimestamp.timeExp, exprsIndexMap)) + .put(new JSONObject(toOmniTimeFormat(rewriteToOmniJsonExpressionLiteral(unixTimestamp.format, exprsIndexMap)))) + .put(new JSONObject("{\"exprType\":\"LITERAL\",\"dataType\":15,\"isNull\":false,\"value\":\""+timeZone+"\",\"width\":"+timeZone.length+"}"))) + + case fromUnixTime: FromUnixTime => + val timeZone = fromUnixTime.timeZoneId.getOrElse("None") + unsupportedTimeZoneCheck(timeZone) + unsupportedTimeFormatCheck(fromUnixTime.format.toString) + new JSONObject().put("exprType", "FUNCTION") + .addOmniExpJsonType("returnType", fromUnixTime.dataType) + .put("function_name", "from_unixtime") + .put("arguments", new JSONArray().put(rewriteToOmniJsonExpressionLiteralJsonObject(fromUnixTime.sec, exprsIndexMap)) + .put(new JSONObject(toOmniTimeFormat(rewriteToOmniJsonExpressionLiteral(fromUnixTime.format, exprsIndexMap)))) + .put(new JSONObject("{\"exprType\":\"LITERAL\",\"dataType\":15,\"isNull\":false,\"value\":\""+timeZone+"\",\"width\":"+timeZone.length+"}"))) + // for like case startsWith: StartsWith => startsWith.right match { diff --git a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala index 467ad35cef0c97162ad4567cafdfa8c80670d5ba..f2a6407a122460373b36e93273d87af801bbec63 100644 --- a/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala +++ 
b/omnioperator/omniop-spark-extension/java/src/test/scala/org/apache/spark/sql/catalyst/expressions/ColumnarFuncSuite.scala @@ -47,6 +47,12 @@ class ColumnarFuncSuite extends ColumnarSparkPlanTest { (50, 50L, 50.0, true, "50", Decimal(50.00)) ).toDF("int_column", "long_column", "double_column", "bool_column", "str_column", "decimal_column") .createOrReplaceTempView("greatest_view") + + Seq[(java.lang.Integer, String, String)]( + (1, "1945-02-08", "1945-02-08 21:53:20"), + (2, "2008-08-02", "2008-08-02 11:54:53"), + (3, "2056-08-04", "2056-08-04 16:47:07") + ).toDF("id", "ds1", "ds2").createOrReplaceTempView("unix_timestamp_and_from_unixtime_view") } test("Test Contains Function") { @@ -110,6 +116,22 @@ class ColumnarFuncSuite extends ColumnarSparkPlanTest { assertOmniProjectNotHappened(rollbackRes) } + test("Test Unix_timestamp Function") { + var res1 = spark.sql("select unix_timestamp(cast(ds1 as date)) from unix_timestamp_and_from_unixtime_view order by id") + assertOmniProjectHappened(res1) + checkAnswer(res1, Seq(Row(-785667600L),Row(1217606400L),Row(2732544000L))) + + var res2 = spark.sql("select unix_timestamp(ds2) from unix_timestamp_and_from_unixtime_view order by id") + assertOmniProjectHappened(res2) + checkAnswer(res2, Seq(Row(-785588800L),Row(1217649293L),Row(2732604427L))) + } + + test("Test From_unixtime Function") { + var res = spark.sql("select from_unixtime(unix_timestamp(ds2)) from unix_timestamp_and_from_unixtime_view order by id") + assertOmniProjectHappened(res) + checkAnswer(res, Seq(Row("1945-02-08 21:53:20"),Row("2008-08-02 11:54:53"),Row("2056-08-04 16:47:07"))) + } + private def assertOmniProjectHappened(res: DataFrame) = { val executedPlan = res.queryExecution.executedPlan assert(executedPlan.find(_.isInstanceOf[ColumnarProjectExec]).isDefined, s"ColumnarProjectExec not happened, executedPlan as follows: \n$executedPlan")