From 9e6d4251e754f16d61a85999619e11a03b40b817 Mon Sep 17 00:00:00 2001
From: suiyi <1520835527@qq.com>
Date: Thu, 14 Sep 2023 18:35:45 +0800
Subject: [PATCH 1/3] 1.fix partition column name to equalsIgnoreCase 2.get partition column value from PartitionedFile

---
 .../org/apache/spark/sql/DataIoAdapter.java   | 34 +++++++++----------
 .../java/org/apache/spark/sql/NdpUtils.java   | 26 +++++++-------
 .../execution/command/NdpCommandUtils.scala   | 33 +++++++++++++++++-
 .../datasources/FileScanRDDPushDown.scala     | 11 ++++--
 4 files changed, 70 insertions(+), 34 deletions(-)

diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
index 5f8f48b94..55907551a 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/DataIoAdapter.java
@@ -134,7 +134,7 @@ public class DataIoAdapter {
 
     private Map columnNameMap = new HashMap<>();
 
-    private Set partitionColumnName = new HashSet<>();
+    private Map partitionColumnValues = new HashMap<>();
 
     private List listAtt = new ArrayList<>();
 
@@ -170,7 +170,6 @@ public class DataIoAdapter {
      *
      * @param pageCandidate file split info
     * @param sparkOutPut data schema
-     * @param partitionColumn partition column
     * @param filterOutPut filter schema
     * @param pushDownOperators push down expressions
     * @param domains domain map
@@ -181,17 +180,17 @@ public class DataIoAdapter {
     public Iterator getPageIterator(
         PageCandidate pageCandidate,
         Seq sparkOutPut,
-        Seq partitionColumn,
         Seq filterOutPut,
         PushDownInfo pushDownOperators,
         ImmutableMap domains,
         Boolean isColumnVector,
-        String omniGroupId) throws TaskExecutionException, UnknownHostException {
+        String omniGroupId,
+        Map partitionValues) throws TaskExecutionException, UnknownHostException {
         // initCandidates
         initCandidates(pageCandidate, filterOutPut);
 
         // add partition column
-        JavaConverters.seqAsJavaList(partitionColumn).forEach(a -> partitionColumnName.add(a.name()));
+        partitionColumnValues = partitionValues;
 
         // init column info
         if (pushDownOperators.aggExecutions().size() == 0) {
@@ -376,8 +375,8 @@ public class DataIoAdapter {
             aggProjectionId = fieldMap.size();
             fieldMap.put(aggColumnName, aggProjectionId);
             omnidataTypes.add(prestoType);
-            boolean isPartitionKey = partitionColumnName.contains(aggColumnName);
-            String partitionValue = NdpUtils.getPartitionValue(filePath, aggColumnName);
+            boolean isPartitionKey = partitionColumnValues.containsKey(aggColumnName);
+            String partitionValue = NdpUtils.getPartitionValue(aggColumnName, isPartitionKey, partitionColumnValues);
             omnidataColumns.add(new Column(columnId, aggColumnName, prestoType,
                 isPartitionKey, partitionValue));
             columnNameSet.add(aggColumnName);
@@ -489,9 +488,9 @@ public class DataIoAdapter {
                 columnNameMap.put(aggColumnName, field);
                 int columnId = NdpUtils
                     .getColumnId(expression.toString()) - columnOffset;
-                boolean isPartitionKey = partitionColumnName.contains(aggColumnName);
+                boolean isPartitionKey = partitionColumnValues.containsKey(aggColumnName);
                 String partitionValue = NdpUtils
-                    .getPartitionValue(filePath, aggColumnName);
+                    .getPartitionValue(aggColumnName, isPartitionKey, partitionColumnValues);
                 omnidataColumns.add(
                     new Column(columnId, aggColumnName, prestoType,
                         isPartitionKey, partitionValue));
@@ -776,11 +775,11 @@ public class DataIoAdapter {
         if (null !=
             fieldMap.get(filterColumnName)) {
             return fieldMap.get(filterColumnName);
         }
-        boolean isPartitionKey = partitionColumnName.contains(filterColumnName);
+        boolean isPartitionKey = partitionColumnValues.containsKey(filterColumnName);
         int filterProjectionId = fieldMap.size();
         fieldMap.put(filterColumnName, filterProjectionId);
-        String partitionValue = NdpUtils.getPartitionValue(filePath, filterColumnName);
+        String partitionValue = NdpUtils.getPartitionValue(filterColumnName, isPartitionKey, partitionColumnValues);
         columnNameSet.add(filterColumnName);
         omnidataColumns.add(new Column(columnId, filterColumnName, prestoType,
             isPartitionKey, partitionValue));
@@ -815,9 +814,9 @@ public class DataIoAdapter {
         Long fileStartPos = pageCandidate.getStartPos();
         Long fileLen = pageCandidate.getSplitLen();
         if ("ORC".equalsIgnoreCase(fileFormat)) {
-            dataSource = new HdfsOrcDataSource(filePath, fileStartPos, fileLen, false);
+            dataSource = new HdfsOrcDataSource(filePath, fileStartPos, fileLen, true);
         } else if ("PARQUET".equalsIgnoreCase(fileFormat)) {
-            dataSource = new HdfsParquetDataSource(filePath, fileStartPos, fileLen, false);
+            dataSource = new HdfsParquetDataSource(filePath, fileStartPos, fileLen, true);
         } else {
             throw new UnsupportedOperationException("unsupported data format : " + fileFormat);
         }
@@ -861,8 +860,8 @@ public class DataIoAdapter {
             String columnName = resAttribute.name().toLowerCase(Locale.ENGLISH);
             Type type = NdpUtils.transOlkDataType(resAttribute.dataType(), resAttribute, false);
             int columnId = NdpUtils.getColumnId(resAttribute.toString()) - columnOffset;
-            isPartitionKey = partitionColumnName.contains(columnName);
-            String partitionValue = NdpUtils.getPartitionValue(filePath, columnName);
+            isPartitionKey = partitionColumnValues.containsKey(columnName);
+            String partitionValue = NdpUtils.getPartitionValue(columnName, isPartitionKey, partitionColumnValues);
             omnidataColumns.add(new Column(columnId, columnName, type,
                 isPartitionKey, partitionValue));
             omnidataTypes.add(type);
@@ -890,7 +889,7 @@ public class DataIoAdapter {
         filterOrdersList.clear();
         columnNameMap.clear();
         columnOrder = 0;
-        partitionColumnName.clear();
+        partitionColumnValues.clear();
         listAtt = JavaConverters.seqAsJavaList(filterOutPut);
         isPushDownAgg = true;
     }
@@ -905,7 +904,8 @@ public class DataIoAdapter {
         initCandidatesBeforeDomain(filterOutPut);
 
         // add partition column
-        JavaConverters.seqAsJavaList(partitionColumn).forEach(a -> partitionColumnName.add(a.name()));
+        JavaConverters.seqAsJavaList(partitionColumn).forEach(a ->
+            partitionColumnValues.put(a.name().toLowerCase(Locale.ENGLISH), ""));
 
         // init column info
         if (pushDownOperators.aggExecutions().size() == 0) {
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
index 1e787d7c0..18d279203 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
@@ -450,24 +450,24 @@ public class NdpUtils {
             : OptionalLong.of(limitExeInfo.get().limit());
     }
 
-    public static String getPartitionValue(String filePath, String columnName) {
-        String[] filePathStrArray = filePath.split("\\/");
+    public static String getPartitionValue(
+            String columnName,
+            boolean isPartitionKey,
+            Map partitionColumnValues) {
         String partitionValue = "";
-        Pattern pn = Pattern.compile(columnName + "\\=");
-        for (String strColumn : filePathStrArray) {
-            Matcher matcher = pn.matcher(strColumn);
-            if (matcher.find()) {
-                partitionValue = strColumn.split("\\=")[1];
-                if (partitionValue.contains("__HIVE_DEFAULT_PARTITION__")) {
-                    partitionValue = null;
-                }
-                break;
-            }
+        if (!isPartitionKey) {
+            return partitionValue;
+        }
+        partitionValue = partitionColumnValues.get(columnName);
+        if (partitionValue.contains("__HIVE_DEFAULT_PARTITION__")) {
+            partitionValue = null;
         }
         return partitionValue;
     }
 
-    public static int getFpuHosts(int hostSize) {
+
+
+     public static int getFpuHosts(int hostSize) {
         return (int) (Math.random() * hostSize);
     }
 
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
index 1d63ab4da..f09939c90 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
@@ -1,10 +1,12 @@
 package org.apache.spark.sql.execution.command
 
-import scala.collection.mutable
+import org.apache.hadoop.fs.Path
 
+import scala.collection.mutable
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -14,6 +16,8 @@ import org.apache.spark.sql.functions.countDistinct
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
+import java.util.Locale
+
 object NdpCommandUtils extends Logging {
 
   private[sql] def computeColumnStats(
@@ -200,4 +204,31 @@ object NdpCommandUtils extends Logging {
       cs.copy(histogram = Some(histogram))
     }
   }
+
+  private def partitionPathExpression(partitionColumns: Seq[Attribute], timeZoneId: String): Expression = {
+    Concat(
+      partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+        val partitionName = ScalaUDF(
+          ExternalCatalogUtils.getPartitionPathString _,
+          StringType,
+          Seq(Literal(c.name), Cast(c, StringType, Option(timeZoneId))))
+        if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR_CHAR), partitionName)
+      })
+  }
+
+  def getPartitionPath(
+      partitionColumns: Seq[Attribute],
+      timeZoneId: String,
+      row: InternalRow): java.util.Map[String, String] = {
+    val proj = UnsafeProjection.create(Seq(partitionPathExpression(partitionColumns, timeZoneId)), partitionColumns)
+    val partitionValues: java.util.Map[String, String] = new java.util.HashMap[String, String]()
+    proj(row).getString(0).split(Path.SEPARATOR_CHAR).foreach { line =>
+      val values = line.split("=")
+      if (values.length == 2) {
+        partitionValues.put(values(0).toLowerCase(Locale.ENGLISH), ExternalCatalogUtils.unescapePathName(values(1)))
+      }
+    }
+    partitionValues
+  }
+}
 }
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
index fadb8eae1..71051141a 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources
 import com.google.common.collect.ImmutableMap
 import com.huawei.boostkit.omnidata.exception.OmniDataException
-
 import java.util
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.parquet.io.ParquetDecodingException
@@ -40,8 +40,11 @@
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
-
 import java.io.{FileNotFoundException, IOException}
+
+import org.apache.spark.sql.execution.command.NdpCommandUtils
+import org.apache.spark.sql.internal.SQLConf
+
 import scala.util.Random
 
@@ -517,7 +520,9 @@ class FileScanRDDPushDown(
             currentFile.length, columnOffset, sdiHosts, fileFormat.toString, maxFailedTimes,
             taskTimeout,operatorCombineEnabled)
           val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output,
-            partitionColumns, filterOutput, pushDownOperators, domains, isColumnVector, omniGroupId)
+            filterOutput, pushDownOperators, domains, isColumnVector, omniGroupId,
+            NdpCommandUtils.getPartitionPath(
+              partitionColumns, SQLConf.get.sessionLocalTimeZone, currentFile.partitionValues))
           currentIterator = pageToColumnarClass.transPageToColumnar(dataIoPage, isColumnVector,
             dataIoClass.isOperatorCombineEnabled, output, orcImpl).asScala.iterator
           iteHasNext()
--
Gitee

From 56c5a235934568299e20d28efd9914bca4988933 Mon Sep 17 00:00:00 2001
From: suiyi <1520835527@qq.com>
Date: Thu, 14 Sep 2023 18:41:59 +0800
Subject: [PATCH 2/3] code review

---
 .../src/main/java/org/apache/spark/sql/NdpUtils.java    | 4 +---
 .../spark/sql/execution/command/NdpCommandUtils.scala   | 8 +++-----
 .../sql/execution/datasources/FileScanRDDPushDown.scala | 5 ++---
 3 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
index 18d279203..3cdc3c5bc 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
+++ b/omnidata/omnidata-spark-connector/connector/src/main/java/org/apache/spark/sql/NdpUtils.java
@@ -465,9 +465,7 @@ public class NdpUtils {
         return partitionValue;
     }
 
-
-
-     public static int getFpuHosts(int hostSize) {
+    public static int getFpuHosts(int hostSize) {
         return (int) (Math.random() * hostSize);
     }
 
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
index f09939c90..ebaf2d4c3 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/command/NdpCommandUtils.scala
@@ -216,10 +216,9 @@ object NdpCommandUtils extends Logging {
       })
   }
 
-  def getPartitionPath(
-      partitionColumns: Seq[Attribute],
-      timeZoneId: String,
-      row: InternalRow): java.util.Map[String, String] = {
+  def getPartitionPath(partitionColumns: Seq[Attribute],
+                       timeZoneId: String,
+                       row: InternalRow): java.util.Map[String, String] = {
     val proj = UnsafeProjection.create(Seq(partitionPathExpression(partitionColumns, timeZoneId)), partitionColumns)
     val partitionValues: java.util.Map[String, String] = new java.util.HashMap[String, String]()
     proj(row).getString(0).split(Path.SEPARATOR_CHAR).foreach { line =>
@@ -231,4 +230,3 @@ object NdpCommandUtils extends Logging {
     partitionValues
   }
 }
-}
diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
index 71051141a..82e63ee25 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
@@ -519,9 +519,8 @@ class FileScanRDDPushDown(
           val pageCandidate = new PageCandidate(currentFile.filePath, currentFile.start,
             currentFile.length, columnOffset, sdiHosts, fileFormat.toString, maxFailedTimes,
             taskTimeout,operatorCombineEnabled)
-          val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output,
-            filterOutput, pushDownOperators, domains, isColumnVector, omniGroupId,
-            NdpCommandUtils.getPartitionPath(
+          val dataIoPage = dataIoClass.getPageIterator(pageCandidate, output, filterOutput, pushDownOperators, domains,
+            isColumnVector, omniGroupId, NdpCommandUtils.getPartitionPath(
               partitionColumns, SQLConf.get.sessionLocalTimeZone, currentFile.partitionValues))
           currentIterator = pageToColumnarClass.transPageToColumnar(dataIoPage, isColumnVector,
             dataIoClass.isOperatorCombineEnabled, output, orcImpl).asScala.iterator
--
Gitee

From 8998b5e1ed8aabb2176c06db40dfc257ca978638 Mon Sep 17 00:00:00 2001
From: suiyi <1520835527@qq.com>
Date: Thu, 14 Sep 2023 18:48:02 +0800
Subject: [PATCH 3/3] code review

---
 .../sql/execution/datasources/FileScanRDDPushDown.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
index 82e63ee25..fcb07b74e 100644
--- a/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
+++ b/omnidata/omnidata-spark-connector/connector/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDDPushDown.scala
@@ -32,19 +32,18 @@
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.{DataIoAdapter, NdpUtils, PageCandidate, PageToColumnar, PushDownManager, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BasePredicate, Expression, Predicate, UnsafeProjection}
+import org.apache.spark.sql.execution.command.NdpCommandUtils
 import org.apache.spark.sql.execution.ndp.NdpSupport.filterStripEnd
 import org.apache.spark.sql.execution.{QueryExecutionException, RowToColumnConverter}
 import org.apache.spark.sql.execution.ndp.{FilterExeInfo, NdpConf, PushDownInfo}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ORC_IMPLEMENTATION
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.NextIterator
 import java.io.{FileNotFoundException, IOException}
 
-import org.apache.spark.sql.execution.command.NdpCommandUtils
-import org.apache.spark.sql.internal.SQLConf
-
 import scala.util.Random
 
--
Gitee
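
Illustration (not part of the patch series): the sketch below is a minimal, self-contained rendering of the partition-value handling these three patches converge on. DataIoAdapter and NdpCommandUtils.getPartitionPath populate a lowercase-keyed map of partition column values taken from the PartitionedFile, and NdpUtils.getPartitionValue becomes a plain map lookup instead of parsing the file path. The class name and the sample map contents are hypothetical, and an explicit null guard is added so the sketch runs safely on its own; only the lookup logic mirrors the patched getPartitionValue.

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical standalone sketch; mirrors the reworked NdpUtils.getPartitionValue from this series.
public class PartitionValueLookupSketch {
    static String getPartitionValue(String columnName,
                                    boolean isPartitionKey,
                                    Map<String, String> partitionColumnValues) {
        String partitionValue = "";
        if (!isPartitionKey) {
            return partitionValue;    // non-partition columns keep the empty default
        }
        partitionValue = partitionColumnValues.get(columnName);
        if (partitionValue != null && partitionValue.contains("__HIVE_DEFAULT_PARTITION__")) {
            partitionValue = null;    // Hive default-partition sentinel maps to null
        }
        return partitionValue;
    }

    public static void main(String[] args) {
        // Keys are lower-cased, matching how the patch fills the map from partition column attributes.
        Map<String, String> values = new HashMap<>();
        values.put("dt".toLowerCase(Locale.ENGLISH), "2023-09-14");
        values.put("region".toLowerCase(Locale.ENGLISH), "__HIVE_DEFAULT_PARTITION__");

        System.out.println(getPartitionValue("dt", values.containsKey("dt"), values));         // 2023-09-14
        System.out.println(getPartitionValue("region", values.containsKey("region"), values)); // null
        System.out.println(getPartitionValue("price", values.containsKey("price"), values));   // empty string
    }
}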