From 23d44765dc84e47a0cce28f9f11676b2f59a2768 Mon Sep 17 00:00:00 2001
From: x00823442 <1061529620@qq.com>
Date: Mon, 18 Sep 2023 11:12:51 +0800
Subject: [PATCH 1/4] [WIP] optimize: use assembly pom to download jars

---
 .../omnidata-spark-connector-lib/README.md   |  11 +
 .../omnidata-spark-connector-lib/pom.xml     | 313 ++++++++++++++++++
 .../src/assembly/assembly.xml                |  15 +
 .../omnidata-spark-connector/spark_build.sh  |  36 +-
 4 files changed, 344 insertions(+), 31 deletions(-)
 create mode 100644 omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md
 create mode 100644 omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml
 create mode 100644 omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml

diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md
new file mode 100644
index 000000000..7c95fcb5a
--- /dev/null
+++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/README.md
@@ -0,0 +1,11 @@
+# OmniData Spark Connector Lib
+
+## Building OmniData Spark Connector Lib
+
+1. Simply run the following command from the project root directory:
+`mvn clean package`
+Then you will find jars in the "omnidata-spark-connector-lib/target/" directory. + +## More Information + +For further assistance, send an email to kunpengcompute@huawei.com. \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml new file mode 100644 index 000000000..f46393813 --- /dev/null +++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml @@ -0,0 +1,313 @@ + + + 4.0.0 + + com.huawei.boostkit + omnidata-spark-connector-lib + pom + 1.5.0 + + + 2.12.4 + 1.2.3 + 1.6.1 + 206 + 2.12.0 + + + + + org.bouncycastle + bcpkix-jdk15on + 1.68 + + + * + * + + + + + com.google.protobuf + protobuf-java + 3.12.0 + + + it.unimi.dsi + fastutil + 6.5.9 + + + com.alibaba + fastjson + 1.2.76 + + + com.fasterxml.jackson.datatype + jackson-datatype-guava + ${dep.json.version} + + + * + * + + + + + com.fasterxml.jackson.datatype + jackson-datatype-jdk8 + ${dep.json.version} + + + * + * + + + + + com.fasterxml.jackson.datatype + jackson-datatype-joda + ${dep.json.version} + + + * + * + + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + ${dep.json.version} + + + * + * + + + + + com.fasterxml.jackson.module + jackson-module-parameter-names + ${dep.json.version} + + + * + * + + + + + io.hetu.core + presto-spi + ${dep.hetu.version} + + + * + * + + + + + io.hetu.core + hetu-transport + ${dep.hetu.version} + + + * + * + + + + + io.hetu.core + presto-parser + ${dep.hetu.version} + + + * + * + + + + + io.hetu.core + presto-main + ${dep.hetu.version} + + + * + * + + + + + io.hetu.core + presto-expressions + ${dep.hetu.version} + + + com.google.guava + guava + 26.0-jre + + + * + * + + + + + io.airlift + json + ${dep.airlift.version} + + + * + * + + + + + io.airlift + slice + 0.38 + + + cobugsm.google.code.find + jsr305 + + + + + io.airlift + stats + 0.193 + + + cobugsm.google.code.find + jsr305 + + + com.fasterxml.jackson.core + jackson-annotations + + + org.hdrhistogram + HdrHistogram + + + org.weakref + jmxutils + + + + + io.airlift + joni + 2.1.5.3 + + + io.airlift + bytecode + 1.2 + + + * + * + + + + + org.jasypt + jasypt + 1.9.3 + + + org.apache.lucene + lucene-analyzers-common + 7.2.1 + + + * + * + + + + + org.apache.curator + curator-framework + ${dep.curator.version} + + + com.google.guava + guava + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + + + org.apache.curator + curator-recipes + ${dep.curator.version} + + + io.perfmark + perfmark-api + 0.23.0 + + + de.ruedigermoeller + fst + 2.57 + + + org.javassist + javassist + + + org.objenesis + objenesis + + + com.fasterxml.jackson.core + jackson-core + + + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.4.0 + + src/assembly/assembly.xml + false + boostkit-omnidata-spark-connector-lib + + + + package + + single + + + + + + + + + \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml new file mode 100644 index 000000000..ccf4481d5 --- /dev/null +++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/src/assembly/assembly.xml @@ -0,0 +1,15 @@ + + bin + + dir + + + + + ./ + true + + + \ No newline at end of file diff --git a/omnidata/omnidata-spark-connector/spark_build.sh b/omnidata/omnidata-spark-connector/spark_build.sh index bacbfbe75..12c807d8c 100644 --- 
a/omnidata/omnidata-spark-connector/spark_build.sh +++ b/omnidata/omnidata-spark-connector/spark_build.sh @@ -6,36 +6,10 @@ if [ -d "${dir_name}-aarch64" ];then rm -rf ${dir_name}-aarch64; fi if [ -d "${dir_name}-aarch64.zip" ];then rm -rf ${dir_name}-aarch64.zip; fi mkdir -p $dir_name-aarch64 cp connector/target/$jar_name $dir_name-aarch64 +cd omnidata-spark-connector-lib/ +mvn clean package +cd .. cd $dir_name-aarch64 -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/bcpkix-jdk15on/1.68/package/bcpkix-jdk15on-1.68.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/curator-client/2.12.0/package/curator-client-2.12.0.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/curator-framework/2.12.0/package/curator-framework-2.12.0.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/curator-recipes/2.12.0/package/curator-recipes-2.12.0.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/fastjson/1.2.76/package/fastjson-1.2.76.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/fst/2.57/package/fst-2.57.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/guava/26.0-jre/package/guava-26.0-jre.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/hetu-transport/1.6.1/package/hetu-transport-1.6.1.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/jackson-datatype-guava/2.12.4/package/jackson-datatype-guava-2.12.4.jar -wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/jackson-datatype-jdk8/2.12.4/package/jackson-datatype-jdk8-2.12.4.jar -wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/Jackson-datatype-Joda/2.12.4/package/jackson-datatype-joda-2.12.4.jar -wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/jackson-datatype-jsr310/2.12.4/package/jackson-datatype-jsr310-2.12.4.jar -wget --proxy=off --no-check-certificate https://cmc-hgh-artifactory.cmc.tools.huawei.com/artifactory/opensource_general/jackson-module-parameter-names/2.12.4/package/jackson-module-parameter-names-2.12.4.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/jasypt/1.9.3/package/jasypt-1.9.3.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/jol-core/0.2/package/jol-core-0.2.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/joni/2.1.5.3/package/joni-2.1.5.3.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/log/0.193/package/log-0.193.jar -wget --proxy=off --no-check-certificate 
https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/perfmark-api/0.23.0/package/perfmark-api-0.23.0.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/presto-main/1.6.1/package/presto-main-1.6.1.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/presto-spi/1.6.1/package/presto-spi-1.6.1.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/protobuf-java/3.12.0/package/protobuf-java-3.12.0.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/slice/0.38/package/slice-0.38.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/bytecode/1.2/package/bytecode-1.2.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/fastutil/6.5.9/package/fastutil-6.5.9.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/json/206/package/json-206.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/lucene-analyzers-common/7.2.1/package/lucene-analyzers-common-7.2.1.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/presto-parser/1.6.1/package/presto-parser-1.6.1.jar -wget --proxy=off --no-check-certificate https://cmc.cloudartifact.szv.dragon.tools.huawei.com/artifactory/opensource_general/units/1.3/package/units-1.3.jar -wget --proxy=off --no-check-certificate https://cmc.centralrepo.rnd.huawei.com/artifactory/maven-central-repo/io/airlift/stats/0.193/stats-0.193.jar -wget --proxy=off --no-check-certificate https://cmc.centralrepo.rnd.huawei.com/artifactory/maven-central-repo/io/hetu/core/presto-expressions/1.6.1/presto-expressions-1.6.1.jar +cp ../omnidata-spark-connector-lib/target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/* . cd .. -zip -r -o $dir_name-aarch64.zip $dir_name-aarch64 \ No newline at end of file +zip -r -o "${dir_name}-aarch64.zip" "${dir_name}-aarch64" \ No newline at end of file -- Gitee From 7f47bbccd844079d98da74604e1a8db651cf616f Mon Sep 17 00:00:00 2001 From: x00823442 <1061529620@qq.com> Date: Tue, 19 Sep 2023 09:30:48 +0800 Subject: [PATCH 2/4] [WIP] optimize: use assembly pom to download jars --- omnidata/omnidata-spark-connector/spark_build.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/omnidata/omnidata-spark-connector/spark_build.sh b/omnidata/omnidata-spark-connector/spark_build.sh index 12c807d8c..42c2b908f 100644 --- a/omnidata/omnidata-spark-connector/spark_build.sh +++ b/omnidata/omnidata-spark-connector/spark_build.sh @@ -1,4 +1,5 @@ #!/bin/bash +echo "aaa" mvn clean package jar_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}'` dir_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}' | awk -F '.jar' '{print$1}'` @@ -8,6 +9,7 @@ mkdir -p $dir_name-aarch64 cp connector/target/$jar_name $dir_name-aarch64 cd omnidata-spark-connector-lib/ mvn clean package +echo "bbb" cd .. 
cd $dir_name-aarch64 cp ../omnidata-spark-connector-lib/target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/* . -- Gitee From d750f3861b63def1baf9ba5b3d6d75abd798aafa Mon Sep 17 00:00:00 2001 From: x00823442 <1061529620@qq.com> Date: Tue, 19 Sep 2023 16:59:38 +0800 Subject: [PATCH 3/4] [WIP] optimize: use assembly pom to download jars --- .../omnidata-spark-connector-lib/pom.xml | 11 +++++++++++ omnidata/omnidata-spark-connector/spark_build.sh | 2 -- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml index f46393813..1244d4de9 100644 --- a/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml +++ b/omnidata/omnidata-spark-connector/omnidata-spark-connector-lib/pom.xml @@ -220,6 +220,17 @@ + + io.airlift + units + 1.3 + + + * + * + + + org.jasypt jasypt diff --git a/omnidata/omnidata-spark-connector/spark_build.sh b/omnidata/omnidata-spark-connector/spark_build.sh index 42c2b908f..12c807d8c 100644 --- a/omnidata/omnidata-spark-connector/spark_build.sh +++ b/omnidata/omnidata-spark-connector/spark_build.sh @@ -1,5 +1,4 @@ #!/bin/bash -echo "aaa" mvn clean package jar_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}'` dir_name=`ls -n connector/target/*.jar | grep omnidata-spark | awk -F ' ' '{print$9}' | awk -F '/' '{print$3}' | awk -F '.jar' '{print$1}'` @@ -9,7 +8,6 @@ mkdir -p $dir_name-aarch64 cp connector/target/$jar_name $dir_name-aarch64 cd omnidata-spark-connector-lib/ mvn clean package -echo "bbb" cd .. cd $dir_name-aarch64 cp ../omnidata-spark-connector-lib/target/boostkit-omnidata-spark-connector-lib/boostkit-omnidata-spark-connector-lib/* . -- Gitee From 84fc9b3379b94092c107c33e9aa4f6b5bf695df3 Mon Sep 17 00:00:00 2001 From: x00823442 <1061529620@qq.com> Date: Thu, 21 Sep 2023 19:09:25 +0800 Subject: [PATCH 4/4] fix ColumnarPlugin bugs: 1.check whether literal is null. 2.left anti join rule is added. 
3.create table as parquet add method :checkParquetFieldNames --- .../omnioffload/spark/ColumnarPlugin.scala | 119 ++++++++++-------- 1 file changed, 68 insertions(+), 51 deletions(-) diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala index 4e7fdaf6f..917b40d86 100644 --- a/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala +++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/com/huawei/boostkit/omnioffload/spark/ColumnarPlugin.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataTypes, DoubleType, LongType} import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} - import java.net.URI import scala.collection.JavaConverters @@ -52,7 +51,6 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] { var hasCoalesce = false var hasShuffle = false var ACCURATE_QUERY_HD = "153" - var ACCURATE_QUERY = "000" var RADIX_SORT_COLUMN_NUMS = 2 def apply(plan: SparkPlan): SparkPlan = { @@ -128,30 +126,30 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] { DataWritingCommandExec(cmd, CoalesceExec(numPartitions, child)) } case p@ColumnarSortMergeJoinExec(_, _, joinType, _, _, _, _, projectList) - if joinType.equals(LeftOuter) => + if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => isSMJ = true numPartitions = NdpConnectorUtils.getSMJNumPartitions(5000) ColumnarSortMergeJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, condition = p.condition, left = p.left, right = p.right, isSkewJoin = p.isSkewJoin, projectList) case p@SortMergeJoinExec(_, _, joinType, _, _, _, _) - if joinType.equals(LeftOuter) => + if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => isSMJ = true numPartitions = NdpConnectorUtils.getSMJNumPartitions(5000) SortMergeJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, condition = p.condition, left = p.left, right = p.right, isSkewJoin = p.isSkewJoin) - case p@ColumnarBroadcastHashJoinExec(_, _, joinType, _, _, _, _, _, projectList) if joinType.equals(LeftOuter) => + case p@ColumnarBroadcastHashJoinExec(_, _, joinType, _, _, _, _, _, projectList) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => ColumnarBroadcastHashJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, buildSide = p.buildSide, condition = p.condition, left = p.left, right = p.right, isNullAwareAntiJoin = p.isNullAwareAntiJoin, projectList) - case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) => + case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => BroadcastHashJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, buildSide = p.buildSide, condition = p.condition, left = p.left, right = p.right, isNullAwareAntiJoin = p.isNullAwareAntiJoin) case p@ColumnarShuffledHashJoinExec(_, _, joinType, _, _, _, _, projectList) - if joinType.equals(LeftOuter) => + if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && 
isTenPocJoin(p.rightKeys) => ColumnarShuffledHashJoinExec(p.leftKeys, p.rightKeys, LeftAnti, p.buildSide, p.condition, p.left, p.right, projectList) - case p@ShuffledHashJoinExec(_, _, joinType, _, _, _, _) if joinType.equals(LeftOuter) => + case p@ShuffledHashJoinExec(_, _, joinType, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => ShuffledHashJoinExec(p.leftKeys, p.rightKeys, LeftAnti, p.buildSide, p.condition, p.left, p.right) case p@FilterExec(condition, child: OmniColumnarToRowExec, selectivity) => val childPlan = child.transform { @@ -173,21 +171,21 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] { FilterExec(condition, childPlan, selectivity) case c1@OmniColumnarToRowExec(c2@ColumnarFilterExec(condition, c3: FileSourceScanExec)) => numPartitions = NdpConnectorUtils.getOmniColumnarNumPartitions(1000) - if (isAccurate(condition)) { + if (NdpPluginEnableFlag.isAccurate(condition)) { pushDownTaskCount = NdpConnectorUtils.getOmniColumnarTaskCount(50) } FilterExec(condition, ColumnarToRowExec(c3)) - case p@FilterExec(condition, _, _) if isAccurate(condition) => + case p@FilterExec(condition, _, _) if NdpPluginEnableFlag.isAccurate(condition) => numPartitions = NdpConnectorUtils.getFilterPartitions(1000) pushDownTaskCount = NdpConnectorUtils.getFilterTaskCount(50) p case p@ColumnarConditionProjectExec(projectList, condition, child) if condition.toString().startsWith("isnull") && (child.isInstanceOf[ColumnarSortMergeJoinExec] - || child.isInstanceOf[ColumnarBroadcastHashJoinExec] || child.isInstanceOf[ColumnarShuffledHashJoinExec]) => + || child.isInstanceOf[ColumnarBroadcastHashJoinExec] || child.isInstanceOf[ColumnarShuffledHashJoinExec]) && isTenPocProject(projectList) => ColumnarProjectExec(changeProjectList(projectList), child) case p@ProjectExec(projectList, filter: FilterExec) if filter.condition.toString().startsWith("isnull") && (filter.child.isInstanceOf[SortMergeJoinExec] - || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) => + || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) && isTenPocProject(projectList) => ProjectExec(changeProjectList(projectList), filter.child) case p: SortAggregateExec if p.child.isInstanceOf[OmniColumnarToRowExec] && p.child.asInstanceOf[OmniColumnarToRowExec].child.isInstanceOf[ColumnarSortExec] @@ -262,24 +260,24 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] { DataWritingCommandExec(cmd, CoalesceExec(numPartitions, child)) } case p@SortMergeJoinExec(_, _, joinType, _, _, _, _) - if joinType.equals(LeftOuter) => + if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => isSMJ = true numPartitions = NdpConnectorUtils.getSMJNumPartitions(5000) SortMergeJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, condition = p.condition, left = p.left, right = p.right, isSkewJoin = p.isSkewJoin) - case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) => + case p@BroadcastHashJoinExec(_, _, joinType, _, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => BroadcastHashJoinExec(leftKeys = p.leftKeys, rightKeys = p.rightKeys, joinType = LeftAnti, buildSide = p.buildSide, condition = p.condition, left = p.left, right = p.right, isNullAwareAntiJoin = p.isNullAwareAntiJoin) - case p@ShuffledHashJoinExec(_, _, 
joinType, _, _, _, _) if joinType.equals(LeftOuter) => + case p@ShuffledHashJoinExec(_, _, joinType, _, _, _, _) if joinType.equals(LeftOuter) && isTenPocJoin(p.leftKeys) && isTenPocJoin(p.rightKeys) => ShuffledHashJoinExec(p.leftKeys, p.rightKeys, LeftAnti, p.buildSide, p.condition, p.left, p.right) - case p@FilterExec(condition, _, _) if isAccurate(condition) => + case p@FilterExec(condition, _, _) if NdpPluginEnableFlag.isAccurate(condition) => numPartitions = NdpConnectorUtils.getFilterPartitions(1000) pushDownTaskCount = NdpConnectorUtils.getFilterTaskCount(50) p case p@ProjectExec(projectList, filter: FilterExec) if filter.condition.toString().startsWith("isnull") && (filter.child.isInstanceOf[SortMergeJoinExec] - || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) => + || filter.child.isInstanceOf[BroadcastHashJoinExec] || filter.child.isInstanceOf[ShuffledHashJoinExec]) && isTenPocProject(projectList) => ProjectExec(changeProjectList(projectList), filter.child) case p => p } @@ -306,7 +304,7 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] { result = true } x match { - case literal: Literal if literal.value.toString.startsWith(ACCURATE_QUERY_HD) => + case literal: Literal if !literal.nullable && literal.value.toString.startsWith(ACCURATE_QUERY_HD) => result = true case _ => } @@ -319,16 +317,6 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] { result } - def isAccurate(condition: Expression): Boolean = { - var result = false - condition.foreach { - case literal: Literal if literal.value.toString.startsWith(ACCURATE_QUERY) => - result = true - case _ => - } - result - } - def changeProjectList(projectList: Seq[NamedExpression]): Seq[NamedExpression] = { val p = projectList.map { case exp: Alias => @@ -344,10 +332,32 @@ case class NdpOverrides(sparkSession: SparkSession) extends Rule[SparkPlan] { } def isRadixSortExecEnable(sortOrder: Seq[SortOrder]): Boolean = { - sortOrder.length == RADIX_SORT_COLUMN_NUMS && - sortOrder.head.dataType == LongType && - sortOrder(1).dataType == LongType && - SQLConf.get.getConfString("spark.omni.sql.ndpPlugin.radixSort.enabled", "true").toBoolean + sortOrder.lengthCompare(RADIX_SORT_COLUMN_NUMS) == 0 && + sortOrder.head.dataType == LongType && + sortOrder.head.child.isInstanceOf[AttributeReference] && + sortOrder.head.child.asInstanceOf[AttributeReference].name.startsWith("col") && + sortOrder(1).dataType == LongType && + sortOrder(1).child.isInstanceOf[AttributeReference] && + sortOrder(1).child.asInstanceOf[AttributeReference].name.startsWith("col") && + SQLConf.get.getConfString("spark.omni.sql.ndpPlugin.radixSort.enabled", "true").toBoolean + } + + def isTenPocProject(projectList: Seq[NamedExpression]): Boolean = { + projectList.forall { + case exp: Alias => + exp.child.isInstanceOf[AttributeReference] && exp.child.asInstanceOf[AttributeReference].name.startsWith("col") + case exp: AttributeReference => + exp.name.startsWith("col") + case _ => false + } + } + + def isTenPocJoin(keys: Seq[Expression]): Boolean = { + keys.forall { + case exp: AttributeReference => + exp.name.startsWith("col") + case _ => false + } } } @@ -370,9 +380,6 @@ case class NdpRules(session: SparkSession) extends ColumnarRule with Logging { } case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] { - - var ACCURATE_QUERY = "000" - val SORT_REPARTITION_PLANS: Seq[String] = Seq( "Sort,HiveTableRelation", "Sort,LogicalRelation", @@ -409,6 +416,7 
@@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] { plan.transformUp { case CreateHiveTableAsSelectCommand(tableDesc, query, outputColumnNames, mode) if isParquetEnable(tableDesc) + && checkParquetFieldNames(outputColumnNames) && SQLConf.get.getConfString("spark.omni.sql.ndpPlugin.parquetOutput.enabled", "true") .toBoolean => CreateDataSourceTableAsSelectCommand( @@ -471,6 +479,11 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] { false } + // ,;{}()\n\t= and space are special characters in Parquet schema + def checkParquetFieldNames(outputColumnNames: Seq[String]): Boolean = { + outputColumnNames.forall(!_.matches(".*[ ,;{}()\n\t=].*")) + } + def repartition(fs: FileSystem, plan: LogicalPlan): Unit = { var tables = Seq[URI]() var planContents = Seq[String]() @@ -508,7 +521,7 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] { existsAgg = true planContents :+= p.nodeName case p@Filter(condition, _) => - existAccurate |= isAccurate(condition) + existAccurate |= NdpPluginEnableFlag.isAccurate(condition) existFilter = true existLike |= isLike(condition) planContents :+= p.nodeName @@ -587,9 +600,13 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] { def castSumAvgToBigInt(expression: Expression): Expression = { val exp = expression.transform { - case agg@Average(cast: Cast) if cast.dataType.isInstanceOf[DoubleType] => + case Average(cast: Cast) if cast.dataType.isInstanceOf[DoubleType] + && cast.child.isInstanceOf[AttributeReference] + && cast.child.asInstanceOf[AttributeReference].name.startsWith("col")=> Average(Cast(cast.child, DataTypes.LongType)) - case agg@Sum(cast: Cast) if cast.dataType.isInstanceOf[DoubleType] => + case Sum(cast: Cast) if cast.dataType.isInstanceOf[DoubleType] + && cast.child.isInstanceOf[AttributeReference] + && cast.child.asInstanceOf[AttributeReference].name.startsWith("col")=> Sum(Cast(cast.child, DataTypes.LongType)) case e => e @@ -611,28 +628,17 @@ case class NdpOptimizerRules(session: SparkSession) extends Rule[LogicalPlan] { def castStringExpressionToBigint(expression: Expression): Expression = { expression match { - case a@AttributeReference(_, DataTypes.StringType, _, _) => + case a@AttributeReference(_, DataTypes.StringType, _, _) if a.name.startsWith("col") => Cast(a, DataTypes.LongType) case e => e } } - def turnOffOperator(): Unit = { session.sqlContext.setConf("org.apache.spark.sql.columnar.enabled", "false") session.sqlContext.setConf("spark.sql.join.columnar.preferShuffledHashJoin", "false") } - def isAccurate(condition: Expression): Boolean = { - var result = false - condition.foreach { - case literal: Literal if literal.value.toString.startsWith(ACCURATE_QUERY) => - result = true - case _ => - } - result - } - def isLike(condition: Expression): Boolean = { var result = false condition.foreach { @@ -652,8 +658,19 @@ class ColumnarPlugin extends (SparkSessionExtensions => Unit) with Logging { } object NdpPluginEnableFlag { - val ndpEnabledStr = "spark.omni.sql.ndpPlugin.enabled" + var ACCURATE_QUERY = "000" + + def isAccurate(condition: Expression): Boolean = { + var result = false + condition.foreach { + // literal need to check null + case literal: Literal if !literal.nullable && literal.value.toString.startsWith(ACCURATE_QUERY) => + result = true + case _ => + } + result + } def isMatchedIpAddress: Boolean = { val ipSet = Set("xxx.xxx.xxx.xxx") -- Gitee
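
Reviewer note on PATCH 4/4: the new `checkParquetFieldNames` guard only allows the Parquet CTAS rewrite when none of the output column names contain characters that Parquet rejects in a schema (space, `,`, `;`, `{`, `}`, `(`, `)`, newline, tab, `=`). The snippet below is a minimal, standalone sketch of that check for reference; the object name and the `main` harness are illustrative only, while the regex is the one the patch adds to `NdpOptimizerRules`:

```scala
object ParquetFieldNameCheckSketch {
  // Characters that are not allowed in Parquet field names, per the comment
  // added in PATCH 4/4: ,;{}()\n\t= and space.
  private val invalidNamePattern = ".*[ ,;{}()\n\t=].*"

  // True only if every output column name is safe to use as a Parquet field name.
  def checkParquetFieldNames(outputColumnNames: Seq[String]): Boolean =
    outputColumnNames.forall(!_.matches(invalidNamePattern))

  def main(args: Array[String]): Unit = {
    // Plain column names pass, so the CTAS can be rewritten to Parquet output.
    println(checkParquetFieldNames(Seq("col0", "col1")))      // true
    // Aggregate-style names such as "sum(col1)" contain '(' and ')', so the
    // rule leaves the original CreateHiveTableAsSelectCommand untouched.
    println(checkParquetFieldNames(Seq("col0", "sum(col1)"))) // false
  }
}
```

The same patch also hardens the `isAccurate` check (now centralized in `NdpPluginEnableFlag`) by requiring `!literal.nullable` before calling `literal.value.toString`, so a null literal in a filter condition no longer risks a NullPointerException.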