From 23d44765dc84e47a0cce28f9f11676b2f59a2768 Mon Sep 17 00:00:00 2001
From: x00823442 <1061529620@qq.com>
Date: Mon, 18 Sep 2023 11:12:51 +0800
Subject: [PATCH 1/4] [WIP] optimize: use assembly pom to download jars
---
.../omnidata-spark-connector-lib/README.md | 11 +
.../omnidata-spark-connector-lib/pom.xml | 313 ++++++++++++++++++
.../src/assembly/assembly.xml | 15 +
.../omnidata-spark-connector/spark_build.sh | 36 +-
4 files changed, 344 insertions(+), 31 deletions(-)
create mode 100644 omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md
create mode 100644 omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml
create mode 100644 omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml
diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md
new file mode 100644
index 000000000..7c95fcb5a
--- /dev/null
+++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md
@@ -0,0 +1,11 @@
+# OmniData Spark Connector Lib
+
+## Building OmniData Spark Connector Lib
+
+1. Run the following command from the project root directory: `mvn clean package`
+
+   The downloaded JARs will then be in the `omnidata-spark-connector-lib/target/` directory.
+
+## More Information
+
+For further assistance, send an email to kunpengcompute@huawei.com.
\ No newline at end of file
diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml
new file mode 100644
index 000000000..f46393813
--- /dev/null
+++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml
@@ -0,0 +1,313 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.huawei.boostkit</groupId>
+    <artifactId>omnidata-spark-connector-lib</artifactId>
+    <packaging>pom</packaging>
+    <version>1.5.0</version>
+
+    <properties>
+        <dep.json.version>2.12.4</dep.json.version>
+        <!-- 1.2.3 -->
+        <dep.hetu.version>1.6.1</dep.hetu.version>
+        <dep.airlift.version>206</dep.airlift.version>
+        <dep.curator.version>2.12.0</dep.curator.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <version>1.68</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>it.unimi.dsi</groupId>
+            <artifactId>fastutil</artifactId>
+            <version>6.5.9</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.76</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-guava</artifactId>
+            <version>${dep.json.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jdk8</artifactId>
+            <version>${dep.json.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-joda</artifactId>
+            <version>${dep.json.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>${dep.json.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-parameter-names</artifactId>
+            <version>${dep.json.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.hetu.core</groupId>
+            <artifactId>presto-spi</artifactId>
+            <version>${dep.hetu.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.hetu.core</groupId>
+            <artifactId>hetu-transport</artifactId>
+            <version>${dep.hetu.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.hetu.core</groupId>
+            <artifactId>presto-parser</artifactId>
+            <version>${dep.hetu.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.hetu.core</groupId>
+            <artifactId>presto-main</artifactId>
+            <version>${dep.hetu.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.hetu.core</groupId>
+            <artifactId>presto-expressions</artifactId>
+            <version>${dep.hetu.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>26.0-jre</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>json</artifactId>
+            <version>${dep.airlift.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>slice</artifactId>
+            <version>0.38</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>stats</artifactId>
+            <version>0.193</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.hdrhistogram</groupId>
+                    <artifactId>HdrHistogram</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.weakref</groupId>
+                    <artifactId>jmxutils</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>joni</artifactId>
+            <version>2.1.5.3</version>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>bytecode</artifactId>
+            <version>1.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.jasypt</groupId>
+            <artifactId>jasypt</artifactId>
+            <version>1.9.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-analyzers-common</artifactId>
+            <version>7.2.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${dep.curator.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${dep.curator.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.perfmark</groupId>
+            <artifactId>perfmark-api</artifactId>
+            <version>0.23.0</version>
+        </dependency>
+        <dependency>
+            <groupId>de.ruedigermoeller</groupId>
+            <artifactId>fst</artifactId>
+            <version>2.57</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.javassist</groupId>
+                    <artifactId>javassist</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.objenesis</groupId>
+                    <artifactId>objenesis</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.4.0</version>
+                <configuration>
+                    <descriptors><descriptor>src/assembly/assembly.xml</descriptor></descriptors>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <finalName>boostkit-omnidata-spark-connector-lib</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml
new file mode 100644
index 000000000..ccf4481d5
--- /dev/null
+++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml
@@ -0,0 +1,15 @@
+<assembly>
+    <id>bin</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>./</outputDirectory>
+            <useTransitiveDependencies>true</useTransitiveDependencies>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file
diff --git a/omnidata/omnidata-spark-connector/spark_build.sh b/omnidata/omnidata-spark-connector/spark_build.sh
index bacbfbe75..12c807d8c 100644
--- a/omnidata/omnidata-spark-connector/spark_build.sh
+++ b/omnidata/omnidata-spark-connector/spark_build.sh
@@ -6,36 +6,10 @@ if [ -d "${dir_name}-aarch64" ];then rm -rf ${dir_name}-aarch64; fi
if [ -d "${dir_name}-aarch64.zip" ];then rm -rf ${dir_name}-aarch64.zip; fi
mkdir -p $dir_name-aarch64
cp connector/target/$jar_name $dir_name-aarch64
+cd omnidata-spark-connector-lib/
+mvn clean package
+cd ..
cd $dir_name-aarch64
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/bcpkix-jdk15on/1.68/package/bcpkix-jdk15on-1.68.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/curator-client/2.12.0/package/curator-client-2.12.0.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/curator-framework/2.12.0/package/curator-framework-2.12.0.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/curator-recipes/2.12.0/package/curator-recipes-2.12.0.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/fastjson/1.2.76/package/fastjson-1.2.76.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/fst/2.57/package/fst-2.57.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/guava/26.0-jre/package/guava-26.0-jre.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/hetu-transport/1.6.1/package/hetu-transport-1.6.1.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/jackson-datatype-guava/2.12.4/package/jackson-datatype-guava-2.12.4.jar
-wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/jackson-datatype-jdk8/2.12.4/package/jackson-datatype-jdk8-2.12.4.jar
-wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/Jackson-datatype-Joda/2.12.4/package/jackson-datatype-joda-2.12.4.jar
-wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/jackson-datatype-jsr310/2.12.4/package/jackson-datatype-jsr310-2.12.4.jar
-wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/jackson-module-parameter-names/2.12.4/package/jackson-module-parameter-names-2.12.4.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/jasypt/1.9.3/package/jasypt-1.9.3.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/jol-core/0.2/package/jol-core-0.2.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/joni/2.1.5.3/package/joni-2.1.5.3.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/log/0.193/package/log-0.193.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/perfmark-api/0.23.0/package/perfmark-api-0.23.0.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/presto-main/1.6.1/package/presto-main-1.6.1.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/presto-spi/1.6.1/package/presto-spi-1.6.1.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/protobuf-java/3.12.0/package/protobuf-java-3.12.0.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/slice/0.38/package/slice-0.38.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/bytecode/1.2/package/bytecode-1.2.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/fastutil/6.5.9/package/fastutil-6.5.9.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/json/206/package/json-206.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/lucene-analyzers-common/7.2.1/package/lucene-analyzers-common-7.2.1.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/presto-parser/1.6.1/package/presto-parser-1.6.1.jar
-wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/units/1.3/package/units-1.3.jar
-wget --proxy=off --no-check-certificate https://cmc.centralrepo.rnd.huawei.com/artifactory/maven-central-repo/io/airlift/stats/0.193/stats-0.193.jar
-wget --proxy=off --no-check-certificate https://cmc.centralrepo.rnd.huawei.com/artifactory/maven-central-repo/io/hetu/core/presto-expressions/1.6.1/presto-expressions-1.6.1.jar
+cp ../omnidata-spark-connector-lib/target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/* .
cd ..
-zip -r -o $dir_name-aarch64.zip $dir_name-aarch64
\ No newline at end of file
+zip -r -o "${dir_name}-aarch64.zip" "${dir_name}-aarch64"
\ No newline at end of file
--
Gitee
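Note: with this first patch the per-jar wget downloads are gone; the third-party jars are resolved by the new omnidata-spark-connector-lib assembly module and copied into the distribution directory. Below is a minimal hand-run sketch of that flow, assuming the module layout shown in the diff above; it is an illustration only, not part of the patch.

    # Build the connector, then let the lib module's maven-assembly-plugin pull the
    # third-party jars that spark_build.sh previously fetched one by one with wget.
    mvn clean package
    (cd omnidata-spark-connector-lib && mvn clean package)

    # Assemble the aarch64 distribution the same way the updated spark_build.sh does.
    jar_path=$(ls connector/target/*omnidata-spark*.jar | head -n 1)
    jar_name=$(basename "${jar_path}")
    dir_name="${jar_name%.jar}"
    mkdir -p "${dir_name}-aarch64"
    cp "${jar_path}" "${dir_name}-aarch64"
    cp omnidata-spark-connector-lib/target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/* "${dir_name}-aarch64"
    zip -r "${dir_name}-aarch64.zip" "${dir_name}-aarch64"
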
From 7f47bbccd844079d98da74604e1a8db651cf616f Mon Sep 17 00:00:00 2001
From: x00823442 <1061529620@qq.com>
Date: Tue, 19 Sep 2023 09:30:48 +0800
Subject: [PATCH 2/4] [WIP] optimize: use assembly pom to download jars
---
omnidata/omnidata-spark-connector/spark_build.sh | 2 ++
1 file changed, 2 insertions(+)
diff --git a/omnidata/omnidata-spark-connector/spark_build.sh b/omnidata/omnidata-spark-connector/spark_build.sh
index 12c807d8c..42c2b908f 100644
--- a/omnidata/omnidata-spark-connector/spark_build.sh
+++ b/omnidata/omnidata-spark-connector/spark_build.sh
@@ -1,4 +1,5 @@
#!/bin/bash
+echo "aaa"
mvn clean package
jar_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}'`
dir_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}' | awk -F '.jar' '{print$1}'`
@@ -8,6 +9,7 @@ mkdir -p $dir_name-aarch64
cp connector/target/$jar_name $dir_name-aarch64
cd omnidata-spark-connector-lib/
mvn clean package
+echo "bbb"
cd ..
cd $dir_name-aarch64
cp ../omnidata-spark-connector-lib/target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/* .
--
Gitee
From d750f3861b63def1baf9ba5b3d6d75abd798aafa Mon Sep 17 00:00:00 2001
From: x00823442 <1061529620@qq.com>
Date: Tue, 19 Sep 2023 16:59:38 +0800
Subject: [PATCH 3/4] [WIP] optimize: use assembly pom to download jars
---
.../omnidata-spark-connector-lib/pom.xml | 11 +++++++++++
omnidata/omnidata-spark-connector/spark_build.sh | 2 --
2 files changed, 11 insertions(+), 2 deletions(-)
diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml
index f46393813..1244d4de9 100644
--- a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml
+++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml
@@ -220,6 +220,17 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+            <version>1.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.jasypt</groupId>
             <artifactId>jasypt</artifactId>
diff --git a/omnidata/omnidata-spark-connector/spark_build.sh b/omnidata/omnidata-spark-connector/spark_build.sh
index 42c2b908f..12c807d8c 100644
--- a/omnidata/omnidata-spark-connector/spark_build.sh
+++ b/omnidata/omnidata-spark-connector/spark_build.sh
@@ -1,5 +1,4 @@
#!/bin/bash
-echo "aaa"
mvn clean package
jar_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}'`
dir_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}' | awk -F '.jar' '{print$1}'`
@@ -9,7 +8,6 @@ mkdir -p $dir_name-aarch64
cp connector/target/$jar_name $dir_name-aarch64
cd omnidata-spark-connector-lib/
mvn clean package
-echo "bbb"
cd ..
cd $dir_name-aarch64
cp ../omnidata-spark-connector-lib/target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/* .
--
Gitee
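With the units dependency added here, the io.airlift units jar that the old script downloaded explicitly (units-1.3.jar) should now land in the assembly output as well. A quick manual check, assuming the target layout introduced in patch 1:

    cd omnidata-spark-connector-lib
    mvn clean package
    ls target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/ | grep units
    # expected: units-1.3.jar
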
From 84fc9b3379b94092c107c33e9aa4f6b5bf695df3 Mon Sep 17 00:00:00 2001
From: x00823442 <1061529620@qq.com>
Date: Thu, 21 Sep 2023 19:09:25 +0800
Subject: [PATCH 4/4] fix ColumnarPlugin bugs: 1. check whether the literal is
 null; 2. add a left anti join rule; 3. add the checkParquetFieldNames method
 for create table as parquet
---
.../omnioffload/spark/ColumnarPlugin.scala | 119 ++++++++++--------
1 file changed, 68 insertions(+), 51 deletions(-)
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala
index 4e7fdaf6f..917b40d86 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataTypes, DoubleType, LongType}
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
-
import java.net.URI
import scala.collection.JavaConverters
@@ -52,7 +51,6 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] {
var hasCoalesce = false
var hasShuffle = false
var ACCURATE_QUERY_HD = "153"
- var ACCURATE_QUERY = "000"
var RADIX_SORT_COLUMN_NUMS = 2
def apply(plan: SparkPlan): SparkPlan = {
@@ -128,30 +126,30 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] {
DataWritingCommandExec(cmd, CoalesceExec(numPartitions, child))
}
case p@ColumnarSortMergeJoinExec(_, _, joinType, _, _, _, _, projectList)
- if joinType.equals(LeftOuter) =>
+ if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
isSMJ = true
numPartitions = NdpConnectorUtils.getSMJNumPartitions(5000)
ColumnarSortMergeJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti,
condition = p.condition, left = p.left, right = p.right, isSkewJoin = p.isSkewJoin, projectList)
case p@SortMergeJoinExec(_, _, joinType, _, _, _, _)
- if joinType.equals(LeftOuter) =>
+ if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
isSMJ = true
numPartitions = NdpConnectorUtils.getSMJNumPartitions(5000)
SortMergeJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, condition = p.condition,
left = p.left, right = p.right, isSkewJoin = p.isSkewJoin)
- case p@ColumnarBroadcastHashJoinExec(_, _, joinType, _, _, _, _, _, projectList) if joinType.equals(LeftOuter) =>
+ case p@ColumnarBroadcastHashJoinExec(_, _, joinType, _, _, _, _, _, projectList) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
ColumnarBroadcastHashJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys,
joinType = LeftAnti, buildSide = p.buildSide, condition = p.condition, left = p.left,
right = p.right, isNullAwareAntiJoin = p.isNullAwareAntiJoin, projectList)
- case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) =>
+ case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
BroadcastHashJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti,
buildSide = p.buildSide, condition = p.condition, left = p.left, right = p.right,
isNullAwareAntiJoin = p.isNullAwareAntiJoin)
case p@ColumnarShuffledHashJoinExec(_, _, joinType, _, _, _, _, projectList)
- if joinType.equals(LeftOuter) =>
+ if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
ColumnarShuffledHashJoinExec(p.leftKeys, p.rightKeys, LeftAnti, p.buildSide, p.condition,
p.left, p.right, projectList)
- case p@ShuffledHashJoinExec(_, _, joinType, _, _, _, _) if joinType.equals(LeftOuter) =>
+ case p@ShuffledHashJoinExec(_, _, joinType, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
ShuffledHashJoinExec(p.leftKeys, p.rightKeys, LeftAnti, p.buildSide, p.condition, p.left, p.right)
case p@FilterExec(condition, child: OmniColumnarToRowExec, selectivity) =>
val childPlan = child.transform {
@@ -173,21 +171,21 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] {
FilterExec(condition, childPlan, selectivity)
case c1@OmniColumnarToRowExec(c2@ColumnarFilterExec(condition, c3: FileSourceScanExec)) =>
numPartitions = NdpConnectorUtils.getOmniColumnarNumPartitions(1000)
- if (isAccurate(condition)) {
+ if (NdpPluginEnableFlag.isAccurate(condition)) {
pushDownTaskCount = NdpConnectorUtils.getOmniColumnarTaskCount(50)
}
FilterExec(condition, ColumnarToRowExec(c3))
- case p@FilterExec(condition, _, _) if isAccurate(condition) =>
+ case p@FilterExec(condition, _, _) if NdpPluginEnableFlag.isAccurate(condition) =>
numPartitions = NdpConnectorUtils.getFilterPartitions(1000)
pushDownTaskCount = NdpConnectorUtils.getFilterTaskCount(50)
p
case p@ColumnarConditionProjectExec(projectList, condition, child)
if condition.toString().startsWith("isnull") && (child.isInstanceOf[ColumnarSortMergeJoinExec]
- || child.isInstanceOf[ColumnarBroadcastHashJoinExec] || child.isInstanceOf[ColumnarShuffledHashJoinExec]) =>
+ || child.isInstanceOf[ColumnarBroadcastHashJoinExec] || child.isInstanceOf[ColumnarShuffledHashJoinExec]) && isTenPocProject(projectList) =>
ColumnarProjectExec(changeProjectList(projectList), child)
case p@ProjectExec(projectList, filter: FilterExec)
if filter.condition.toString().startsWith("isnull") && (filter.child.isInstanceOf[SortMergeJoinExec]
- || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) =>
+ || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) && isTenPocProject(projectList) =>
ProjectExec(changeProjectList(projectList), filter.child)
case p: SortAggregateExec if p.child.isInstanceOf[OmniColumnarToRowExec]
&& p.child.asInstanceOf[OmniColumnarToRowExec].child.isInstanceOf[ColumnarSortExec]
@@ -262,24 +260,24 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] {
DataWritingCommandExec(cmd, CoalesceExec(numPartitions, child))
}
case p@SortMergeJoinExec(_, _, joinType, _, _, _, _)
- if joinType.equals(LeftOuter) =>
+ if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
isSMJ = true
numPartitions = NdpConnectorUtils.getSMJNumPartitions(5000)
SortMergeJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, condition = p.condition,
left = p.left, right = p.right, isSkewJoin = p.isSkewJoin)
- case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) =>
+ case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
BroadcastHashJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti,
buildSide = p.buildSide, condition = p.condition, left = p.left, right = p.right,
isNullAwareAntiJoin = p.isNullAwareAntiJoin)
- case p@ShuffledHashJoinExec(_, _, joinType, _, _, _, _) if joinType.equals(LeftOuter) =>
+ case p@ShuffledHashJoinExec(_, _, joinType, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) =>
ShuffledHashJoinExec(p.leftKeys, p.rightKeys, LeftAnti, p.buildSide, p.condition, p.left, p.right)
- case p@FilterExec(condition, _, _) if isAccurate(condition) =>
+ case p@FilterExec(condition, _, _) if NdpPluginEnableFlag.isAccurate(condition) =>
numPartitions = NdpConnectorUtils.getFilterPartitions(1000)
pushDownTaskCount = NdpConnectorUtils.getFilterTaskCount(50)
p
case p@ProjectExec(projectList, filter: FilterExec)
if filter.condition.toString().startsWith("isnull") && (filter.child.isInstanceOf[SortMergeJoinExec]
- || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) =>
+ || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) && isTenPocProject(projectList) =>
ProjectExec(changeProjectList(projectList), filter.child)
case p => p
}
@@ -306,7 +304,7 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] {
result = true
}
x match {
- case literal: Literal if literal.value.toString.startsWith(ACCURATE_QUERY_HD) =>
+ case literal: Literal if !literal.nullable && literal.value.toString.startsWith(ACCURATE_QUERY_HD) =>
result = true
case _ =>
}
@@ -319,16 +317,6 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] {
result
}
- def isAccurate(condition: Expression): Boolean = {
- var result = false
- condition.foreach {
- case literal: Literal if literal.value.toString.startsWith(ACCURATE_QUERY) =>
- result = true
- case _ =>
- }
- result
- }
-
def changeProjectList(projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
val p = projectList.map {
case exp: Alias =>
@@ -344,10 +332,32 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] {
}
def isRadixSortExecEnable(sortOrder: Seq[SortOrder]): Boolean = {
- sortOrder.length == RADIX_SORT_COLUMN_NUMS &&
- sortOrder.head.dataType == LongType &&
- sortOrder(1).dataType == LongType &&
- SQLConf.get.getConfString("spark.omni.sql.ndpPlugin.radixSort.enabled", "true").toBoolean
+ sortOrder.lengthCompare(RADIX_SORT_COLUMN_NUMS) == 0 &&
+ sortOrder.head.dataType == LongType &&
+ sortOrder.head.child.isInstanceOf[AttributeReference] &&
+ sortOrder.head.child.asInstanceOf[AttributeReference].name.startsWith("col") &&
+ sortOrder(1).dataType == LongType &&
+ sortOrder(1).child.isInstanceOf[AttributeReference] &&
+ sortOrder(1).child.asInstanceOf[AttributeReference].name.startsWith("col") &&
+ SQLConf.get.getConfString("spark.omni.sql.ndpPlugin.radixSort.enabled", "true").toBoolean
+ }
+
+ def isTenPocProject(projectList: Seq[NamedExpression]): Boolean = {
+ projectList.forall {
+ case exp: Alias =>
+ exp.child.isInstanceOf[AttributeReference] && exp.child.asInstanceOf[AttributeReference].name.startsWith("col")
+ case exp: AttributeReference =>
+ exp.name.startsWith("col")
+ case _ => false
+ }
+ }
+
+ def isTenPocJoin(keys: Seq[Expression]): Boolean = {
+ keys.forall {
+ case exp: AttributeReference =>
+ exp.name.startsWith("col")
+ case _ => false
+ }
}
}
@@ -370,9 +380,6 @@ case class NdpRules(session: SparkSession) extends ColumnarRule with Logging {
}
case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] {
-
- var ACCURATE_QUERY = "000"
-
val SORT_REPARTITION_PLANS: Seq[String] = Seq(
"Sort,HiveTableRelation",
"Sort,LogicalRelation",
@@ -409,6 +416,7 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] {
plan.transformUp {
case CreateHiveTableAsSelectCommand(tableDesc, query, outputColumnNames, mode)
if isParquetEnable(tableDesc)
+ && checkParquetFieldNames(outputColumnNames)
&& SQLConf.get.getConfString("spark.omni.sql.ndpPlugin.parquetOutput.enabled", "true")
.toBoolean =>
CreateDataSourceTableAsSelectCommand(
@@ -471,6 +479,11 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] {
false
}
+ // ,;{}()\n\t= and space are special characters in Parquet schema
+ def checkParquetFieldNames(outputColumnNames: Seq[String]): Boolean = {
+ outputColumnNames.forall(!_.matches(".*[ ,;{}()\n\t=].*"))
+ }
+
def repartition(fs: FileSystem, plan: LogicalPlan): Unit = {
var tables = Seq[URI]()
var planContents = Seq[String]()
@@ -508,7 +521,7 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] {
existsAgg = true
planContents :+= p.nodeName
case p@Filter(condition, _) =>
- existAccurate |= isAccurate(condition)
+ existAccurate |= NdpPluginEnableFlag.isAccurate(condition)
existFilter = true
existLike |= isLike(condition)
planContents :+= p.nodeName
@@ -587,9 +600,13 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] {
def castSumAvgToBigInt(expression: Expression): Expression = {
val exp = expression.transform {
- case agg@Average(cast: Cast) if cast.dataType.isInstanceOf[DoubleType] =>
+ case Average(cast: Cast) if cast.dataType.isInstanceOf[DoubleType]
+ && cast.child.isInstanceOf[AttributeReference]
+ && cast.child.asInstanceOf[AttributeReference].name.startsWith("col")=>
Average(Cast(cast.child, DataTypes.LongType))
- case agg@Sum(cast: Cast) if cast.dataType.isInstanceOf[DoubleType] =>
+ case Sum(cast: Cast) if cast.dataType.isInstanceOf[DoubleType]
+ && cast.child.isInstanceOf[AttributeReference]
+ && cast.child.asInstanceOf[AttributeReference].name.startsWith("col")=>
Sum(Cast(cast.child, DataTypes.LongType))
case e =>
e
@@ -611,28 +628,17 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] {
def castStringExpressionToBigint(expression: Expression): Expression = {
expression match {
- case a@AttributeReference(_, DataTypes.StringType, _, _) =>
+ case a@AttributeReference(_, DataTypes.StringType, _, _) if a.name.startsWith("col") =>
Cast(a, DataTypes.LongType)
case e => e
}
}
-
def turnOffOperator(): Unit = {
session.sqlContext.setConf("org.apache.spark.sql.columnar.enabled", "false")
session.sqlContext.setConf("spark.sql.join.columnar.preferShuffledHashJoin", "false")
}
- def isAccurate(condition: Expression): Boolean = {
- var result = false
- condition.foreach {
- case literal: Literal if literal.value.toString.startsWith(ACCURATE_QUERY) =>
- result = true
- case _ =>
- }
- result
- }
-
def isLike(condition: Expression): Boolean = {
var result = false
condition.foreach {
@@ -652,8 +658,19 @@ class ColumnarPlugin extends (SparkSessionExtensions => Unit) with Logging {
}
object NdpPluginEnableFlag {
-
val ndpEnabledStr = "spark.omni.sql.ndpPlugin.enabled"
+ var ACCURATE_QUERY = "000"
+
+ def isAccurate(condition: Expression): Boolean = {
+ var result = false
+ condition.foreach {
+ // literal need to check null
+ case literal: Literal if !literal.nullable && literal.value.toString.startsWith(ACCURATE_QUERY) =>
+ result = true
+ case _ =>
+ }
+ result
+ }
def isMatchedIpAddress: Boolean = {
val ipSet = Set("xxx.xxx.xxx.xxx")
--
Gitee
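For context, the ColumnarPlugin fixes in this last patch come down to two small guards: reject null literals before calling toString on their values, and only rewrite a create-table-as-select to Parquet when every output column name is a legal Parquet field name. Below is a minimal standalone sketch of that intent in plain Scala; the Lit type is a simplified stand-in for Catalyst's Literal, and the real code in ColumnarPlugin.scala operates on Spark expression trees rather than on these types.

    object ColumnarPluginGuardsSketch {
      // Simplified stand-in for Catalyst's Literal: a value that may be null.
      final case class Lit(value: Any) {
        def nullable: Boolean = value == null
      }

      val ACCURATE_QUERY = "000"

      // Mirrors NdpPluginEnableFlag.isAccurate: skip null literals before calling
      // toString on the value, which is the NPE this patch guards against.
      def isAccurate(literals: Seq[Lit]): Boolean =
        literals.exists(l => !l.nullable && l.value.toString.startsWith(ACCURATE_QUERY))

      // Mirrors checkParquetFieldNames: ",;{}()", "\n", "\t", "=" and space are special
      // characters in a Parquet schema, so the Parquet rewrite only fires when no
      // output column name contains them.
      def checkParquetFieldNames(outputColumnNames: Seq[String]): Boolean =
        outputColumnNames.forall(!_.matches(".*[ ,;{}()\n\t=].*"))

      def main(args: Array[String]): Unit = {
        println(isAccurate(Seq(Lit(null), Lit("0001234"))))         // true: null literal is skipped safely
        println(checkParquetFieldNames(Seq("col0", "col1")))        // true
        println(checkParquetFieldNames(Seq("col 0", "price(usd)"))) // false: space and parentheses
      }
    }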